/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.kafka;

import com.google.common.base.Joiner;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.AbstractKafkaPartitioner;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wraps the Kafka consumers used by an {@link AbstractKafkaInputOperator}.
 *
 * <p>{@link #start()} creates one {@link KafkaConsumer} per cluster found in the owner
 * operator's assignment and submits one {@link ConsumerThread} per consumer to a cached
 * thread pool. Every consumer thread polls its consumer and pushes the received records,
 * paired with the cluster name, into a single bounded holding buffer that the operator
 * drains through {@link #pollMessage()}.
 *
 * <p>Offsets handed to {@link #commitOffsets(Map)} are not committed by the calling
 * thread; they are stored in a per-cluster map and committed asynchronously by the
 * consumer thread that owns the corresponding consumer.
 */
@InterfaceStability.Evolving
public class KafkaConsumerWrapper implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);

    /**
     * Loop condition of every consumer thread. Volatile because it is written by the
     * thread driving the lifecycle methods and read by the consumer threads.
     */
    private volatile boolean isAlive = false;

    /** Consumers keyed by cluster (the value used for "bootstrap.servers"). */
    private final Map<String, KafkaConsumer<byte[], byte[]>> consumers = new HashMap<String, KafkaConsumer<byte[], byte[]>>();

    /** Bounded hand-off queue between consumer threads and the operator; allocated in {@link #create}. */
    private ArrayBlockingQueue<Pair<String, ConsumerRecord<byte[], byte[]>>> holdingBuffer;

    private AbstractKafkaInputOperator ownerOperator = null;

    private ExecutorService kafkaConsumerExecutor;

    /** Pending offsets per cluster; each inner map is registered by that cluster's {@link ConsumerThread}. */
    private final Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit = new HashMap<String, Map<TopicPartition, OffsetAndMetadata>>();

    /**
     * Queues offsets so that the consumer thread of the matching cluster commits them on
     * one of its next loop iterations.
     *
     * <p>Entries whose cluster has no registered offset map (no consumer thread was created
     * for it) are skipped with a warning.
     *
     * @param offsetsInWindow offsets keyed by partition; {@code null} is ignored
     */
    public void commitOffsets(Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsInWindow) {
        if (offsetsInWindow == null) {
            return;
        }
        for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : offsetsInWindow.entrySet()) {
            String cluster = e.getKey().getCluster();
            Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMap = this.offsetsToCommit.get(cluster);
            if (topicPartitionOffsetMap == null) {
                logger.warn("committed offset map should be initialized by consumer thread!");
                continue;
            }
            topicPartitionOffsetMap.put(e.getKey().getTopicPartition(), new OffsetAndMetadata(e.getValue().longValue()));
        }
    }

    /**
     * Binds this wrapper to its owner operator and allocates the holding buffer with the
     * capacity reported by {@code ownerOperator.getHoldingBufferSize()}.
     *
     * @param ownerOperator operator that supplies configuration, assignment and the commit callback
     */
    public void create(AbstractKafkaInputOperator ownerOperator) {
        this.holdingBuffer = new ArrayBlockingQueue<Pair<String, ConsumerRecord<byte[], byte[]>>>(ownerOperator.getHoldingBufferSize());
        this.ownerOperator = ownerOperator;
        logger.info("Create consumer wrapper with holding buffer size: {} ", ownerOperator.getHoldingBufferSize());
        if (logger.isInfoEnabled()) {
            logger.info("Assignments are {} ", Joiner.on('\n').join(ownerOperator.assignment()));
        }
    }

    /**
     * Creates one consumer per assigned cluster, positions it at the tracked offsets (if
     * the owner operator has any) and starts a consumer thread for it.
     *
     * <p>Requires {@link #create(AbstractKafkaInputOperator)} to have been called first.
     */
    public void start() {
        this.isAlive = true;
        this.kafkaConsumerExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build());

        // Group the assigned partitions by cluster: one consumer is created per cluster.
        Map<String, List<TopicPartition>> consumerAssignment = new HashMap<String, List<TopicPartition>>();
        Set<AbstractKafkaPartitioner.PartitionMeta> assignments = this.ownerOperator.assignment();
        for (AbstractKafkaPartitioner.PartitionMeta partitionMeta : assignments) {
            String cluster = partitionMeta.getCluster();
            List<TopicPartition> clusterAssignment = consumerAssignment.get(cluster);
            if (clusterAssignment == null) {
                clusterAssignment = new LinkedList<TopicPartition>();
                consumerAssignment.put(cluster, clusterAssignment);
            }
            clusterAssignment.add(new TopicPartition(partitionMeta.getTopic(), partitionMeta.getPartitionId()));
        }

        Map<AbstractKafkaPartitioner.PartitionMeta, Long> currentOffset = this.ownerOperator.getOffsetTrack();
        for (Map.Entry<String, List<TopicPartition>> e : consumerAssignment.entrySet()) {
            String cluster = e.getKey();
            List<TopicPartition> partitions = e.getValue();

            // User supplied properties first, so the mandatory settings below always win.
            Properties prop = new Properties();
            if (this.ownerOperator.getConsumerProps() != null) {
                prop.putAll(this.ownerOperator.getConsumerProps());
            }
            prop.put("bootstrap.servers", cluster);
            // "none" makes poll() raise NoOffsetForPartitionException, which the consumer
            // thread resolves according to the operator's initial offset setting.
            prop.put("auto.offset.reset", "none");
            prop.put("enable.auto.commit", "false");
            prop.put("key.deserializer", ByteArrayDeserializer.class.getName());
            prop.put("value.deserializer", ByteArrayDeserializer.class.getName());
            AbstractKafkaInputOperator.InitialOffset initialOffset = AbstractKafkaInputOperator.InitialOffset.valueOf(this.ownerOperator.getInitialOffset());
            if (initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST || initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_LATEST) {
                // A group id is only set for the APPLICATION_OR_* modes.
                prop.put("group.id", this.ownerOperator.getApplicationName() + "_Consumer");
            }

            KafkaConsumer<byte[], byte[]> kc = new KafkaConsumer<byte[], byte[]>(prop);
            kc.assign(partitions);
            if (logger.isInfoEnabled()) {
                logger.info("Create consumer with properties {} ", Joiner.on(";").withKeyValueSeparator("=").join(prop));
                logger.info("Assign consumer to {}", Joiner.on('#').join(partitions));
            }

            // Resume from the offsets tracked by the operator, where available.
            if (currentOffset != null && !currentOffset.isEmpty()) {
                for (TopicPartition tp : partitions) {
                    AbstractKafkaPartitioner.PartitionMeta partitionKey = new AbstractKafkaPartitioner.PartitionMeta(cluster, tp.topic(), tp.partition());
                    if (!currentOffset.containsKey(partitionKey)) {
                        continue;
                    }
                    kc.seek(tp, currentOffset.get(partitionKey).longValue());
                }
            }

            this.consumers.put(cluster, kc);
            this.kafkaConsumerExecutor.submit(new ConsumerThread(cluster, kc, this));
        }
    }

    /**
     * Wakes up every consumer, shuts the thread pool down, marks the wrapper as not alive
     * and discards buffered records. Safe to call even if {@link #create} or
     * {@link #start()} was never invoked.
     */
    public void stop() {
        for (KafkaConsumer<byte[], byte[]> c : this.consumers.values()) {
            c.wakeup();
        }
        if (this.kafkaConsumerExecutor != null) {
            this.kafkaConsumerExecutor.shutdownNow();
        }
        this.isAlive = false;
        if (this.holdingBuffer != null) {
            this.holdingBuffer.clear();
        }
        IOUtils.closeQuietly((Closeable)this);
    }

    /**
     * Discards all buffered records. Does nothing if the holding buffer was never created.
     */
    public void teardown() {
        if (this.holdingBuffer != null) {
            this.holdingBuffer.clear();
        }
    }

    /**
     * @return the current value of the alive flag that keeps the consumer threads looping
     */
    public boolean isAlive() {
        return this.isAlive;
    }

    /**
     * @param isAlive new value of the alive flag; consumer threads leave their loop once they observe {@code false}
     */
    public void setAlive(boolean isAlive) {
        this.isAlive = isAlive;
    }

    /**
     * Removes the head of the holding buffer without blocking.
     *
     * @return the next (cluster, record) pair, or {@code null} if the buffer is empty
     */
    public Pair<String, ConsumerRecord<byte[], byte[]>> pollMessage() {
        return this.holdingBuffer.poll();
    }

    /**
     * @return number of records currently waiting in the holding buffer
     */
    public int messageSize() {
        return this.holdingBuffer.size();
    }

    /**
     * Appends a record to the holding buffer, blocking while the buffer is full.
     *
     * @param msg (cluster, record) pair to enqueue
     * @throws InterruptedException if the calling thread is interrupted while waiting for space
     */
    protected final void putMessage(Pair<String, ConsumerRecord<byte[], byte[]>> msg) throws InterruptedException {
        this.holdingBuffer.put(msg);
    }

    /**
     * No-op: the consumers are closed by their own consumer threads when those threads exit.
     */
    @Override
    public void close() throws IOException {
    }

    /**
     * Collects the metrics of every consumer.
     *
     * @return a new map from cluster to the metrics reported by that cluster's consumer
     */
    public Map<String, Map<MetricName, ? extends Metric>> getAllConsumerMetrics() {
        HashMap<String, Map<MetricName, ? extends Metric>> val = new HashMap<String, Map<MetricName, ? extends Metric>>();
        for (Map.Entry<String, KafkaConsumer<byte[], byte[]>> e : this.consumers.entrySet()) {
            val.put(e.getKey(), e.getValue().metrics());
        }
        return val;
    }

    /**
     * Poll loop for a single consumer. All calls on the consumer other than the
     * {@code wakeup()} issued by {@link KafkaConsumerWrapper#stop()} and the setup done in
     * {@link KafkaConsumerWrapper#start()} happen on the thread running this task,
     * including the offset commits and the final {@code close()}.
     */
    static final class ConsumerThread implements Runnable {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final String cluster;
        private final KafkaConsumerWrapper wrapper;

        /** Offsets queued by {@link KafkaConsumerWrapper#commitOffsets(Map)} and not yet committed. */
        private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> offsetToCommit;

        /**
         * Creates the task and registers its pending-offset map with the wrapper under
         * the given cluster.
         *
         * @param cluster  cluster this consumer reads from
         * @param consumer consumer to poll; closed when {@link #run()} ends
         * @param wrapper  owning wrapper that provides the alive flag, holding buffer and owner operator
         */
        public ConsumerThread(String cluster, KafkaConsumer<byte[], byte[]> consumer, KafkaConsumerWrapper wrapper) {
            this.cluster = cluster;
            this.consumer = consumer;
            this.wrapper = wrapper;
            this.offsetToCommit = new ConcurrentHashMap<TopicPartition, OffsetAndMetadata>();
            wrapper.offsetsToCommit.put(cluster, this.offsetToCommit);
        }

        /**
         * Commits queued offsets, polls the consumer and forwards the records to the
         * wrapper's holding buffer until the wrapper is no longer alive or the consumer
         * is woken up. The consumer is always closed on exit.
         *
         * @throws IllegalStateException if the thread is interrupted while waiting for
         *         space in the holding buffer
         */
        @Override
        public void run() {
            try {
                while (this.wrapper.isAlive) {
                    commitPendingOffsets();
                    try {
                        ConsumerRecords<byte[], byte[]> records = this.consumer.poll(this.wrapper.ownerOperator.getConsumerTimeout());
                        for (ConsumerRecord<byte[], byte[]> record : records) {
                            this.wrapper.putMessage(Pair.of(this.cluster, record));
                        }
                    } catch (NoOffsetForPartitionException e) {
                        // No position is known for these partitions: fall back to the
                        // beginning or the end, depending on the operator's initial offset.
                        AbstractKafkaInputOperator.InitialOffset io = AbstractKafkaInputOperator.InitialOffset.valueOf(this.wrapper.ownerOperator.getInitialOffset());
                        TopicPartition[] partitions = e.partitions().toArray(new TopicPartition[0]);
                        if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
                            this.consumer.seekToBeginning(partitions);
                        } else {
                            this.consumer.seekToEnd(partitions);
                        }
                    } catch (InterruptedException e) {
                        // NOTE(review): the interrupt flag is not restored here because
                        // consumer.close() runs on this same thread in the finally block;
                        // confirm close() tolerates an interrupted thread before re-interrupting.
                        throw new IllegalStateException("Consumer thread is interrupted unexpectedly", e);
                    }
                }
            } catch (WakeupException we) {
                logger.info("The consumer is being stopped");
            } finally {
                this.consumer.close();
            }
        }

        /**
         * Commits a snapshot of the queued offsets asynchronously and then removes exactly
         * the committed entries, so an offset queued concurrently by
         * {@link KafkaConsumerWrapper#commitOffsets(Map)} is kept for the next iteration
         * instead of being wiped unseen.
         */
        private void commitPendingOffsets() {
            if (this.offsetToCommit.isEmpty()) {
                return;
            }
            Map<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<TopicPartition, OffsetAndMetadata>(this.offsetToCommit);
            if (logger.isDebugEnabled()) {
                logger.debug("Commit offsets {}", Joiner.on(';').withKeyValueSeparator("=").join(snapshot));
            }
            this.consumer.commitAsync(snapshot, (OffsetCommitCallback)this.wrapper.ownerOperator);
            for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : snapshot.entrySet()) {
                // Two-argument remove: only drops the entry if it still holds the committed value.
                this.offsetToCommit.remove(committed.getKey(), committed.getValue());
            }
        }
    }
}

