/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.flume.operator;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.netlet.util.Slice;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.flume.discovery.Discovery;
import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery;
import org.apache.apex.malhar.flume.sink.Server;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractFlumeInputOperator<T>
implements InputOperator,
Operator.ActivationListener<Context.OperatorContext>,
Operator.IdleTimeHandler,
Operator.CheckpointListener,
Partitioner<AbstractFlumeInputOperator<T>> {
    public final transient DefaultOutputPort<T> output = new DefaultOutputPort();
    public final transient DefaultOutputPort<Slice> drop = new DefaultOutputPort();
    @NotNull
    private String[] connectionSpecs;
    @NotNull
    private StreamCodec<Event> codec;
    private final ArrayList<RecoveryAddress> recoveryAddresses;
    private transient ArrayBlockingQueue<Slice> handoverBuffer = new ArrayBlockingQueue(5120);
    private transient int idleCounter;
    private transient int eventCounter;
    private transient DefaultEventLoop eventloop;
    private volatile transient boolean connected;
    private transient Context.OperatorContext context;
    private transient Client client;
    private transient long windowId;
    private transient byte[] address;
    @Min(value=0L)
    private long maxEventsPerSecond;
    private transient long maxEventsPerWindow;
    private static final transient ThreadLocal<HashMap<Integer, ConnectionStatus>> partitionedInstanceStatus = new ThreadLocal<HashMap<Integer, ConnectionStatus>>(){

        @Override
        protected HashMap<Integer, ConnectionStatus> initialValue() {
            return new HashMap<Integer, ConnectionStatus>();
        }
    };
    private static final transient ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>> abandonedRecoveryAddresses = new ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>>(){

        @Override
        protected HashMap<String, ArrayList<RecoveryAddress>> initialValue() {
            return new HashMap<String, ArrayList<RecoveryAddress>>();
        }
    };
    protected static final transient ThreadLocal<Collection<Discovery.Service<byte[]>>> discoveredFlumeSinks = new ThreadLocal();
    private static final Logger logger = LoggerFactory.getLogger(AbstractFlumeInputOperator.class);

    public AbstractFlumeInputOperator() {
        this.connectionSpecs = new String[0];
        this.recoveryAddresses = new ArrayList();
        this.maxEventsPerSecond = Long.MAX_VALUE;
    }

    public void setup(Context.OperatorContext context) {
        long windowDurationMillis = (Integer)context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * (Integer)context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
        this.maxEventsPerWindow = (long)((double)windowDurationMillis / 1000.0 * (double)this.maxEventsPerSecond);
        logger.debug("max-events per-second {} per-window {}", (Object)this.maxEventsPerSecond, (Object)this.maxEventsPerWindow);
        try {
            this.eventloop = new DefaultEventLoop("EventLoop-" + context.getId());
            this.eventloop.start();
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    public void activate(Context.OperatorContext ctx) {
        if (this.connectionSpecs.length == 0) {
            logger.info("Discovered zero FlumeSink");
        } else if (this.connectionSpecs.length == 1) {
            for (String connectAddresse : this.connectionSpecs) {
                logger.debug("Connection spec is {}", (Object)connectAddresse);
                String[] parts = connectAddresse.split(":");
                this.client = new Client(parts[0]);
                this.eventloop.connect(new InetSocketAddress(parts[1], Integer.parseInt(parts[2])), (Listener.ClientListener)this.client);
            }
        } else {
            throw new IllegalArgumentException(String.format("A physical %s operator cannot connect to more than 1 addresses!", this.getClass().getSimpleName()));
        }
        this.context = ctx;
    }

    public void beginWindow(long windowId) {
        this.windowId = windowId;
        this.idleCounter = 0;
        this.eventCounter = 0;
    }

    public void emitTuples() {
        int i = this.handoverBuffer.size();
        if (i > 0 && (long)this.eventCounter < this.maxEventsPerWindow) {
            T convert;
            Slice slice;
            while (--i > 0 && (long)this.eventCounter < this.maxEventsPerWindow - 1L) {
                slice = this.handoverBuffer.poll();
                slice.offset += 8;
                slice.length -= 8;
                convert = this.convert((Event)this.codec.fromByteArray(slice));
                if (convert == null) {
                    this.drop.emit((Object)slice);
                } else {
                    this.output.emit(convert);
                }
                ++this.eventCounter;
            }
            slice = this.handoverBuffer.poll();
            slice.offset += 8;
            slice.length -= 8;
            convert = this.convert((Event)this.codec.fromByteArray(slice));
            if (convert == null) {
                this.drop.emit((Object)slice);
            } else {
                this.output.emit(convert);
            }
            ++this.eventCounter;
            this.address = Arrays.copyOfRange(slice.buffer, slice.offset - 8, slice.offset);
        }
    }

    public void endWindow() {
        if (this.connected) {
            byte[] array = new byte[17];
            array[0] = Server.Command.WINDOWED.getOrdinal();
            Server.writeInt(array, 1, this.eventCounter);
            Server.writeInt(array, 5, this.idleCounter);
            Server.writeLong(array, 9, System.currentTimeMillis());
            logger.debug("wrote {} with eventCounter = {} and idleCounter = {}", new Object[]{Server.Command.WINDOWED, this.eventCounter, this.idleCounter});
            this.client.write(array);
        }
        if (this.address != null) {
            RecoveryAddress rAddress = new RecoveryAddress();
            rAddress.address = this.address;
            this.address = null;
            rAddress.windowId = this.windowId;
            this.recoveryAddresses.add(rAddress);
        }
    }

    public void deactivate() {
        if (this.connected) {
            this.eventloop.disconnect((Listener.ClientListener)this.client);
        }
        this.context = null;
    }

    public void teardown() {
        this.eventloop.stop();
        this.eventloop = null;
    }

    public void handleIdleTime() {
        ++this.idleCounter;
        try {
            Thread.sleep(((Integer)this.context.getValue(Context.OperatorContext.SPIN_MILLIS)).intValue());
        }
        catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    public abstract T convert(Event var1);

    public String[] getConnectAddresses() {
        return (String[])this.connectionSpecs.clone();
    }

    public void setConnectAddresses(String[] specs) {
        this.connectionSpecs = (String[])specs.clone();
    }

    public StreamCodec<Event> getCodec() {
        return this.codec;
    }

    public void setCodec(StreamCodec<Event> codec) {
        this.codec = codec;
    }

    public void checkpointed(long windowId) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void committed(long windowId) {
        if (!this.connected) {
            return;
        }
        ArrayList<RecoveryAddress> arrayList = this.recoveryAddresses;
        synchronized (arrayList) {
            RecoveryAddress ra;
            byte[] addr = null;
            Iterator<RecoveryAddress> iterator = this.recoveryAddresses.iterator();
            while (iterator.hasNext()) {
                ra = iterator.next();
                if (ra.windowId > windowId) break;
                iterator.remove();
                if (ra.address == null) continue;
                addr = ra.address;
            }
            if (addr != null) {
                if (this.recoveryAddresses.isEmpty()) {
                    ra = new RecoveryAddress();
                    ra.address = addr;
                    this.recoveryAddresses.add(ra);
                }
                int arraySize = 17;
                byte[] array = new byte[arraySize];
                array[0] = Server.Command.COMMITTED.getOrdinal();
                System.arraycopy(addr, 0, array, 1, 8);
                Server.writeLong(array, 9, System.currentTimeMillis());
                logger.debug("wrote {} with recoveryOffset = {}", (Object)Server.Command.COMMITTED, (Object)Arrays.toString(addr));
                this.client.write(array);
            }
        }
    }

    public Collection<Partitioner.Partition<AbstractFlumeInputOperator<T>>> definePartitions(Collection<Partitioner.Partition<AbstractFlumeInputOperator<T>>> partitions, Partitioner.PartitioningContext context) {
        Collection<Discovery.Service<byte[]>> discovered = discoveredFlumeSinks.get();
        if (discovered == null) {
            return partitions;
        }
        HashMap<String, ArrayList<RecoveryAddress>> allRecoveryAddresses = abandonedRecoveryAddresses.get();
        ArrayList<String> allConnectAddresses = new ArrayList<String>(partitions.size());
        for (Partitioner.Partition<AbstractFlumeInputOperator<T>> partition : partitions) {
            String[] lAddresses = ((AbstractFlumeInputOperator)partition.getPartitionedInstance()).connectionSpecs;
            allConnectAddresses.addAll(Arrays.asList(lAddresses));
            int i = lAddresses.length;
            while (i-- > 0) {
                String[] parts = lAddresses[i].split(":", 2);
                allRecoveryAddresses.put(parts[0], ((AbstractFlumeInputOperator)partition.getPartitionedInstance()).recoveryAddresses);
            }
        }
        HashMap<String, String> connections = new HashMap<String, String>(discovered.size());
        for (Discovery.Service<byte[]> service : discovered) {
            String previousSpec = (String)connections.get(service.getId());
            String newspec = service.getId() + ':' + service.getHost() + ':' + service.getPort();
            if (previousSpec == null) {
                connections.put(service.getId(), newspec);
                continue;
            }
            boolean found = false;
            for (ConnectionStatus cs : partitionedInstanceStatus.get().values()) {
                if (!previousSpec.equals(cs.spec) || cs.connected) continue;
                connections.put(service.getId(), newspec);
                found = true;
                break;
            }
            if (found) continue;
            logger.warn("2 sinks found with the same id: {} and {}... Ignoring previous.", (Object)previousSpec, (Object)newspec);
            connections.put(service.getId(), newspec);
        }
        int i = allConnectAddresses.size();
        while (i-- > 0) {
            String[] parts = ((String)allConnectAddresses.get(i)).split(":");
            String connection = (String)connections.remove(parts[0]);
            if (connection == null) {
                allConnectAddresses.remove(i);
                continue;
            }
            allConnectAddresses.set(i, connection);
        }
        allConnectAddresses.addAll(connections.values());
        partitions.clear();
        try {
            if (allConnectAddresses.isEmpty()) {
                AbstractFlumeInputOperator operator = (AbstractFlumeInputOperator)this.getClass().newInstance();
                operator.setCodec(this.codec);
                operator.setMaxEventsPerSecond(this.maxEventsPerSecond);
                for (ArrayList<RecoveryAddress> lRecoveryAddresses : allRecoveryAddresses.values()) {
                    operator.recoveryAddresses.addAll(lRecoveryAddresses);
                }
                operator.connectionSpecs = new String[allConnectAddresses.size()];
                int i2 = this.connectionSpecs.length;
                while (i2-- > 0) {
                    this.connectionSpecs[i2] = (String)allConnectAddresses.get(i2);
                }
                partitions.add((Partitioner.Partition<AbstractFlumeInputOperator<T>>)new DefaultPartition((Object)operator));
            } else {
                long maxEventsPerSecondPerOperator = this.maxEventsPerSecond / (long)allConnectAddresses.size();
                int i3 = allConnectAddresses.size();
                while (i3-- > 0) {
                    AbstractFlumeInputOperator operator = (AbstractFlumeInputOperator)this.getClass().newInstance();
                    operator.setCodec(this.codec);
                    operator.setMaxEventsPerSecond(maxEventsPerSecondPerOperator);
                    String connectAddress = (String)allConnectAddresses.get(i3);
                    operator.connectionSpecs = new String[]{connectAddress};
                    String[] parts = connectAddress.split(":", 2);
                    ArrayList<RecoveryAddress> remove = allRecoveryAddresses.remove(parts[0]);
                    if (remove != null) {
                        operator.recoveryAddresses.addAll(remove);
                    }
                    partitions.add((Partitioner.Partition<AbstractFlumeInputOperator<T>>)new DefaultPartition((Object)operator));
                }
            }
        }
        catch (IllegalAccessException ex) {
            throw new RuntimeException(ex);
        }
        catch (InstantiationException ex) {
            throw new RuntimeException(ex);
        }
        logger.debug("Requesting partitions: {}", partitions);
        return partitions;
    }

    public void partitioned(Map<Integer, Partitioner.Partition<AbstractFlumeInputOperator<T>>> partitions) {
        logger.debug("Partitioned Map: {}", partitions);
        HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get();
        map.clear();
        for (Map.Entry<Integer, Partitioner.Partition<AbstractFlumeInputOperator<T>>> entry : partitions.entrySet()) {
            if (map.containsKey(entry.getKey())) continue;
            map.put(entry.getKey(), null);
        }
    }

    public String toString() {
        return "AbstractFlumeInputOperator{connected=" + this.connected + ", connectionSpecs=" + (this.connectionSpecs.length == 0 ? "empty" : this.connectionSpecs[0]) + ", recoveryAddresses=" + this.recoveryAddresses + '}';
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AbstractFlumeInputOperator)) {
            return false;
        }
        AbstractFlumeInputOperator that = (AbstractFlumeInputOperator)o;
        if (!Arrays.equals(this.connectionSpecs, that.connectionSpecs)) {
            return false;
        }
        return this.recoveryAddresses.equals(that.recoveryAddresses);
    }

    public int hashCode() {
        int result = this.connectionSpecs != null ? Arrays.hashCode(this.connectionSpecs) : 0;
        result = 31 * result + this.recoveryAddresses.hashCode();
        return result;
    }

    public void setMaxEventsPerSecond(long maxEventsPerSecond) {
        this.maxEventsPerSecond = maxEventsPerSecond;
    }

    public long getMaxEventsPerSecond() {
        return this.maxEventsPerSecond;
    }

    public static class ConnectionStatus
    implements Serializable {
        int id;
        String spec;
        boolean connected;
        private static final long serialVersionUID = 201312261615L;

        public int hashCode() {
            return this.spec.hashCode();
        }

        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            ConnectionStatus other = (ConnectionStatus)obj;
            return this.spec == null ? other.spec == null : this.spec.equals(other.spec);
        }

        public String toString() {
            return "ConnectionStatus{id=" + this.id + ", spec=" + this.spec + ", connected=" + this.connected + '}';
        }
    }

    public static class ZKStatsListner
    extends ZKAssistedDiscovery
    implements StatsListener,
    Serializable {
        long intervalMillis = 60000L;
        private final StatsListener.Response response = new StatsListener.Response();
        private transient long nextMillis;
        private static final long serialVersionUID = 201312241646L;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats) {
            HashMap map = (HashMap)partitionedInstanceStatus.get();
            this.response.repartitionRequired = false;
            Object lastStat = null;
            List lastWindowedStats = stats.getLastWindowedStats();
            for (Stats.OperatorStats os : lastWindowedStats) {
                if (os.counters == null) continue;
                lastStat = os.counters;
                logger.debug("Received custom stats = {}", lastStat);
            }
            if (lastStat instanceof ConnectionStatus) {
                ConnectionStatus cs = (ConnectionStatus)lastStat;
                map.put(stats.getOperatorId(), cs);
                if (!cs.connected) {
                    logger.debug("setting repatitioned = true because of lastStat = {}", lastStat);
                    this.response.repartitionRequired = true;
                }
            }
            if (System.currentTimeMillis() < this.nextMillis) return this.response;
            logger.debug("nextMillis = {}", (Object)this.nextMillis);
            try {
                Collection<Discovery.Service<byte[]>> addresses;
                super.setup(null);
                try {
                    addresses = this.discover();
                }
                finally {
                    super.teardown();
                }
                discoveredFlumeSinks.set(addresses);
                logger.debug("\ncurrent map = {}\ndiscovered sinks = {}", (Object)map, addresses);
                switch (addresses.size()) {
                    case 0: {
                        this.response.repartitionRequired = map.size() != 1;
                        return this.response;
                    }
                    default: {
                        if (addresses.size() == map.size()) {
                            for (ConnectionStatus value : map.values()) {
                                if (value != null && value.connected) continue;
                                this.response.repartitionRequired = true;
                                return this.response;
                            }
                            return this.response;
                        }
                        this.response.repartitionRequired = true;
                        return this.response;
                    }
                }
            }
            catch (Error er) {
                throw er;
            }
            catch (Throwable cause) {
                logger.warn("Unable to discover services, using values from last successful discovery", cause);
                return this.response;
            }
            finally {
                this.nextMillis = System.currentTimeMillis() + this.intervalMillis;
                logger.debug("Proposed NextMillis = {}", (Object)this.nextMillis);
            }
        }

        public long getIntervalMillis() {
            return this.intervalMillis;
        }

        public void setIntervalMillis(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }
    }

    class Client
    extends AbstractLengthPrependerClient {
        private final String id;

        Client(String id) {
            this.id = id;
        }

        public void onMessage(byte[] buffer, int offset, int size) {
            try {
                AbstractFlumeInputOperator.this.handoverBuffer.put(new Slice(buffer, offset, size));
            }
            catch (InterruptedException ex) {
                this.handleException(ex, (EventLoop)AbstractFlumeInputOperator.this.eventloop);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void connected() {
            Context.OperatorContext ctx;
            byte[] address;
            super.connected();
            ArrayList arrayList = AbstractFlumeInputOperator.this.recoveryAddresses;
            synchronized (arrayList) {
                address = AbstractFlumeInputOperator.this.recoveryAddresses.size() > 0 ? ((RecoveryAddress)((AbstractFlumeInputOperator)AbstractFlumeInputOperator.this).recoveryAddresses.get((int)(((AbstractFlumeInputOperator)AbstractFlumeInputOperator.this).recoveryAddresses.size() - 1))).address : new byte[8];
            }
            int len = 17;
            byte[] array = new byte[len];
            array[0] = Server.Command.SEEK.getOrdinal();
            System.arraycopy(address, 0, array, 1, 8);
            Server.writeLong(array, 9, System.currentTimeMillis());
            this.write(array);
            AbstractFlumeInputOperator.this.connected = true;
            ConnectionStatus connectionStatus = new ConnectionStatus();
            connectionStatus.connected = true;
            connectionStatus.spec = AbstractFlumeInputOperator.this.connectionSpecs[0];
            Context.OperatorContext operatorContext = ctx = AbstractFlumeInputOperator.this.context;
            synchronized (operatorContext) {
                logger.debug("{} Submitting ConnectionStatus = {}", (Object)AbstractFlumeInputOperator.this, (Object)connectionStatus);
                AbstractFlumeInputOperator.this.context.setCounters((Object)connectionStatus);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void disconnected() {
            Context.OperatorContext ctx;
            AbstractFlumeInputOperator.this.connected = false;
            ConnectionStatus connectionStatus = new ConnectionStatus();
            connectionStatus.connected = false;
            connectionStatus.spec = AbstractFlumeInputOperator.this.connectionSpecs[0];
            Context.OperatorContext operatorContext = ctx = AbstractFlumeInputOperator.this.context;
            synchronized (operatorContext) {
                logger.debug("{} Submitting ConnectionStatus = {}", (Object)AbstractFlumeInputOperator.this, (Object)connectionStatus);
                AbstractFlumeInputOperator.this.context.setCounters((Object)connectionStatus);
            }
            super.disconnected();
        }
    }

    private static class RecoveryAddress
    implements Serializable {
        long windowId;
        byte[] address;
        private static final long serialVersionUID = 201312021432L;

        private RecoveryAddress() {
        }

        public String toString() {
            return "RecoveryAddress{windowId=" + this.windowId + ", address=" + Arrays.toString(this.address) + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof RecoveryAddress)) {
                return false;
            }
            RecoveryAddress that = (RecoveryAddress)o;
            if (this.windowId != that.windowId) {
                return false;
            }
            return Arrays.equals(this.address, that.address);
        }

        public int hashCode() {
            int result = (int)(this.windowId ^ this.windowId >>> 32);
            result = 31 * result + (this.address != null ? Arrays.hashCode(this.address) : 0);
            return result;
        }
    }
}

