package blazingcache.network.netty;

import blazingcache.network.Channel;
import blazingcache.network.Message;
import blazingcache.network.ReplyCallback;
import blazingcache.network.SendResultCallback;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:blazingcache/network/netty/NettyChannel.class */
public class NettyChannel extends Channel {
    volatile SocketChannel socket;
    private static final Logger LOGGER = Logger.getLogger(NettyChannel.class.getName());
    private final Map<String, ReplyCallback> pendingReplyMessages = new ConcurrentHashMap();
    private final Map<String, Message> pendingReplyMessagesSource = new ConcurrentHashMap();
    private final ExecutorService callbackexecutor;
    private final NettyConnector connector;

    public String toString() {
        return "NettyChannel{socket=" + this.socket + '}';
    }

    public NettyChannel(SocketChannel socketChannel, ExecutorService executorService, NettyConnector nettyConnector) {
        this.socket = socketChannel;
        this.callbackexecutor = executorService;
        this.connector = nettyConnector;
    }

    public void messageReceived(Message message) {
        if (message.getReplyMessageId() != null) {
            handleReply(message);
            return;
        }
        try {
            this.messagesReceiver.messageReceived(message);
        } catch (Throwable th) {
            LOGGER.log(Level.SEVERE, "error", th);
            close();
        }
    }

    private void handleReply(Message message) {
        ReplyCallback replyCallback = this.pendingReplyMessages.get(message.getReplyMessageId());
        if (replyCallback != null) {
            this.pendingReplyMessages.remove(message.getReplyMessageId());
            Message remove = this.pendingReplyMessagesSource.remove(message.getReplyMessageId());
            if (remove != null) {
                submitCallback(() -> {
                    replyCallback.replyReceived(remove, message, null);
                });
            }
        }
    }

    @Override // blazingcache.network.Channel
    public void sendOneWayMessage(final Message message, final SendResultCallback sendResultCallback) {
        if (message.getMessageId() == null) {
            message.setMessageId(UUID.randomUUID().toString());
        }
        if (this.socket == null) {
            sendResultCallback.messageSent(message, new Exception("connection is closed"));
        } else {
            this.socket.writeAndFlush(message).addListener(new GenericFutureListener() { // from class: blazingcache.network.netty.NettyChannel.1
                public void operationComplete(Future future) throws Exception {
                    if (future.isSuccess()) {
                        sendResultCallback.messageSent(message, null);
                        return;
                    }
                    NettyChannel.LOGGER.log(Level.SEVERE, "error", future.cause());
                    sendResultCallback.messageSent(message, future.cause());
                    NettyChannel.this.close();
                }
            });
        }
    }

    @Override // blazingcache.network.Channel
    public void sendReplyMessage(Message message, Message message2) {
        if (message2.getMessageId() == null) {
            message2.setMessageId(UUID.randomUUID().toString());
        }
        if (this.socket == null) {
            LOGGER.log(Level.SEVERE, "channel not active, discarding reply message " + message2);
        } else {
            message2.setReplyMessageId(message.messageId);
            sendOneWayMessage(message2, new SendResultCallback() { // from class: blazingcache.network.netty.NettyChannel.2
                @Override // blazingcache.network.SendResultCallback
                public void messageSent(Message message3, Throwable th) {
                    if (th != null) {
                        NettyChannel.LOGGER.log(Level.SEVERE, "error", th);
                    }
                }
            });
        }
    }

    @Override // blazingcache.network.Channel
    public void sendMessageWithAsyncReply(final Message message, final ReplyCallback replyCallback) {
        if (message.getMessageId() == null) {
            message.setMessageId(UUID.randomUUID().toString());
        }
        if (this.socket == null) {
            submitCallback(() -> {
                replyCallback.replyReceived(message, null, new Exception("connection is not active"));
            });
            return;
        }
        this.pendingReplyMessages.put(message.getMessageId(), replyCallback);
        this.pendingReplyMessagesSource.put(message.getMessageId(), message);
        sendOneWayMessage(message, new SendResultCallback() { // from class: blazingcache.network.netty.NettyChannel.3
            @Override // blazingcache.network.SendResultCallback
            public void messageSent(Message message2, Throwable th) {
                if (th != null) {
                    NettyChannel.LOGGER.log(Level.SEVERE, this + ": error while sending reply message to " + message2, th);
                    NettyChannel nettyChannel = NettyChannel.this;
                    ReplyCallback replyCallback2 = replyCallback;
                    Message message3 = message;
                    nettyChannel.submitCallback(() -> {
                        replyCallback2.replyReceived(message3, null, new Exception(this + ": error while sending reply message to " + message2, th));
                    });
                }
            }
        });
    }

    @Override // blazingcache.network.Channel
    public boolean isValid() {
        return this.socket != null && this.socket.isOpen();
    }

    @Override // blazingcache.network.Channel, java.lang.AutoCloseable
    public void close() {
        String str = this.socket + "";
        if (this.socket != null) {
            try {
                this.socket.close().await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                this.socket = null;
            }
        }
        this.pendingReplyMessages.forEach((str2, replyCallback) -> {
            submitCallback(() -> {
                Message remove = this.pendingReplyMessagesSource.remove(str2);
                if (remove != null) {
                    replyCallback.replyReceived(remove, null, new IOException("comunication channel is closed. Cannot wait for pending messages, socket=" + str));
                }
            });
        });
        this.pendingReplyMessages.clear();
        if (this.connector != null) {
            this.connector.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void exceptionCaught(Throwable th) {
        LOGGER.log(Level.SEVERE, this + " io-error", th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void channelClosed() {
        submitCallback(() -> {
            if (this.messagesReceiver != null) {
                this.messagesReceiver.channelClosed();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void submitCallback(Runnable runnable) {
        try {
            this.callbackexecutor.submit(runnable);
        } catch (RejectedExecutionException e) {
            LOGGER.log(Level.SEVERE, this + " rejected runnable " + runnable, (Throwable) e);
        }
    }
}
