/*
 * Decompiled with CFR 0.152.
 */
package cascading.local.tap.kafka;

import cascading.flow.FlowProcess;
import cascading.local.tap.kafka.KafkaScheme;
import cascading.local.tap.kafka.decorator.ForwardingConsumer;
import cascading.property.PropertyUtil;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import cascading.util.CloseableIterator;
import cascading.util.Util;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTap<K, V>
extends Tap<Properties, Iterator<ConsumerRecord<K, V>>, Producer<K, V>> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTap.class);
    public static final Properties CONSUME_AUTO_COMMIT_LATEST = new Properties(){
        {
            this.setProperty("enable.auto.commit", "true");
            this.setProperty("auto.commit.interval.ms", "1000");
            this.setProperty("auto.offset.reset", "latest");
        }
    };
    public static final Properties CONSUME_AUTO_COMMIT_EARLIEST = new Properties(){
        {
            this.setProperty("enable.auto.commit", "true");
            this.setProperty("auto.commit.interval.ms", "1000");
            this.setProperty("auto.offset.reset", "earliest");
        }
    };
    public static final Properties PRODUCE_ACK_ALL_NO_RETRY = new Properties(){
        {
            this.setProperty("acks", "all");
            this.setProperty("retries", "0");
        }
    };
    public static final long DEFAULT_POLL_TIMEOUT = 10000L;
    public static final short DEFAULT_REPLICATION_FACTOR = 1;
    public static final int DEFAULT_NUM_PARTITIONS = 1;
    Properties defaultProperties = PropertyUtil.merge((Properties[])new Properties[]{CONSUME_AUTO_COMMIT_EARLIEST, PRODUCE_ACK_ALL_NO_RETRY});
    String hostname;
    String[] topics;
    boolean isTopicPattern = false;
    int numPartitions = 1;
    short replicationFactor = 1;
    String clientID = null;
    String groupID = Tap.id((Tap)this);
    long pollTimeout = 10000L;

    public static URI makeURI(String hostname, String ... topics) {
        if (hostname == null) {
            throw new IllegalArgumentException("hostname may not be null");
        }
        Arrays.sort(topics);
        try {
            return new URI("kafka", hostname, "/" + Util.join((String)",", (String[])topics), null, null);
        }
        catch (URISyntaxException exception) {
            throw new IllegalArgumentException(exception.getMessage(), exception);
        }
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, URI identifier) {
        this(defaultProperties, scheme, identifier, 10000L, 1, 1);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, URI identifier, long pollTimeout) {
        this(scheme, identifier, pollTimeout, 1, 1);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, URI identifier, int numPartitions, short replicationFactor) {
        this(scheme, identifier, 10000L, numPartitions, replicationFactor);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, URI identifier, long pollTimeout, int numPartitions, short replicationFactor) {
        this(null, scheme, identifier, pollTimeout, numPartitions, replicationFactor);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, URI identifier) {
        this(scheme, identifier, 10000L, 1, 1);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, URI identifier, long pollTimeout) {
        this(defaultProperties, scheme, identifier, pollTimeout, 1, 1);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, URI identifier, int numPartitions, short replicationFactor) {
        this(defaultProperties, scheme, identifier, 10000L, numPartitions, replicationFactor);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, URI identifier, long pollTimeout, int numPartitions, short replicationFactor) {
        this(defaultProperties, scheme, identifier, null, pollTimeout, numPartitions, replicationFactor);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID) {
        this(defaultProperties, scheme, identifier, clientID, 10000L, 1, 1);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID, String groupID) {
        this(defaultProperties, scheme, identifier, clientID, groupID, 10000L, 1, 1);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID, long pollTimeout) {
        this(scheme, identifier, clientID, pollTimeout, 1, 1);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID, int numPartitions, short replicationFactor) {
        this(scheme, identifier, clientID, 10000L, numPartitions, replicationFactor);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID, long pollTimeout, int numPartitions, short replicationFactor) {
        this(null, scheme, identifier, clientID, pollTimeout, numPartitions, replicationFactor);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID) {
        this(scheme, identifier, clientID, 10000L, 1, 1);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID, String groupID) {
        this(null, scheme, identifier, clientID, groupID, 10000L, 1, 1);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID, long pollTimeout) {
        this(defaultProperties, scheme, identifier, clientID, pollTimeout, 1, 1);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID, int numPartitions, short replicationFactor) {
        this(defaultProperties, scheme, identifier, clientID, 10000L, numPartitions, replicationFactor);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID, long pollTimeout, int numPartitions, short replicationFactor) {
        this(defaultProperties, scheme, identifier, clientID, null, pollTimeout, numPartitions, replicationFactor);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, URI identifier, String clientID, String groupID, long pollTimeout, int numPartitions, short replicationFactor) {
        super(scheme, SinkMode.UPDATE);
        if (defaultProperties != null) {
            this.defaultProperties = new Properties(defaultProperties);
        }
        if (identifier == null) {
            throw new IllegalArgumentException("identifier may not be null");
        }
        if (!identifier.getScheme().equalsIgnoreCase("kafka")) {
            throw new IllegalArgumentException("identifier does not have kafka scheme");
        }
        this.hostname = identifier.getHost();
        if (identifier.getPort() != -1) {
            this.hostname = this.hostname + ":" + identifier.getPort();
        }
        if (identifier.getQuery() == null) {
            throw new IllegalArgumentException("must have at least one topic in the query part of the URI");
        }
        if (clientID != null) {
            this.clientID = clientID;
        }
        if (groupID != null) {
            this.groupID = groupID;
        }
        this.pollTimeout = pollTimeout;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        this.applyTopics(identifier.getQuery().split(","));
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, String hostname, long pollTimeout, String ... topics) {
        this(scheme, hostname, pollTimeout, 1, 1, topics);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, String hostname, long pollTimeout, int numPartitions, short replicationFactor, String ... topics) {
        this(null, scheme, hostname, pollTimeout, numPartitions, replicationFactor, topics);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, String hostname, int numPartitions, short replicationFactor, String ... topics) {
        this(defaultProperties, scheme, hostname, 10000L, numPartitions, replicationFactor, topics);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, String hostname, String ... topics) {
        this(defaultProperties, scheme, hostname, 10000L, 1, 1, topics);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, String hostname, long pollTimeout, String ... topics) {
        this(defaultProperties, scheme, hostname, pollTimeout, 1, 1, topics);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, String hostname, long pollTimeout, int numPartitions, short replicationFactor, String ... topics) {
        this(defaultProperties, scheme, hostname, null, pollTimeout, numPartitions, replicationFactor, topics);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, String hostname, String clientID, String ... topics) {
        this(scheme, hostname, clientID, 10000L, 1, 1, topics);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, String hostname, String clientID, long pollTimeout, String ... topics) {
        this(scheme, hostname, clientID, pollTimeout, 1, 1, topics);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> scheme, String hostname, String clientID, long pollTimeout, int numPartitions, short replicationFactor, String ... topics) {
        this(null, scheme, hostname, clientID, pollTimeout, numPartitions, replicationFactor, topics);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, String hostname, String clientID, int numPartitions, short replicationFactor, String ... topics) {
        this(defaultProperties, scheme, hostname, clientID, 10000L, numPartitions, replicationFactor, topics);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, String hostname, String clientID, String ... topics) {
        this(defaultProperties, scheme, hostname, clientID, 10000L, 1, 1, topics);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, String hostname, String clientID, long pollTimeout, String ... topics) {
        this(defaultProperties, scheme, hostname, clientID, pollTimeout, 1, 1, topics);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, String hostname, String clientID, long pollTimeout, int numPartitions, short replicationFactor, String ... topics) {
        this(defaultProperties, scheme, hostname, clientID, null, pollTimeout, numPartitions, replicationFactor, topics);
    }

    public KafkaTap(Properties defaultProperties, KafkaScheme<K, V, ?, ?> scheme, String hostname, String clientID, String groupID, long pollTimeout, int numPartitions, short replicationFactor, String ... topics) {
        super(scheme, SinkMode.UPDATE);
        if (defaultProperties != null) {
            this.defaultProperties = new Properties(defaultProperties);
        }
        this.hostname = hostname;
        if (clientID != null) {
            this.clientID = clientID;
        }
        if (groupID != null) {
            this.groupID = groupID;
        }
        this.pollTimeout = pollTimeout;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        this.applyTopics(topics);
    }

    protected void applyTopics(String[] topics) {
        if (topics[0].matches("^/([^/]|//)*/$")) {
            this.topics = new String[]{topics[0].substring(1, topics[0].length() - 1)};
            this.isTopicPattern = true;
        } else {
            this.topics = new String[topics.length];
            System.arraycopy(topics, 0, this.topics, 0, topics.length);
        }
    }

    public String getHostname() {
        return this.hostname;
    }

    public String getClientID() {
        return this.clientID;
    }

    public String getGroupID() {
        return this.groupID;
    }

    public String[] getTopics() {
        return this.topics;
    }

    public boolean isTopicPattern() {
        return this.isTopicPattern;
    }

    public String getIdentifier() {
        return KafkaTap.makeURI(this.hostname, this.topics).toString();
    }

    protected Consumer<K, V> createKafkaConsumer(Properties properties) {
        return new ForwardingConsumer(properties);
    }

    public TupleEntryIterator openForRead(FlowProcess<? extends Properties> flowProcess, Iterator<ConsumerRecord<K, V>> consumerRecord) throws IOException {
        Properties props = PropertyUtil.merge((Properties[])new Properties[]{(Properties)flowProcess.getConfig(), this.defaultProperties});
        props.setProperty("bootstrap.servers", this.hostname);
        Set<String> keys = props.stringPropertyNames();
        if (this.clientID != null && !keys.contains("client.id")) {
            props.setProperty("client.id", this.clientID);
        }
        if (!keys.contains("group.id")) {
            props.setProperty("group.id", this.groupID);
        }
        this.sourceConfInit(flowProcess, props);
        Properties consumerProperties = PropertyUtil.retain((Properties)props, (Set)ConsumerConfig.configNames());
        final Consumer<K, V> consumer = this.createKafkaConsumer(consumerProperties);
        this.preConsumerSubscribe(consumer);
        if (this.isTopicPattern) {
            consumer.subscribe(Pattern.compile(this.topics[0]), this.getConsumerRebalanceListener(consumer));
        } else {
            consumer.subscribe(Arrays.asList(this.getTopics()), this.getConsumerRebalanceListener(consumer));
        }
        this.postConsumerSubscribe(consumer);
        CloseableIterator iterator = new CloseableIterator<Iterator<ConsumerRecord<K, V>>>(){
            boolean completed = false;
            ConsumerRecords<K, V> records;

            public boolean hasNext() {
                if (this.records != null) {
                    return true;
                }
                if (this.completed) {
                    return false;
                }
                this.records = consumer.poll(KafkaTap.this.pollTimeout);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("kafka records polled: {}", (Object)this.records.count());
                }
                if (this.records.isEmpty()) {
                    this.completed = true;
                    this.records = null;
                }
                return this.records != null;
            }

            public Iterator<ConsumerRecord<K, V>> next() {
                if (!this.hasNext()) {
                    throw new NoSuchElementException("no more elements");
                }
                try {
                    final 4 parent = this;
                    CloseableIterator closeableIterator = new CloseableIterator<ConsumerRecord<K, V>>(){
                        Iterator<ConsumerRecord<K, V>> delegate;
                        Supplier<Boolean> hasNext;
                        {
                            this.delegate = records.iterator();
                            this.hasNext = () -> this.delegate.hasNext();
                        }

                        public void close() throws IOException {
                            this.hasNext = () -> false;
                            parent.close();
                        }

                        public boolean hasNext() {
                            return this.hasNext.get();
                        }

                        public ConsumerRecord<K, V> next() {
                            return this.delegate.next();
                        }
                    };
                    return closeableIterator;
                }
                finally {
                    this.records = null;
                }
            }

            public void close() {
                try {
                    consumer.close();
                }
                finally {
                    this.completed = true;
                }
            }
        };
        return new TupleEntrySchemeIterator(flowProcess, (Tap)this, this.getScheme(), iterator);
    }

    protected void preConsumerSubscribe(Consumer<K, V> consumer) {
    }

    protected void postConsumerSubscribe(Consumer<K, V> consumer) {
    }

    protected ConsumerRebalanceListener getConsumerRebalanceListener(Consumer<K, V> consumer) {
        return new NoOpConsumerRebalanceListener();
    }

    public TupleEntryCollector openForWrite(FlowProcess<? extends Properties> flowProcess, Producer<K, V> producer) throws IOException {
        Properties props = PropertyUtil.merge((Properties[])new Properties[]{(Properties)flowProcess.getConfig(), this.defaultProperties});
        props.setProperty("bootstrap.servers", this.hostname);
        this.sinkConfInit(flowProcess, props);
        producer = new KafkaProducer(PropertyUtil.retain((Properties)props, (Set)ProducerConfig.configNames()));
        return new TupleEntrySchemeCollector(flowProcess, (Tap)this, this.getScheme(), (Object)producer);
    }

    protected AdminClient createAdminClient(Properties conf) {
        Properties props = new Properties(conf);
        props.setProperty("bootstrap.servers", this.hostname);
        return AdminClient.create((Properties)props);
    }

    public boolean createResource(Properties conf) {
        AdminClient client = this.createAdminClient(conf);
        ArrayList<NewTopic> topics = new ArrayList<NewTopic>(this.getTopics().length);
        for (String topic : this.getTopics()) {
            topics.add(new NewTopic(topic, this.numPartitions, this.replicationFactor));
        }
        CreateTopicsResult result = client.createTopics(topics);
        KafkaFuture all = result.all();
        try {
            all.get();
        }
        catch (InterruptedException | ExecutionException exception) {
            LOG.info("unable to create topics");
            return false;
        }
        return true;
    }

    public boolean deleteResource(Properties conf) {
        AdminClient client = this.createAdminClient(conf);
        DeleteTopicsResult result = client.deleteTopics(Arrays.asList(this.getTopics()));
        KafkaFuture all = result.all();
        try {
            all.get();
        }
        catch (InterruptedException | ExecutionException exception) {
            LOG.info("unable to create topics");
            return false;
        }
        return true;
    }

    public boolean resourceExists(Properties conf) {
        AdminClient client = this.createAdminClient(conf);
        DescribeTopicsResult result = client.describeTopics(Arrays.asList(this.getTopics()));
        KafkaFuture all = result.all();
        try {
            Map map = (Map)all.get();
            return map.size() == this.getTopics().length;
        }
        catch (InterruptedException | ExecutionException exception) {
            LOG.info("unable to create topics");
            return false;
        }
    }

    public long getModifiedTime(Properties conf) throws IOException {
        if (this.resourceExists(conf)) {
            return Long.MAX_VALUE;
        }
        return 0L;
    }
}

