package org.onlab.nio.service;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.nio.AcceptorLoop;
import org.onlab.util.Tools;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/onlab/nio/service/IOLoopMessaging.class */
/**
 * {@link MessagingService} implementation that moves messages over
 * non-blocking socket channels serviced by a fixed set of {@link DefaultIOLoop}
 * workers, with a single {@link AcceptorLoop} handing inbound connections to
 * those workers in round-robin order.
 *
 * <p>Outbound streams are pooled per remote {@link Endpoint}. Messages addressed
 * to the local endpoint bypass the network and are dispatched directly to the
 * registered handlers.
 */
public class IOLoopMessaging implements MessagingService {
    /** Message type that marks a message as the reply to an earlier request. */
    private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
    /** Value passed to {@code awaitStart} for every loop during {@link #start(Endpoint)}. */
    static final long TIMEOUT = 1000;
    static final boolean SO_NO_DELAY = false;
    static final int SO_SEND_BUFFER_SIZE = 131072;
    static final int SO_RCV_BUFFER_SIZE = 131072;
    /** Number of IO loops, and the size of the thread pool that runs them. */
    private static final int NUM_WORKERS = 8;

    private AcceptorLoop acceptorLoop;
    private Endpoint localEp;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final ExecutorService acceptorThreadPool =
            Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/nio/messaging", "acceptor"));
    private final ExecutorService ioThreadPool =
            Executors.newFixedThreadPool(NUM_WORKERS, Tools.groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));
    private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();
    /** Index of the loop handed out most recently by {@link #nextWorker()}; -1 before first use. */
    private int lastWorker = -1;
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Pool of outbound streams, keyed by remote endpoint. */
    private final GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
            new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
    /** Handlers keyed by message type. */
    private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
    private final AtomicLong messageIdGenerator = new AtomicLong(0);
    /**
     * Futures of requests still waiting for a reply, keyed by message id.
     *
     * <p>NOTE(review): the builder below configures no size or time bound, so
     * the eviction branch of the listener may never fire and unanswered
     * requests appear to stay pending indefinitely — confirm whether an expiry
     * was intended.
     */
    private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
            .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
                @Override
                public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> removalNotification) {
                    // Only evictions count as timeouts; explicit invalidation happens once the
                    // request has already been answered or failed.
                    if (removalNotification.wasEvicted()) {
                        removalNotification.getValue()
                                .completeExceptionally(new TimeoutException("Timedout waiting for reply"));
                    }
                }
            })
            .build();

    /** Acceptor loop that configures each inbound channel and assigns it to an IO loop. */
    private class DefaultAcceptorLoop extends AcceptorLoop {
        public DefaultAcceptorLoop(SocketAddress socketAddress) throws IOException {
            // First argument is presumably the selector timeout in milliseconds —
            // confirm against AcceptorLoop.
            super(500L, socketAddress);
        }

        /**
         * Accepts a pending connection, applies the socket options declared on
         * the enclosing class and hands the channel to the next IO loop.
         *
         * @param serverSocketChannel channel with a connection ready to accept
         * @throws IOException if accepting or configuring the channel fails
         */
        @Override // org.onlab.nio.AcceptorLoop
        protected void acceptConnection(ServerSocketChannel serverSocketChannel) throws IOException {
            SocketChannel channel = serverSocketChannel.accept();
            if (channel == null) {
                // accept() returns null on a non-blocking server channel when no
                // connection is actually pending; nothing to do in that case.
                return;
            }
            channel.configureBlocking(false);
            Socket socket = channel.socket();
            socket.setTcpNoDelay(SO_NO_DELAY);
            socket.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
            socket.setSendBufferSize(SO_SEND_BUFFER_SIZE);
            IOLoopMessaging.this.nextWorker().acceptStream(channel);
        }
    }

    /** Pool factory that opens, validates and closes outbound message streams. */
    private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {
        private DefaultMessageStreamFactory() {
        }

        @Override
        public void activateObject(Endpoint endpoint, DefaultMessageStream defaultMessageStream) throws Exception {
            // Streams need no preparation before being lent out.
        }

        @Override
        public void destroyObject(Endpoint endpoint, DefaultMessageStream defaultMessageStream) throws Exception {
            defaultMessageStream.close();
        }

        /**
         * Opens a connection to the endpoint on the next IO loop and blocks
         * until the stream's connected-future completes.
         */
        @Override
        public DefaultMessageStream makeObject(Endpoint endpoint) throws Exception {
            DefaultMessageStream stream = IOLoopMessaging.this
                    .createConnection(endpoint, IOLoopMessaging.this.nextWorker())
                    .connectedFuture()
                    .get();
            IOLoopMessaging.this.log.info("Established a new connection to {}", endpoint);
            return stream;
        }

        @Override
        public void passivateObject(Endpoint endpoint, DefaultMessageStream defaultMessageStream) throws Exception {
            // Streams keep no per-borrow state that would need resetting.
        }

        /**
         * Reports whether the stream may still be used.
         *
         * @return {@code true} while the stream is open
         */
        @Override
        public boolean validateObject(Endpoint endpoint, DefaultMessageStream defaultMessageStream) {
            // A pooled object is valid while it is usable, i.e. NOT closed.
            return !defaultMessageStream.isClosed();
        }
    }

    /**
     * Starts the acceptor and the IO loops and records the local endpoint.
     * Does nothing except log a warning if the service is already started.
     *
     * @param endpoint local endpoint to listen on
     * @throws IOException if the acceptor or an IO loop cannot be created
     */
    public void start(Endpoint endpoint) throws IOException {
        if (started.get()) {
            log.warn("IOMessaging is already running at {}", endpoint);
            return;
        }
        localEp = endpoint;
        streams.setLifo(false);
        acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(endpoint.host().toString(), endpoint.port()));
        for (int i = 0; i < NUM_WORKERS; i++) {
            ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
        }
        ioLoops.forEach(ioThreadPool::execute);
        acceptorThreadPool.execute(acceptorLoop);
        ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
        acceptorLoop.awaitStart(TIMEOUT);
        started.set(true);
    }

    /** Shuts down all loops and their thread pools; a no-op when not started. */
    public void stop() {
        if (started.get()) {
            ioLoops.forEach(DefaultIOLoop::shutdown);
            acceptorLoop.shutdown();
            ioThreadPool.shutdown();
            acceptorThreadPool.shutdown();
            started.set(false);
        }
    }

    /**
     * Sends a one-way message of the given type to the endpoint.
     *
     * @param endpoint destination endpoint
     * @param str      message type
     * @param bArr     message payload
     * @return future completed once the message was written or dispatched
     *         locally, or completed exceptionally if sending failed
     */
    public CompletableFuture<Void> sendAsync(Endpoint endpoint, String str, byte[] bArr) {
        return sendAsync(endpoint, new DefaultMessage(messageIdGenerator.incrementAndGet(), localEp, str, bArr));
    }

    /**
     * Sends an already-built message. Messages for the local endpoint are
     * dispatched directly; all others are written to a pooled stream.
     *
     * @param endpoint       destination endpoint
     * @param defaultMessage message to send
     * @return future completed once the message was written or dispatched
     *         locally, or completed exceptionally if borrowing the stream or
     *         writing to it failed
     */
    protected CompletableFuture<Void> sendAsync(Endpoint endpoint, DefaultMessage defaultMessage) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (endpoint.equals(localEp)) {
            dispatchLocally(defaultMessage);
            future.complete(null);
            return future;
        }
        DefaultMessageStream stream = null;
        try {
            stream = streams.borrowObject(endpoint);
            stream.write(defaultMessage);
            future.complete(null);
        } catch (Exception e) {
            future.completeExceptionally(e);
        } finally {
            // Only give back what was actually borrowed: if borrowObject failed,
            // returning null would hand the pool an object it never lent out.
            if (stream != null) {
                try {
                    streams.returnObject(endpoint, stream);
                } catch (Exception e) {
                    log.warn("Failed to return stream to pool", e);
                }
            }
        }
        return future;
    }

    /**
     * Sends a request and returns a future for the reply payload.
     *
     * @param endpoint destination endpoint
     * @param str      message type
     * @param bArr     request payload
     * @return future completed with the reply payload, or completed
     *         exceptionally if the request could not be sent
     */
    public CompletableFuture<byte[]> sendAndReceive(Endpoint endpoint, String str, byte[] bArr) {
        CompletableFuture<byte[]> response = new CompletableFuture<>();
        long messageId = messageIdGenerator.incrementAndGet();
        // Register before sending so that a fast reply always finds its future.
        responseFutures.put(messageId, response);
        try {
            sendAsync(endpoint, new DefaultMessage(messageId, localEp, str, bArr))
                    .whenComplete((ignored, sendError) -> {
                        // sendAsync reports borrow/write failures through its future
                        // rather than by throwing, so they must be propagated here.
                        if (sendError != null) {
                            responseFutures.invalidate(messageId);
                            response.completeExceptionally(sendError);
                        }
                    });
        } catch (Exception e) {
            responseFutures.invalidate(messageId);
            response.completeExceptionally(e);
        }
        return response;
    }

    /**
     * Registers a one-way handler whose consumer is run on the given executor.
     *
     * @param str      message type to handle
     * @param consumer receives the payload of each matching message
     * @param executor executor on which the consumer is invoked
     */
    public void registerHandler(String str, Consumer<byte[]> consumer, Executor executor) {
        handlers.put(str, message -> executor.execute(() -> consumer.accept(message.payload())));
    }

    /**
     * Registers a request/reply handler whose function is run on the given
     * executor. A non-null result is sent back to the sender as a reply
     * carrying the id of the request; a null result sends nothing.
     *
     * @param str      message type to handle
     * @param function computes the reply payload from the request payload
     * @param executor executor on which the function is invoked
     */
    public void registerHandler(String str, Function<byte[], byte[]> function, Executor executor) {
        handlers.put(str, message -> executor.execute(() -> {
            byte[] reply = function.apply(message.payload());
            if (reply != null) {
                sendAsync(message.sender(), new DefaultMessage(message.id(), localEp, REPLY_MESSAGE_TYPE, reply))
                        .whenComplete((ignored, sendError) -> {
                            // Log only real failures, not every completion.
                            if (sendError != null) {
                                log.debug("Failed to respond", sendError);
                            }
                        });
            }
        }));
    }

    /**
     * Registers a request/reply handler that produces its reply
     * asynchronously. The reply is sent once the returned future completes
     * normally; if it completes exceptionally no reply is sent.
     *
     * @param str      message type to handle
     * @param function maps the request payload to a future reply payload
     */
    public void registerHandler(String str, Function<byte[], CompletableFuture<byte[]>> function) {
        handlers.put(str, message -> function.apply(message.payload()).whenComplete((reply, handlerError) -> {
            if (handlerError != null) {
                log.debug("Handler for {} failed; no reply sent", str, handlerError);
                return;
            }
            sendAsync(message.sender(), new DefaultMessage(message.id(), localEp, REPLY_MESSAGE_TYPE, reply))
                    .whenComplete((ignored, sendError) -> {
                        if (sendError != null) {
                            log.debug("Failed to respond", sendError);
                        }
                    });
        }));
    }

    /**
     * Removes the handler registered for the given message type, if any.
     *
     * @param str message type
     */
    public void unregisterHandler(String str) {
        handlers.remove(str);
    }

    /**
     * Routes a message inside this node: replies complete the matching pending
     * request future, every other type goes to its registered handler.
     *
     * @param defaultMessage message to dispatch
     */
    protected void dispatchLocally(DefaultMessage defaultMessage) {
        String type = defaultMessage.type();
        if (!REPLY_MESSAGE_TYPE.equals(type)) {
            Consumer<DefaultMessage> handler = handlers.get(type);
            if (handler == null) {
                log.debug("No handler registered for {}", type);
            } else {
                handler.accept(defaultMessage);
            }
            return;
        }

        long id = defaultMessage.id();
        try {
            CompletableFuture<byte[]> pending = responseFutures.getIfPresent(id);
            if (pending != null) {
                pending.complete(defaultMessage.payload());
            } else {
                log.warn("Received a reply for message id:[{}].  from {}. But was unable to locate the request handle",
                        id, defaultMessage.sender());
            }
        } finally {
            // The request is finished either way; drop its entry.
            responseFutures.invalidate(id);
        }
    }

    /**
     * Returns the next IO loop in round-robin order. Synchronized because the
     * rotating index is shared mutable state.
     *
     * @return the IO loop to use for the next stream
     */
    public synchronized DefaultIOLoop nextWorker() {
        lastWorker = (lastWorker + 1) % NUM_WORKERS;
        return ioLoops.get(lastWorker);
    }

    /**
     * Opens a non-blocking channel to the endpoint and registers it with the
     * given IO loop. The stream is registered before the connect is issued.
     *
     * @param endpoint      remote endpoint to connect to
     * @param defaultIOLoop loop that will service the new stream
     * @return the stream bound to the new channel
     * @throws IOException if the channel cannot be opened, configured or connected
     */
    public DefaultMessageStream createConnection(Endpoint endpoint, DefaultIOLoop defaultIOLoop) throws IOException {
        InetSocketAddress address = new InetSocketAddress(endpoint.host().toString(), endpoint.port());
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        DefaultMessageStream stream = defaultIOLoop.connectStream(channel);
        channel.connect(address);
        return stream;
    }
}
