/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.flume.sink;

import com.datatorrent.api.Component;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.netlet.NetletThrowable;
import com.datatorrent.netlet.util.Slice;
import java.io.IOError;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.ServiceConfigurationError;
import org.apache.apex.malhar.flume.discovery.Discovery;
import org.apache.apex.malhar.flume.sink.Server;
import org.apache.apex.malhar.flume.storage.EventCodec;
import org.apache.apex.malhar.flume.storage.Storage;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlumeSink
extends AbstractSink
implements Configurable {
    /** Context key under which the listen host name is configured. */
    private static final String HOSTNAME_STRING = "hostname";
    /** Host name used when the context does not provide one (was misspelled "locahost"). */
    private static final String HOSTNAME_DEFAULT = "localhost";
    /** Default for the "acceptedTolerance" setting; the value is handed to the Server in start(). */
    private static final long ACCEPTED_TOLERANCE = 20000L;
    /** Event loop created in start() and stopped in stop(); it hosts the server and client connections. */
    private DefaultEventLoop eventloop;
    /** Server created in start(); its request list is drained at the beginning of every process() call. */
    private Server server;
    /** Incremented for every record written to the client, reduced by WINDOWED requests, reset on disconnect. */
    private int outstandingEventsCount;
    /** Event count reported by the most recent WINDOWED request. */
    private int lastConsumedEventsCount;
    /** Idle count reported by the most recent WINDOWED request. */
    private int idleCount;
    /** Next record retrieved from storage that still has to be replayed to the client, or null. */
    private byte[] playback;
    /** Client that issued the last SEEK request; null while no client is attached. */
    private Server.Client client;
    /** Host name passed to the event loop when the server is started. */
    private String hostname;
    /** Port passed to the event loop when the server is started. */
    private int port;
    /** Identifier used for the event loop name and the server; falls back to the sink name in configure(). */
    private String id;
    /** Value of the "acceptedTolerance" setting, passed to the Server constructor. */
    private long acceptedTolerance;
    /** Pause in milliseconds used by sleep(). */
    private long sleepMillis;
    /** "throughputAdjustmentPercent" divided by 100; scales the batch size up or down in process(). */
    private double throughputAdjustmentFactor;
    /** Batch size used when the computed batch size is not positive. */
    private int minimumEventsPerTransaction;
    /** Upper bound for the batch size of a single transaction. */
    private int maximumEventsPerTransaction;
    /** process() backs off once this many milliseconds have passed since the last SEEK or COMMITTED request. */
    private long commitEventTimeoutMillis;
    /** Wall-clock time of the last SEEK or COMMITTED request. */
    private transient long lastCommitEventTimeMillis;
    /** Storage used to persist, replay and clean events; replaced by a no-op implementation if not configured. */
    private Storage storage;
    /** Discovery agent handed to the Server; replaced by a logging-only implementation if not configured. */
    Discovery<byte[]> discovery;
    /** Codec that serializes Flume events before they are stored and written to the client. */
    StreamCodec<Event> codec;
    private static final Logger logger = LoggerFactory.getLogger(FlumeSink.class);

    /**
     * Handles the requests queued by the server, then forwards at most one batch of events
     * to the attached client.
     *
     * <p>The batch is made of records replayed from storage (when a SEEK request left a
     * playback position) followed by events taken from the Flume channel inside a single
     * channel transaction. The transaction is committed when the whole batch was handed over,
     * rolled back when anything fails, and closed in every case.
     *
     * @return {@code BACKOFF} when no client is attached, when the client has not sent a SEEK or
     *         COMMITTED request within {@code commitEventTimeoutMillis}, or when replaying or
     *         forwarding failed; {@code READY} otherwise
     * @throws EventDeliveryException declared by the sink contract; not thrown by this implementation
     * @throws IOError when the server queued a SERVER_ERROR request
     */
    public Sink.Status process() throws EventDeliveryException {
        this.drainRequests();

        if (this.client == null) {
            logger.info("No client expressed interest yet to consume the events.");
            return Sink.Status.BACKOFF;
        }

        long sinceLastCommitMillis = System.currentTimeMillis() - this.lastCommitEventTimeMillis;
        if (sinceLastCommitMillis > this.commitEventTimeoutMillis) {
            logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.", (Object)sinceLastCommitMillis);
            return Sink.Status.BACKOFF;
        }

        int maxTuples = this.computeMaxTuples();
        if (maxTuples <= 0) {
            return Sink.Status.READY;
        }

        if (this.playback != null) {
            try {
                int replayed = 0;
                do {
                    if (!this.client.write(this.playback)) {
                        this.retryWrite(this.playback, null);
                    }
                    ++this.outstandingEventsCount;
                    this.playback = this.storage.retrieveNext();
                } while (++replayed < maxTuples && this.playback != null);
            }
            catch (Exception ex) {
                logger.warn("Playback Failed", (Throwable)ex);
                if (ex instanceof NetletThrowable) {
                    this.dropClient();
                }
                return Sink.Status.BACKOFF;
            }
        }

        int storedTuples = 0;
        Transaction t = this.getChannel().getTransaction();
        try {
            t.begin();
            Event e;
            while (storedTuples < maxTuples && (e = this.getChannel().take()) != null) {
                Slice event = this.codec.toByteArray(e);
                byte[] address = this.storage.store(event);
                if (address != null) {
                    if (!this.client.write(address, event)) {
                        this.retryWrite(address, event);
                    }
                    ++this.outstandingEventsCount;
                } else {
                    logger.debug("Detected the condition of recovery from flume crash!");
                }
                ++storedTuples;
            }
            if (storedTuples > 0) {
                this.storage.flush();
            }
            t.commit();
            if (storedTuples > 0) {
                logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}", new Object[]{maxTuples, storedTuples, this.outstandingEventsCount});
            }
        }
        catch (Error er) {
            // Give the events back to the channel before letting the error escape.
            t.rollback();
            throw er;
        }
        catch (Exception ex) {
            logger.error("Transaction Failed", (Throwable)ex);
            // The client may already have been cleared by a failed write path; only drop it once.
            if (ex instanceof NetletThrowable && this.client != null) {
                this.dropClient();
            }
            t.rollback();
            return Sink.Status.BACKOFF;
        }
        finally {
            t.close();
        }

        if (storedTuples == 0) {
            // Nothing was available in the channel; pause briefly instead of spinning.
            this.sleep();
        }
        return Sink.Status.READY;
    }

    /**
     * Applies every request queued by the server and clears the queue, all while holding the
     * queue's monitor.
     *
     * <p>A SERVER_ERROR request throws before the queue is cleared, so the queued requests are
     * still present on the next call.
     */
    private void drainRequests() {
        ArrayList<Server.Request> pending = this.server.requests;
        synchronized (pending) {
            for (Server.Request r : pending) {
                logger.debug("found {}", (Object)r);
                switch (r.type) {
                    case SEEK: {
                        this.lastCommitEventTimeMillis = System.currentTimeMillis();
                        Slice slice = r.getAddress();
                        this.playback = this.storage.retrieve(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
                        this.client = r.client;
                        break;
                    }
                    case COMMITTED: {
                        this.lastCommitEventTimeMillis = System.currentTimeMillis();
                        Slice slice = r.getAddress();
                        this.storage.clean(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
                        break;
                    }
                    case CONNECTED: {
                        logger.debug("Connected received, ignoring it!");
                        break;
                    }
                    case DISCONNECTED: {
                        // Only the currently attached client detaches the sink.
                        if (r.client == this.client) {
                            this.client = null;
                            this.outstandingEventsCount = 0;
                        }
                        break;
                    }
                    case WINDOWED: {
                        this.lastConsumedEventsCount = r.getEventCount();
                        this.idleCount = r.getIdleCount();
                        this.outstandingEventsCount -= this.lastConsumedEventsCount;
                        break;
                    }
                    case SERVER_ERROR: {
                        throw new IOError(null);
                    }
                    default: {
                        logger.debug("Cannot understand the request {}", (Object)r);
                        break;
                    }
                }
            }
            pending.clear();
        }
    }

    /**
     * Derives the batch size for this round from the last consumed count, the idle count and
     * the number of outstanding events, then bounds it by the configured minimum and maximum.
     *
     * @return the number of events to hand over in this round; not positive only when the
     *         configured minimum or maximum is not positive
     */
    private int computeMaxTuples() {
        int maxTuples;
        if (this.outstandingEventsCount < 0) {
            // The client consumed more than was outstanding: grow the batch.
            maxTuples = this.idleCount > 1
                    ? (int)((1.0 + this.throughputAdjustmentFactor * (double)this.idleCount) * (double)this.lastConsumedEventsCount)
                    : (int)((1.0 + this.throughputAdjustmentFactor) * (double)this.lastConsumedEventsCount);
        } else if (this.outstandingEventsCount > this.lastConsumedEventsCount) {
            // The client is falling behind: shrink the batch.
            maxTuples = (int)((1.0 - this.throughputAdjustmentFactor) * (double)this.lastConsumedEventsCount);
        } else if (this.idleCount > 0) {
            maxTuples = (int)((1.0 + this.throughputAdjustmentFactor * (double)this.idleCount) * (double)this.lastConsumedEventsCount);
            if (maxTuples <= 0) {
                maxTuples = this.minimumEventsPerTransaction;
            }
        } else {
            maxTuples = this.lastConsumedEventsCount;
        }

        if (maxTuples >= this.maximumEventsPerTransaction) {
            maxTuples = this.maximumEventsPerTransaction;
        } else if (maxTuples <= 0) {
            maxTuples = this.minimumEventsPerTransaction;
        }
        return maxTuples;
    }

    /**
     * Disconnects the attached client from the event loop and forgets it, resetting the
     * outstanding event count even when the disconnect itself fails.
     */
    private void dropClient() {
        try {
            this.eventloop.disconnect((Listener.ClientListener)this.client);
        }
        finally {
            this.client = null;
            this.outstandingEventsCount = 0;
        }
    }

    /**
     * Pauses the calling thread for {@code sleepMillis} milliseconds.
     *
     * <p>An interruption ends the pause early and is recorded again on the thread instead of
     * being propagated.
     */
    private void sleep() {
        final long pauseMillis = this.sleepMillis;
        try {
            Thread.sleep(pauseMillis);
        }
        catch (InterruptedException interrupted) {
            // Keep the interrupt visible to whoever inspects the thread next.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sets up the storage, discovery and codec (those that are {@code Component}s, in that
     * order), creates the event loop and the server, starts both and finally starts the sink.
     *
     * @throws RuntimeException wrapping an {@code IOException} raised while creating the
     *         event loop or the server
     */
    public void start() {
        try {
            Object[] collaborators = new Object[]{this.storage, this.discovery, this.codec};
            for (Object collaborator : collaborators) {
                if (collaborator instanceof Component) {
                    ((Component)collaborator).setup(null);
                }
            }
            this.eventloop = new DefaultEventLoop("EventLoop-" + this.id);
            this.server = new Server(this.id, this.discovery, this.acceptedTolerance);
        }
        catch (IOException ioFailure) {
            // Errors and runtime exceptions propagate untouched; only the checked failure is wrapped.
            throw new RuntimeException(ioFailure);
        }

        this.eventloop.start();
        this.eventloop.start(this.hostname, this.port, (Listener.ServerListener)this.server);
        super.start();
    }

    /**
     * Stops the sink, then releases everything start() acquired: the attached client is
     * disconnected, the server and the event loop are stopped, and the codec, discovery and
     * storage (those that are {@code Component}s, in that order) are torn down.
     *
     * <p>The cleanup runs even when {@code super.stop()} throws.
     *
     * @throws ServiceConfigurationError wrapping anything thrown during the cleanup
     */
    public void stop() {
        try {
            super.stop();
        }
        finally {
            try {
                if (this.client != null) {
                    this.eventloop.disconnect((Listener.ClientListener)this.client);
                    this.client = null;
                }
                this.eventloop.stop((Listener.ServerListener)this.server);
                this.eventloop.stop();

                // Reverse of the setup order used in start().
                Object[] collaborators = new Object[]{this.codec, this.discovery, this.storage};
                for (Object collaborator : collaborators) {
                    if (collaborator instanceof Component) {
                        ((Component)collaborator).teardown();
                    }
                }
            }
            catch (Throwable cause) {
                throw new ServiceConfigurationError("Failed Stop", cause);
            }
        }
    }

    /**
     * Reads the sink settings from the Flume context and instantiates the discovery, storage
     * and codec implementations named there.
     *
     * <p>Missing settings fall back to defaults. A missing discovery is replaced by one that
     * only logs, a missing storage by one that stores nothing, and a missing codec by
     * {@code EventCodec}.
     *
     * @param context the Flume context holding this sink's properties
     */
    public void configure(Context context) {
        this.hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT);
        this.port = context.getInteger("port", 0);

        String configuredId = context.getString("id");
        this.id = configuredId != null ? configuredId : this.getName();

        this.acceptedTolerance = context.getLong("acceptedTolerance", 20000L);
        this.sleepMillis = context.getLong("sleepMillis", 5L);
        // Configured as a percentage, used as a fraction.
        int adjustmentPercent = context.getInteger("throughputAdjustmentPercent", 5);
        this.throughputAdjustmentFactor = adjustmentPercent / 100.0;
        this.maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000);
        this.minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100);
        this.commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE);

        Discovery configuredDiscovery = configure("discovery", Discovery.class, context);
        if (configuredDiscovery != null) {
            this.discovery = configuredDiscovery;
        } else {
            logger.warn("Discovery agent not configured for the sink!");
            this.discovery = new Discovery<byte[]>(){

                @Override
                public void unadvertise(Discovery.Service<byte[]> service) {
                    logger.debug("Sink {} stopped listening on {}:{}", new Object[]{service.getId(), service.getHost(), service.getPort()});
                }

                @Override
                public void advertise(Discovery.Service<byte[]> service) {
                    logger.debug("Sink {} started listening on {}:{}", new Object[]{service.getId(), service.getHost(), service.getPort()});
                }

                @Override
                public Collection<Discovery.Service<byte[]>> discover() {
                    return Collections.emptySet();
                }
            };
        }

        Storage configuredStorage = configure("storage", Storage.class, context);
        if (configuredStorage == null) {
            logger.warn("storage key missing... FlumeSink may lose data!");
            // Stand-in that persists nothing and therefore never has anything to replay.
            configuredStorage = new Storage(){

                @Override
                public byte[] store(Slice slice) {
                    return null;
                }

                @Override
                public byte[] retrieve(byte[] identifier) {
                    return null;
                }

                @Override
                public byte[] retrieveNext() {
                    return null;
                }

                @Override
                public void clean(byte[] identifier) {
                }

                @Override
                public void flush() {
                }
            };
        }
        this.storage = configuredStorage;

        StreamCodec configuredCodec = configure("codec", StreamCodec.class, context);
        StreamCodec chosenCodec = configuredCodec != null ? configuredCodec : new EventCodec();
        this.codec = chosenCodec;
    }

    /**
     * Instantiates the class named by {@code key} in the context and, when the instance is
     * {@code Configurable}, configures it with the sub-properties found under {@code key + '.'}.
     *
     * <p>If those sub-properties carry no "id", the "id" of the enclosing context is copied
     * into them before the instance is configured.
     *
     * @param key     context key holding the class name; also the prefix of the sub-properties
     * @param clazz   type the loaded class must be assignable to
     * @param context context to read the class name and the sub-properties from
     * @param <T>     the expected type
     * @return the new instance, or {@code null} when the context has no value for {@code key}
     * @throws Error when the named class is not assignable to {@code clazz}
     * @throws RuntimeException wrapping any checked failure while loading or instantiating the class
     */
    private static <T> T configure(String key, Class<T> clazz, Context context) {
        String classname = context.getString(key);
        if (classname == null) {
            return null;
        }
        try {
            Class<?> loadClass = Thread.currentThread().getContextClassLoader().loadClass(classname);
            if (!clazz.isAssignableFrom(loadClass)) {
                // Report the key and type actually requested, not always the storage.
                logger.error("{} class {} does not implement {} interface", new Object[]{key, classname, clazz.getCanonicalName()});
                throw new Error("Invalid " + key + " " + classname);
            }

            Object object = loadClass.newInstance();
            if (object instanceof Configurable) {
                Context context1 = new Context(context.getSubProperties(key + '.'));
                String id = context1.getString("id");
                if (id == null) {
                    id = context.getString("id");
                    logger.debug("{} inherited id={} from sink", (Object)key, (Object)id);
                    context1.put("id", id);
                }
                ((Configurable)object).configure(context1);
            }
            // Checked cast; cannot fail after the isAssignableFrom test above.
            return clazz.cast(object);
        }
        catch (Error | RuntimeException passthrough) {
            throw passthrough;
        }
        catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    /**
     * @return the host name that start() passes to the event loop together with the port
     */
    String getHostname() {
        return hostname;
    }

    /**
     * Replaces the host name that start() passes to the event loop.
     *
     * @param hostname the host name to use
     */
    void setHostname(String hostname) {
        this.hostname = hostname;
    }

    /**
     * @return the port that start() passes to the event loop together with the host name
     */
    int getPort() {
        return port;
    }

    /**
     * @return the accepted tolerance that start() hands to the {@code Server} constructor
     */
    public long getAcceptedTolerance() {
        return acceptedTolerance;
    }

    /**
     * Replaces the accepted tolerance that start() hands to the {@code Server} constructor.
     *
     * @param acceptedTolerance the tolerance value to use
     */
    public void setAcceptedTolerance(long acceptedTolerance) {
        this.acceptedTolerance = acceptedTolerance;
    }

    /**
     * Replaces the port that start() passes to the event loop.
     *
     * @param port the port to use
     */
    void setPort(int port) {
        this.port = port;
    }

    /**
     * @return the discovery agent that start() hands to the {@code Server} constructor
     */
    Discovery<byte[]> getDiscovery() {
        return discovery;
    }

    /**
     * Replaces the discovery agent that start() hands to the {@code Server} constructor.
     *
     * @param discovery the discovery agent to use
     */
    void setDiscovery(Discovery<byte[]> discovery) {
        this.discovery = discovery;
    }

    /**
     * Repeats a write the client did not accept, pausing via sleep() before every attempt,
     * for as long as the client stays connected.
     *
     * @param address the address bytes to write
     * @param event   the event payload written along with the address, or {@code null} to
     *                write the address alone
     * @throws IOException when the client is no longer connected before a write is accepted
     */
    private void retryWrite(byte[] address, Slice event) throws IOException {
        final boolean addressOnly = event == null;
        while (this.client.isConnected()) {
            this.sleep();
            boolean accepted = addressOnly
                    ? this.client.write(address)
                    : this.client.write(address, event);
            if (accepted) {
                return;
            }
        }
        throw new IOException("Client disconnected!");
    }
}

