/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.kafka;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.kafka.AbstractKafkaPartitioner;
import org.apache.apex.malhar.kafka.KafkaConsumerWrapper;
import org.apache.apex.malhar.kafka.KafkaMetrics;
import org.apache.apex.malhar.kafka.OneToManyPartitioner;
import org.apache.apex.malhar.kafka.OneToOnePartitioner;
import org.apache.apex.malhar.kafka.PartitionStrategy;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract input operator that pulls records from Kafka through a
 * {@link KafkaConsumerWrapper} and hands each record to
 * {@link #emitTuple(String, ConsumerRecord)}.
 *
 * <p>The operator tracks, per {@link AbstractKafkaPartitioner.PartitionMeta},
 * the offset following the last emitted record, snapshots that map at the end
 * of every window and, once a window is reported as committed, asks the
 * consumer wrapper to commit the matching snapshot. Partitioning and
 * stats-driven repartitioning are delegated to an
 * {@link AbstractKafkaPartitioner} selected by the configured
 * {@link PartitionStrategy}.
 */
@InterfaceStability.Evolving
public abstract class AbstractKafkaInputOperator
        implements InputOperator,
        Operator.ActivationListener<Context.OperatorContext>,
        Operator.CheckpointListener,
        Partitioner<AbstractKafkaInputOperator>,
        StatsListener,
        OffsetCommitCallback {
    private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);

    /** Cluster definitions, one array element per ';'-separated entry given to {@link #setClusters(String)}. */
    @NotNull
    private String[] clusters;

    /** Topic names, one array element per ','-separated entry given to {@link #setTopics(String)}. */
    @NotNull
    private String[] topics;

    /** For every partition seen so far: offset of the last emitted record plus one. */
    private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<AbstractKafkaPartitioner.PartitionMeta, Long>();

    private int initialPartitionCount = 1;

    /** Minimum time in milliseconds between two repartitions; a negative value disables the stats check. */
    private long repartitionInterval = 30000L;

    /** Minimum time in milliseconds between two stats checks; a negative value disables the stats check. */
    private long repartitionCheckInterval = 5000L;

    /** Upper bound on the number of tuples emitted in one window. */
    @Min(1)
    private int maxTuplesPerWindow = Integer.MAX_VALUE;

    private InitialOffset initialOffset = InitialOffset.APPLICATION_OR_LATEST;

    /** Passed to the {@link KafkaMetrics} constructor in {@link #setup}; presumably milliseconds - TODO confirm. */
    private long metricsRefreshInterval = 5000L;

    // NOTE(review): consumerTimeout and holdingBufferSize are only exposed through
    // accessors here; presumably they are read by KafkaConsumerWrapper - confirm.
    private long consumerTimeout = 5000L;
    private int holdingBufferSize = 1024;

    private Properties consumerProps;

    /** Partitions handed to this operator instance through {@link #assign(Set)}. */
    private Set<AbstractKafkaPartitioner.PartitionMeta> assignment;

    private final transient KafkaConsumerWrapper consumerWrapper = new KafkaConsumerWrapper();

    private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;

    /** Number of tuples emitted so far in the current window; reset in {@link #beginWindow(long)}. */
    private transient int emitCount = 0;

    /** One (window id, offset snapshot) pair per finished window, in the order the windows ended. */
    private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>>();

    private transient String applicationName;

    /** Lazily created by {@link #initPartitioner()}. */
    private transient AbstractKafkaPartitioner partitioner;

    private transient long currentWindowId;

    /** Wall-clock time (ms) at which the last stats check finished. */
    private transient long lastCheckTime = 0L;

    /** Wall-clock time (ms) at which {@link #partitioned(Map)} was last called. */
    private transient long lastRepartitionTime = 0L;

    @AutoMetric
    private transient KafkaMetrics metrics;

    /** Starts the consumer wrapper. */
    @Override
    public void activate(Context.OperatorContext context) {
        this.consumerWrapper.start();
    }

    /** Stops the consumer wrapper. */
    @Override
    public void deactivate() {
        this.consumerWrapper.stop();
    }

    /** No-op. */
    @Override
    public void checkpointed(long l) {
    }

    /**
     * Handles a committed window: drops every history entry whose window id is
     * not greater than {@code windowId} and, for the entry whose id equals
     * {@code windowId}, asks the consumer wrapper to commit its offsets.
     *
     * <p>Does nothing when the initial offset is {@link InitialOffset#LATEST}
     * or {@link InitialOffset#EARLIEST}.
     *
     * @param windowId id of the committed window
     */
    @Override
    public void committed(long windowId) {
        if (this.initialOffset == InitialOffset.LATEST || this.initialOffset == InitialOffset.EARLIEST) {
            return;
        }
        Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = this.offsetHistory.iterator();
        while (iter.hasNext()) {
            Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
            long historyWindowId = item.getLeft();
            if (historyWindowId > windowId) {
                // Newer than the committed window: keep it for a later commit.
                continue;
            }
            if (historyWindowId == windowId) {
                this.consumerWrapper.commitOffsets(item.getRight());
            }
            iter.remove();
        }
    }

    /**
     * Emits the messages currently reported by the consumer wrapper, limited
     * so that the per-window total does not exceed {@code maxTuplesPerWindow},
     * and records {@code offset + 1} for the partition of every emitted record.
     */
    @Override
    public void emitTuples() {
        int count = this.consumerWrapper.messageSize();
        if (this.maxTuplesPerWindow > 0) {
            count = Math.min(count, this.maxTuplesPerWindow - this.emitCount);
        }
        for (int i = 0; i < count; ++i) {
            Pair<String, ConsumerRecord<byte[], byte[]>> tuple = this.consumerWrapper.pollMessage();
            String cluster = tuple.getLeft();
            ConsumerRecord<byte[], byte[]> msg = tuple.getRight();
            this.emitTuple(cluster, msg);
            AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(cluster, msg.topic(), msg.partition());
            // Store the offset after the emitted record (presumably the next one to read - confirm).
            this.offsetTrack.put(pm, msg.offset() + 1L);
        }
        this.emitCount += count;
    }

    /**
     * Emits a single record downstream.
     *
     * @param var1 left element of the pair returned by the consumer wrapper;
     *             it is also used as the first argument when building the
     *             record's {@code PartitionMeta}
     * @param var2 the Kafka record to emit
     */
    protected abstract void emitTuple(String var1, ConsumerRecord<byte[], byte[]> var2);

    /**
     * Resets the per-window emit counter and remembers the window id.
     *
     * @param wid id of the window that begins
     */
    @Override
    public void beginWindow(long wid) {
        this.emitCount = 0;
        this.currentWindowId = wid;
    }

    /**
     * Appends a copy of the current offset map, keyed by the current window
     * id, to the offset history and refreshes the metrics.
     */
    @Override
    public void endWindow() {
        // Copy so that later emitTuples() calls do not alter the snapshot.
        Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsWithWindow = new HashMap<AbstractKafkaPartitioner.PartitionMeta, Long>(this.offsetTrack);
        this.offsetHistory.add(Pair.of(this.currentWindowId, offsetsWithWindow));
        this.metrics.updateMetrics(this.clusters, this.consumerWrapper.getAllConsumerMetrics());
    }

    /**
     * Reads the application name from the context, lets the consumer wrapper
     * create its consumers for this operator and creates the metrics object.
     */
    @Override
    public void setup(Context.OperatorContext context) {
        this.applicationName = context.getValue(Context.DAGContext.APPLICATION_NAME);
        this.consumerWrapper.create(this);
        this.metrics = new KafkaMetrics(this.metricsRefreshInterval);
    }

    /** No-op. */
    @Override
    public void teardown() {
    }

    /**
     * Creates the partitioner for the configured strategy if none exists yet.
     *
     * @throws UnsupportedOperationException for {@code ONE_TO_MANY_HEURISTIC}
     * @throws RuntimeException for any other unhandled strategy
     */
    private void initPartitioner() {
        if (this.partitioner != null) {
            return;
        }
        logger.info("Initialize Partitioner");
        switch (this.strategy) {
            case ONE_TO_ONE:
                this.partitioner = new OneToOnePartitioner(this.clusters, this.topics, this);
                break;
            case ONE_TO_MANY:
                this.partitioner = new OneToManyPartitioner(this.clusters, this.topics, this);
                break;
            case ONE_TO_MANY_HEURISTIC:
                throw new UnsupportedOperationException("Not implemented yet");
            default:
                throw new RuntimeException("Invalid strategy");
        }
        logger.info("Actual Partitioner is {}", this.partitioner.getClass());
    }

    /**
     * Delegates the stats check to the partitioner, rate-limited by the two
     * repartition intervals.
     *
     * <p>Returns a response with {@code repartitionRequired == false} without
     * consulting the partitioner when either interval is negative, when the
     * last check finished less than {@code repartitionCheckInterval} ms ago,
     * or when the last repartition happened less than
     * {@code repartitionInterval} ms ago.
     *
     * @param batchedOperatorStats stats forwarded to the partitioner
     * @return the partitioner's response, or a "no repartition" response
     */
    @Override
    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
        long t = System.currentTimeMillis();
        if (this.repartitionInterval < 0L || this.repartitionCheckInterval < 0L || t - this.lastCheckTime < this.repartitionCheckInterval || t - this.lastRepartitionTime < this.repartitionInterval) {
            StatsListener.Response response = new StatsListener.Response();
            response.repartitionRequired = false;
            return response;
        }
        try {
            logger.debug("Process stats");
            this.initPartitioner();
            return this.partitioner.processStats(batchedOperatorStats);
        } finally {
            // Record the time even if the partitioner throws, so a failing check is still rate-limited.
            this.lastCheckTime = System.currentTimeMillis();
        }
    }

    /** Delegates to the partitioner, creating it first if necessary. */
    @Override
    public Collection<Partitioner.Partition<AbstractKafkaInputOperator>> definePartitions(Collection<Partitioner.Partition<AbstractKafkaInputOperator>> collection, Partitioner.PartitioningContext partitioningContext) {
        logger.debug("Define partitions");
        this.initPartitioner();
        return this.partitioner.definePartitions(collection, partitioningContext);
    }

    /** Records the repartition time and delegates to the partitioner, creating it first if necessary. */
    @Override
    public void partitioned(Map<Integer, Partitioner.Partition<AbstractKafkaInputOperator>> map) {
        this.lastRepartitionTime = System.currentTimeMillis();
        this.initPartitioner();
        this.partitioner.partitioned(map);
    }

    /**
     * Offset-commit callback: logs the offsets at debug level and logs a
     * warning when an exception is reported.
     *
     * @param map offsets the callback refers to
     * @param e   the reported exception, or {@code null} if there is none
     */
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
        if (logger.isDebugEnabled()) {
            logger.debug("Commit offsets complete {} ", Joiner.on(';').withKeyValueSeparator("=").join(map));
        }
        if (e != null) {
            logger.warn("Exceptions in committing offsets {} : {} ", Joiner.on(';').withKeyValueSeparator("=").join(map), e);
        }
    }

    /** Stores the set of partitions this operator instance is responsible for. */
    public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment) {
        this.assignment = assignment;
    }

    /** @return the set previously passed to {@link #assign(Set)}; {@code null} if it was never called */
    public Set<AbstractKafkaPartitioner.PartitionMeta> assignment() {
        return this.assignment;
    }

    public void setInitialPartitionCount(int partitionCount) {
        this.initialPartitionCount = partitionCount;
    }

    public int getInitialPartitionCount() {
        return this.initialPartitionCount;
    }

    /**
     * Sets the clusters from a single string, split on ';'.
     *
     * @param clusters ';'-separated cluster definitions
     */
    public void setClusters(String clusters) {
        this.clusters = clusters.split(";");
    }

    /** @return the clusters joined with ';' */
    public String getClusters() {
        return Joiner.on(';').join(this.clusters);
    }

    /**
     * Sets the topics from a single string, split on ','; entries are trimmed
     * and empty entries are dropped.
     *
     * @param topics ','-separated topic names
     */
    public void setTopics(String topics) {
        Iterable<String> names = Splitter.on(',').trimResults().omitEmptyStrings().split(topics);
        this.topics = Iterables.toArray(names, String.class);
    }

    /** @return the topics joined with ", " */
    public String getTopics() {
        return Joiner.on(", ").join(this.topics);
    }

    /**
     * Sets the partition strategy by name, case-insensitively.
     *
     * @param policy name of a {@link PartitionStrategy} constant
     * @throws IllegalArgumentException if no constant has that name
     */
    public void setStrategy(String policy) {
        // Locale.ROOT keeps the conversion stable in locales with special casing rules (e.g. Turkish dotted I).
        this.strategy = PartitionStrategy.valueOf(policy.toUpperCase(Locale.ROOT));
    }

    public String getStrategy() {
        return this.strategy.name();
    }

    /**
     * Sets the initial offset mode by name, case-insensitively.
     *
     * @param initialOffset name of an {@link InitialOffset} constant
     * @throws IllegalArgumentException if no constant has that name
     */
    public void setInitialOffset(String initialOffset) {
        // Locale.ROOT keeps the conversion stable in locales with special casing rules (e.g. Turkish dotted I).
        this.initialOffset = InitialOffset.valueOf(initialOffset.toUpperCase(Locale.ROOT));
    }

    public String getInitialOffset() {
        return this.initialOffset.name();
    }

    /** @return the application name read from the operator context in {@link #setup} */
    public String getApplicationName() {
        return this.applicationName;
    }

    public void setConsumerProps(Properties consumerProps) {
        this.consumerProps = consumerProps;
    }

    public Properties getConsumerProps() {
        return this.consumerProps;
    }

    public void setMaxTuplesPerWindow(int maxTuplesPerWindow) {
        this.maxTuplesPerWindow = maxTuplesPerWindow;
    }

    public int getMaxTuplesPerWindow() {
        return this.maxTuplesPerWindow;
    }

    public long getConsumerTimeout() {
        return this.consumerTimeout;
    }

    public void setConsumerTimeout(long consumerTimeout) {
        this.consumerTimeout = consumerTimeout;
    }

    public int getHoldingBufferSize() {
        return this.holdingBufferSize;
    }

    public void setHoldingBufferSize(int holdingBufferSize) {
        this.holdingBufferSize = holdingBufferSize;
    }

    public long getMetricsRefreshInterval() {
        return this.metricsRefreshInterval;
    }

    public void setMetricsRefreshInterval(long metricsRefreshInterval) {
        this.metricsRefreshInterval = metricsRefreshInterval;
    }

    public void setRepartitionCheckInterval(long repartitionCheckInterval) {
        this.repartitionCheckInterval = repartitionCheckInterval;
    }

    public long getRepartitionCheckInterval() {
        return this.repartitionCheckInterval;
    }

    public void setRepartitionInterval(long repartitionInterval) {
        this.repartitionInterval = repartitionInterval;
    }

    public long getRepartitionInterval() {
        return this.repartitionInterval;
    }

    /**
     * @return the live offset map itself, not a copy; changes made by the
     *         caller are visible to this operator
     */
    public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack() {
        return this.offsetTrack;
    }

    /**
     * Where consumption starts. With {@link #EARLIEST} or {@link #LATEST},
     * {@link #committed(long)} does not commit offsets.
     * NOTE(review): the APPLICATION_OR_* variants presumably resume from
     * offsets stored for the application and fall back to earliest/latest -
     * confirm against KafkaConsumerWrapper.
     */
    public enum InitialOffset {
        EARLIEST,
        LATEST,
        APPLICATION_OR_EARLIEST,
        APPLICATION_OR_LATEST;

    }
}

