/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.flume.sink;

import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.AbstractServer;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.netlet.util.Slice;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.apex.malhar.flume.discovery.Discovery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Server
extends AbstractServer {
    /** Identifier reported by {@link #service} through {@code getId()}. */
    private final String id;

    /** Registry on which {@link #service} is advertised in {@code registered} and withdrawn in {@code unregistered}. */
    private final Discovery<byte[]> discovery;

    /**
     * Milliseconds added to a request's embedded timestamp in {@code Client.onMessage};
     * a request is dropped once {@code System.currentTimeMillis()} exceeds that sum.
     */
    private final long acceptedTolerance;

    /**
     * Descriptor of this server that is handed to {@link #discovery}. Host and port are read
     * from the server address on every call; the payload is always {@code null}.
     */
    private final Discovery.Service<byte[]> service = new Discovery.Service<byte[]>(){

        @Override
        public String getHost() {
            return ((InetSocketAddress)Server.this.getServerAddress()).getHostName();
        }

        @Override
        public int getPort() {
            return ((InetSocketAddress)Server.this.getServerAddress()).getPort();
        }

        @Override
        public byte[] getPayload() {
            return null;
        }

        @Override
        public String getId() {
            return Server.this.id;
        }

        @Override
        public String toString() {
            return "Server.Service{id=" + Server.this.id + ", host=" + this.getHost() + ", port=" + this.getPort() + ", payload=" + Arrays.toString(this.getPayload()) + '}';
        }
    };

    /**
     * Requests queued by {@code Client.onMessage}, {@code Client.disconnected} and
     * {@code handleException}. Every writer in this class synchronizes on the list itself
     * before adding, so external readers should take the same lock.
     */
    public final ArrayList<Request> requests = new ArrayList<Request>(4);

    private static final Logger logger = LoggerFactory.getLogger(Server.class);

    /**
     * Creates a server that stores the given settings; nothing is advertised here.
     *
     * @param id identifier exposed by the advertised service descriptor
     * @param discovery registry used to advertise and unadvertise this server
     * @param acceptedTolerance milliseconds added to a request's timestamp before it is
     *        compared with the current time in {@code Client.onMessage}
     */
    public Server(String id, Discovery<byte[]> discovery, long acceptedTolerance) {
        this.acceptedTolerance = acceptedTolerance;
        this.discovery = discovery;
        this.id = id;
    }

    /**
     * Logs the failure and queues a {@link Command#SERVER_ERROR} request with no client.
     * The queued request carries no data: each of its accessors throws
     * {@link UnsupportedOperationException}.
     *
     * @param cce the exception being reported
     * @param el the event loop passed by the caller; not used here
     */
    public void handleException(Exception cce, EventLoop el) {
        logger.error("Server Error", cce);

        final Request serverError = new Request(Command.SERVER_ERROR, null){

            @Override
            public Slice getAddress() {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public int getEventCount() {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public int getIdleCount() {
                throw new UnsupportedOperationException("Not supported yet.");
            }
        };

        // Same monitor the clients use when they enqueue requests.
        synchronized (this.requests) {
            this.requests.add(serverError);
        }
    }

    /**
     * Withdraws this server's service descriptor from the discovery, then delegates to the
     * superclass.
     *
     * @param key the selection key forwarded to {@code super.unregistered}
     */
    public void unregistered(SelectionKey key) {
        discovery.unadvertise(service);
        super.unregistered(key);
    }

    /**
     * Delegates to the superclass first, then advertises this server's service descriptor
     * on the discovery.
     *
     * @param key the selection key forwarded to {@code super.registered}
     */
    public void registered(SelectionKey key) {
        super.registered(key);
        discovery.advertise(service);
    }

    /**
     * Creates a new {@link Client}, calls its {@code connected()} hook and returns it.
     *
     * @param sc the accepted channel; not used here
     * @param ssc the server channel; not used here
     * @return the newly created client
     */
    public Listener.ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc) {
        final Client connection = new Client();
        connection.connected();
        return connection;
    }

    /**
     * Decodes a little-endian 32-bit integer from four consecutive bytes.
     *
     * @param buffer source bytes
     * @param offset index of the least significant byte
     * @return the decoded value
     * @throws ArrayIndexOutOfBoundsException if fewer than four bytes are available at {@code offset}
     */
    public static int readInt(byte[] buffer, int offset) {
        int value = 0;
        for (int i = 0; i < 4; i++) {
            // Mask to undo sign extension before shifting the byte into place.
            value |= (buffer[offset + i] & 0xFF) << (8 * i);
        }
        return value;
    }

    /**
     * Encodes a 32-bit integer as four little-endian bytes.
     *
     * @param buffer destination bytes
     * @param offset index that receives the least significant byte
     * @param i value to encode
     * @throws ArrayIndexOutOfBoundsException if fewer than four bytes are available at {@code offset}
     */
    public static void writeInt(byte[] buffer, int offset, int i) {
        for (int shift = 0; shift < 32; shift += 8) {
            buffer[offset++] = (byte)(i >>> shift);
        }
    }

    /**
     * Decodes a little-endian 64-bit integer from eight consecutive bytes.
     *
     * @param buffer source bytes
     * @param offset index of the least significant byte
     * @return the decoded value
     * @throws ArrayIndexOutOfBoundsException if fewer than eight bytes are available at {@code offset}
     */
    public static long readLong(byte[] buffer, int offset) {
        long value = 0L;
        for (int i = 0; i < 8; i++) {
            // Widen after masking so the shift happens in 64-bit arithmetic.
            value |= (long)(buffer[offset + i] & 0xFF) << (8 * i);
        }
        return value;
    }

    /**
     * Encodes a 64-bit integer as eight little-endian bytes.
     *
     * @param buffer destination bytes
     * @param offset index that receives the least significant byte
     * @param l value to encode
     * @throws ArrayIndexOutOfBoundsException if fewer than eight bytes are available at {@code offset}
     */
    public static void writeLong(byte[] buffer, int offset, long l) {
        for (int shift = 0; shift < 64; shift += 8) {
            buffer[offset++] = (byte)(l >>> shift);
        }
    }

    /**
     * A request decoded from a client message, or synthesised by the server itself.
     *
     * <p>As read by {@link #getRequest}, the first byte of an encoded request is the
     * {@link Command} code. For {@link Command#WINDOWED} the next eight bytes hold two
     * little-endian ints (event count, then idle count); for every other command they are
     * exposed as an 8-byte address slice.
     */
    public static abstract class Request {
        /** Size in bytes of an encoded request; {@code Client.onMessage} rejects any other size. */
        public static final int FIXED_SIZE = 17;
        /** Offset, relative to the start of a request, of the little-endian long timestamp read by {@code Client.onMessage}. */
        public static final int TIME_OFFSET = 9;
        /** Command this request carries. */
        public final Command type;
        /** Client the request came from; {@code null} for the request queued by {@code handleException}. */
        public final Client client;

        /**
         * @param type command this request carries
         * @param client originating client, may be {@code null}
         */
        public Request(Command type, Client client) {
            this.type = type;
            this.client = client;
        }

        /**
         * @return the 8-byte address carried by the request
         * @throws UnsupportedOperationException if this kind of request has no address
         */
        public abstract Slice getAddress();

        /**
         * @return the event count carried by the request
         * @throws UnsupportedOperationException if this kind of request has no event count
         */
        public abstract int getEventCount();

        /**
         * @return the idle count carried by the request
         * @throws UnsupportedOperationException if this kind of request has no idle count
         */
        public abstract int getIdleCount();

        @Override
        public String toString() {
            return "Request{type=" + this.type + '}';
        }

        /**
         * Decodes the request that starts at {@code offset} in {@code buffer}.
         *
         * @param buffer bytes holding the encoded request; at least nine bytes from
         *        {@code offset} are addressed
         * @param offset index of the command byte
         * @param client client to associate with the request
         * @return a request whose accessors match the decoded command
         * @throws IllegalArgumentException if the command byte is not a known {@link Command}
         */
        public static Request getRequest(final byte[] buffer, final int offset, Client client) {
            final Command command = Command.getCommand(buffer[offset]);

            if (command == Command.WINDOWED) {
                return new Request(Command.WINDOWED, client){
                    // Decoded eagerly, so later changes to the buffer do not affect them.
                    private final int eventCount = Server.readInt(buffer, offset + 1);
                    private final int idleCount = Server.readInt(buffer, offset + 5);

                    @Override
                    public Slice getAddress() {
                        throw new UnsupportedOperationException();
                    }

                    @Override
                    public int getEventCount() {
                        return this.eventCount;
                    }

                    @Override
                    public int getIdleCount() {
                        return this.idleCount;
                    }

                    @Override
                    public String toString() {
                        return "Request{type=" + this.type + ", eventCount=" + this.eventCount + ", idleCount=" + this.idleCount + '}';
                    }
                };
            }

            return new Request(command, client){
                // NOTE(review): presumably Slice is a view over buffer rather than a copy — confirm
                // before relying on the address after the buffer is reused.
                private final Slice address = new Slice(buffer, offset + 1, 8);

                @Override
                public Slice getAddress() {
                    return this.address;
                }

                @Override
                public int getEventCount() {
                    throw new UnsupportedOperationException();
                }

                @Override
                public int getIdleCount() {
                    throw new UnsupportedOperationException();
                }

                @Override
                public String toString() {
                    return "Request{type=" + this.type + ", address=" + this.address + '}';
                }
            };
        }
    }

    /**
     * Per-connection listener that validates incoming messages and queues the decoded
     * requests on the enclosing server's {@code requests} list.
     */
    public class Client
    extends AbstractLengthPrependerClient {
        /**
         * Handles one incoming message. Messages of the wrong size, with an expired
         * timestamp or with an unknown command byte are logged and dropped. An
         * {@link Command#ECHO} message is written straight back. Anything else is decoded
         * and queued.
         *
         * @param buffer bytes holding the message
         * @param offset index of the first message byte
         * @param size number of message bytes
         */
        public void onMessage(byte[] buffer, int offset, int size) {
            if (size != Request.FIXED_SIZE) {
                final byte[] received = Arrays.copyOfRange(buffer, offset, offset + size);
                logger.warn("Invalid Request Received: {} from {}", (Object)received, (Object)this.key.channel());
                return;
            }

            final long requestTime = Server.readLong(buffer, offset + Request.TIME_OFFSET);
            final long expiry = requestTime + Server.this.acceptedTolerance;
            if (System.currentTimeMillis() > expiry) {
                final byte[] received = Arrays.copyOfRange(buffer, offset, offset + size);
                logger.warn("Expired Request Received: {} from {}", (Object)received, (Object)this.key.channel());
                return;
            }

            try {
                final boolean echo = Command.getCommand(buffer[offset]) == Command.ECHO;
                if (echo) {
                    this.write(buffer, offset, size);
                    return;
                }
            }
            catch (IllegalArgumentException ex) {
                final byte[] received = Arrays.copyOfRange(buffer, offset, offset + size);
                logger.warn("Invalid Request Received: {} from {}!", received, this.key.channel(), ex);
                return;
            }

            final Request decoded = Request.getRequest(buffer, offset, this);
            synchronized (Server.this.requests) {
                Server.this.requests.add(decoded);
            }
        }

        /**
         * Queues a {@link Command#DISCONNECTED} request for this client, then delegates to
         * the superclass.
         */
        public void disconnected() {
            // One command byte followed by eight zero bytes for the address.
            final byte[] encoded = new byte[9];
            encoded[0] = Command.DISCONNECTED.getOrdinal();
            synchronized (Server.this.requests) {
                Server.this.requests.add(Request.getRequest(encoded, 0, this));
            }
            super.disconnected();
        }

        /**
         * Writes an event together with its address, copying the event bytes only when the
         * slice does not cover its whole backing array.
         *
         * @param address address bytes passed through to the underlying write
         * @param event the event to send
         * @return the result of the underlying {@code write(byte[], byte[])} call
         */
        public boolean write(byte[] address, Slice event) {
            final boolean coversWholeBuffer = event.offset == 0 && event.length == event.buffer.length;
            return this.write(address, coversWholeBuffer ? event.buffer : event.toByteArray());
        }
    }

    public static final class Command
    extends Enum<Command> {
        public static final /* enum */ Command ECHO = new Command(0);
        public static final /* enum */ Command SEEK = new Command(1);
        public static final /* enum */ Command COMMITTED = new Command(2);
        public static final /* enum */ Command CHECKPOINTED = new Command(3);
        public static final /* enum */ Command CONNECTED = new Command(4);
        public static final /* enum */ Command DISCONNECTED = new Command(5);
        public static final /* enum */ Command WINDOWED = new Command(6);
        public static final /* enum */ Command SERVER_ERROR = new Command(7);
        private final byte ord;
        private static final /* synthetic */ Command[] $VALUES;

        public static Command[] values() {
            return (Command[])$VALUES.clone();
        }

        public static Command valueOf(String name) {
            return Enum.valueOf(Command.class, name);
        }

        private Command(byte b) {
            this.ord = b;
        }

        public byte getOrdinal() {
            return this.ord;
        }

        public static Command getCommand(byte b) {
            Command c;
            switch (b) {
                case 0: {
                    c = ECHO;
                    break;
                }
                case 1: {
                    c = SEEK;
                    break;
                }
                case 2: {
                    c = COMMITTED;
                    break;
                }
                case 3: {
                    c = CHECKPOINTED;
                    break;
                }
                case 4: {
                    c = CONNECTED;
                    break;
                }
                case 5: {
                    c = DISCONNECTED;
                    break;
                }
                case 6: {
                    c = WINDOWED;
                    break;
                }
                case 7: {
                    c = SERVER_ERROR;
                    break;
                }
                default: {
                    throw new IllegalArgumentException(String.format("No Command defined for ordinal %b", b));
                }
            }
            assert (b == c.ord);
            return c;
        }

        static {
            $VALUES = new Command[]{ECHO, SEEK, COMMITTED, CHECKPOINTED, CONNECTED, DISCONNECTED, WINDOWED, SERVER_ERROR};
        }
    }
}

