package org.onlab.nio;

import java.io.IOException;
import java.nio.channels.ByteChannel;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import org.onlab.nio.Message;
import org.onlab.nio.MessageStream;

/* loaded from: input_file:org/onlab/nio/IOLoop.class */
/**
 * Selector-driven I/O loop that multiplexes a set of message streams.
 *
 * <p>Streams are created for socket channels through {@link #acceptStream} or
 * {@link #connectStream}; the request to register the channel with the selector is
 * queued and carried out by the loop itself at the top of each iteration of
 * {@link #loop()}.
 *
 * @param <M> message type carried by the streams
 * @param <S> concrete message stream type
 */
public abstract class IOLoop<M extends Message, S extends MessageStream<M>> extends SelectorLoop {

    /** Registrations queued by {@link #createAndAdmit} and drained by {@link #admitNewStreams()}. */
    private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();

    /** Streams currently tracked by this loop. */
    private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();

    /**
     * Pending request to register a stream's channel with the selector.
     *
     * <p>NOTE(review): the decompiler reports that this class was originally
     * {@code private}; the widened modifier is kept to avoid changing the visible API.
     */
    public class NewStreamRequest {
        private final S stream;
        private final SelectableChannel channel;
        private final int op;

        /**
         * Creates a registration request.
         *
         * @param stream  stream to attach to the selection key
         * @param channel channel to register with the selector
         * @param op      initial interest-op set for the registration
         */
        public NewStreamRequest(S stream, SelectableChannel channel, int op) {
            this.stream = stream;
            this.channel = channel;
            this.op = op;
        }
    }

    /**
     * Creates an I/O loop.
     *
     * @param timeout value handed unchanged to the {@code SelectorLoop} constructor
     *                (presumably the select timeout - confirm against SelectorLoop)
     * @throws IOException if the superclass constructor fails
     */
    public IOLoop(long timeout) throws IOException {
        super(timeout);
    }

    /**
     * Returns the number of streams currently tracked by this loop.
     *
     * @return tracked stream count
     */
    public int streamCount() {
        return streams.size();
    }

    /**
     * Creates a new message stream backed by the given channel.
     *
     * @param byteChannel backing channel
     * @return newly created stream
     */
    protected abstract S createStream(ByteChannel byteChannel);

    /**
     * Stops tracking the given stream.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally
     * {@code protected}; the widened modifier is kept to avoid changing the visible API.
     *
     * @param messageStream stream to remove from the tracked set
     */
    public void removeStream(MessageStream<M> messageStream) {
        streams.remove(messageStream);
    }

    /**
     * Handles a non-empty batch of messages read from a stream.
     *
     * @param list          messages that were read
     * @param messageStream stream the messages were read from
     */
    protected abstract void processMessages(List<M> list, MessageStream<M> messageStream);

    /**
     * Finishes a pending connection and, if the key is still valid, switches its
     * interest set to read.
     *
     * @param selectionKey key whose channel is a connecting {@link SocketChannel}
     * @throws IOException if the connection cannot be completed
     */
    protected void connect(SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        channel.finishConnect();
        if (selectionKey.isValid()) {
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    /**
     * Services the ready operations of a selected key: connect, then read, then write.
     *
     * <p>The stream attached to the key is closed when the key is invalid or cancelled,
     * when a read returns {@code null}, when the stream reports an error, or when an
     * {@link IOException} is raised while processing.
     *
     * @param selectionKey selected key whose attachment is the associated stream
     */
    protected void processKeyOperation(SelectionKey selectionKey) {
        // The attachment is always the stream supplied at registration in admitNewStreams().
        @SuppressWarnings("unchecked")
        S stream = (S) selectionKey.attachment();

        try {
            if (!selectionKey.isValid()) {
                stream.close();
                return;
            }

            if (selectionKey.isConnectable()) {
                try {
                    connect(selectionKey);
                } catch (IOException | IllegalStateException e) {
                    // A failed connect is only logged; the remaining checks still run.
                    this.log.warn("Unable to complete connection", e);
                }
            }

            if (selectionKey.isReadable()) {
                List<M> messages = stream.read();
                if (messages == null || stream.hadError()) {
                    stream.close();
                    return;
                }
                if (!messages.isEmpty()) {
                    try {
                        processMessages(messages, stream);
                    } catch (RuntimeException e) {
                        onError(stream, e);
                    }
                }
            }

            if (selectionKey.isWritable()) {
                stream.flushIfPossible();
            }

            if (stream.hadError()) {
                stream.close();
            }
        } catch (IOException e) {
            // Stay quiet for already-closed streams and peer resets.
            if (!stream.isClosed() && !isResetByPeer(e)) {
                this.log.warn("Unable to process IO", e);
            }
            stream.close();
        } catch (CancelledKeyException e) {
            stream.close();
        }
    }

    /**
     * Tells whether the exception was caused by an {@link IOException} whose message
     * contains "reset by peer".
     *
     * @param iOException exception to inspect
     * @return {@code true} if the cause matches; {@code false} otherwise, including when
     *         there is no cause or the cause carries no message
     */
    private boolean isResetByPeer(IOException iOException) {
        Throwable cause = iOException.getCause();
        if (!(cause instanceof IOException)) {
            return false;
        }
        // getMessage() may legitimately be null; guard against it instead of throwing.
        String message = cause.getMessage();
        return message != null && message.contains("reset by peer");
    }

    /**
     * Hook invoked when {@link #processMessages} throws a runtime exception.
     * The default implementation rethrows the exception.
     *
     * @param s                stream whose messages were being processed
     * @param runtimeException exception raised by message processing
     */
    protected void onError(S s, RuntimeException runtimeException) {
        throw runtimeException;
    }

    /**
     * Creates a stream for the given channel and queues it for registration with
     * read interest.
     *
     * @param socketChannel backing socket channel
     * @return newly created stream
     */
    public S acceptStream(SocketChannel socketChannel) {
        return createAndAdmit(socketChannel, SelectionKey.OP_READ);
    }

    /**
     * Creates a stream for the given channel and queues it for registration with
     * connect interest.
     *
     * @param socketChannel backing socket channel
     * @return newly created stream
     */
    public S connectStream(SocketChannel socketChannel) {
        return createAndAdmit(socketChannel, SelectionKey.OP_CONNECT);
    }

    /**
     * Creates a stream, starts tracking it, queues its selector registration and wakes
     * the selector.
     *
     * @param socketChannel backing socket channel
     * @param op            initial interest-op set
     * @return newly created stream
     */
    private synchronized S createAndAdmit(SocketChannel socketChannel, int op) {
        S stream = createStream(socketChannel);
        streams.add(stream);
        newStreamRequests.add(new NewStreamRequest(stream, socketChannel, op));
        this.selector.wakeup();
        return stream;
    }

    /**
     * Drains the queue of pending registrations, registering each channel with the
     * selector and handing the resulting key to its stream. Stops early if the loop is
     * no longer running.
     */
    private void admitNewStreams() {
        while (isRunning()) {
            NewStreamRequest request = newStreamRequests.poll();
            if (request == null) {
                break;
            }
            try {
                SelectionKey key = request.channel.register(this.selector, request.op, request.stream);
                request.stream.setKey(key);
            } catch (ClosedChannelException e) {
                // NOTE(review): the stream stays in the tracked set after a failed
                // registration - confirm whether it should be removed or closed here.
                this.log.warn("Unable to admit new message stream", e);
            }
        }
    }

    /**
     * Main loop body: signals readiness, then repeatedly admits queued streams, flushes
     * streams without a pending write, and services the keys selected within the
     * select timeout.
     *
     * @throws IOException if flushing or selecting fails
     */
    @Override // org.onlab.nio.SelectorLoop
    protected void loop() throws IOException {
        notifyReady();

        while (isRunning()) {
            admitNewStreams();

            for (MessageStream<M> stream : streams) {
                stream.flushIfWriteNotPending();
            }

            int selected = this.selector.select(this.selectTimeout);
            if (selected > 0 && isRunning()) {
                Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    // Selected keys must be removed explicitly or they are reported again.
                    keys.remove();
                    processKeyOperation(key);
                }
            }
        }
    }

    /**
     * Closes every tracked stream that reports itself as stale.
     *
     * @return number of streams tracked after the sweep
     */
    public synchronized int pruneStaleStreams() {
        for (MessageStream<M> stream : streams) {
            if (stream.isStale()) {
                stream.close();
            }
        }
        return streams.size();
    }
}
