/*
 * Decompiled with CFR 0.152.
 */
package io.thill.kafkalite.client;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes a single {@link TopicPartition} on a background thread and exposes the
 * received records through an in-memory queue.
 *
 * <p>The constructor assigns the partition, seeks to its beginning and starts the
 * polling thread. Records are appended to an unbounded queue in the order the
 * underlying consumer returns them and are handed out via the {@code poll} methods.
 *
 * <p>The underlying {@link KafkaConsumer} is only touched by the constructing thread
 * (before the polling thread starts) and by the polling thread afterwards. Seek
 * requests made through this class are queued and applied by the polling thread
 * before its next poll, so they take effect asynchronously. Records already placed
 * in the queue before a seek is applied are not removed.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class QueuedKafkaConsumer<K, V>
implements AutoCloseable {
    /** Maximum time a single poll of the underlying consumer may block. */
    private static final Duration KAFKA_POLL_DURATION = Duration.ofMillis(100L);
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /** Cleared by {@link #close()} to ask the polling thread to stop. */
    private final AtomicBoolean keepRunning = new AtomicBoolean(true);
    /** Released by the polling thread once it has stopped and closed the consumer. */
    private final CountDownLatch closedLatch = new CountDownLatch(1);
    private final BlockingQueue<ConsumerRecord<K, V>> queue = new LinkedBlockingQueue<ConsumerRecord<K, V>>();
    /** Seek requests waiting to be executed on the polling thread. */
    private final BlockingQueue<Runnable> pendingSeeks = new LinkedBlockingQueue<Runnable>();
    private final TopicPartition topicPartition;
    private final KafkaConsumer<K, V> consumer;

    /**
     * Creates the consumer, assigns it to the given partition, positions it at the
     * beginning of that partition and starts the background polling thread.
     *
     * @param topicPartition the single partition to consume
     * @param properties     configuration passed to the {@link KafkaConsumer} constructor
     */
    public QueuedKafkaConsumer(TopicPartition topicPartition, Properties properties) {
        this.topicPartition = topicPartition;
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.assign(Arrays.asList(topicPartition));
        this.consumer.seekToBeginning(Arrays.asList(topicPartition));
        new Thread(this::run).start();
    }

    /**
     * Requests a seek to the beginning of the partition. The seek is executed by the
     * polling thread before its next poll.
     */
    public void seekToBeginning() {
        // Hand the call over to the polling thread: KafkaConsumer rejects concurrent use.
        this.pendingSeeks.add(() -> this.consumer.seekToBeginning(Arrays.asList(this.topicPartition)));
    }

    /**
     * Requests a seek to the end of the partition. The seek is executed by the
     * polling thread before its next poll.
     */
    public void seekToEnd() {
        // Hand the call over to the polling thread: KafkaConsumer rejects concurrent use.
        this.pendingSeeks.add(() -> this.consumer.seekToEnd(Arrays.asList(this.topicPartition)));
    }

    /**
     * Body of the polling thread: applies queued seeks, polls the consumer and
     * enqueues every record until {@link #close()} is called. On exit the consumer
     * is closed and the closed latch is released.
     */
    private void run() {
        try {
            while (this.keepRunning.get()) {
                this.applyPendingSeeks();
                for (ConsumerRecord<K, V> record : this.consumer.poll(KAFKA_POLL_DURATION)) {
                    this.queue.add(record);
                }
            }
        }
        catch (RuntimeException e) {
            // Without this the thread would die without a trace in the application log.
            this.logger.error("Polling thread terminated unexpectedly", (Throwable)e);
        }
        finally {
            try {
                this.consumer.close();
            }
            catch (RuntimeException e) {
                this.logger.warn("Failed to close consumer", (Throwable)e);
            }
            finally {
                // Always release waiters in close(), even if closing the consumer failed.
                this.closedLatch.countDown();
            }
        }
    }

    /** Runs, on the calling (polling) thread, every seek request queued so far. */
    private void applyPendingSeeks() {
        Runnable seek;
        while ((seek = this.pendingSeeks.poll()) != null) {
            seek.run();
        }
    }

    /**
     * Retrieves the next queued record, waiting up to one second.
     *
     * @return the next record, or {@code null} if none arrived in time or the
     *         calling thread was interrupted
     */
    public ConsumerRecord<K, V> poll() {
        return this.poll(1L, TimeUnit.SECONDS);
    }

    /**
     * Retrieves the next queued record, waiting up to the given number of milliseconds.
     *
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @return the next record, or {@code null} if none arrived in time or the
     *         calling thread was interrupted
     */
    public ConsumerRecord<K, V> poll(long timeoutMillis) {
        return this.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Retrieves the next queued record, waiting up to the given timeout.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the next record, or {@code null} if none arrived in time or the
     *         calling thread was interrupted (the interrupt flag is restored)
     */
    public ConsumerRecord<K, V> poll(long timeout, TimeUnit unit) {
        try {
            return this.queue.poll(timeout, unit);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Reports whether the record queue is currently empty.
     *
     * @return {@code true} if no records are queued
     */
    public boolean isEmpty() {
        return this.isEmpty(0L);
    }

    /**
     * Sleeps for the given time and then reports whether the record queue is empty.
     * If the sleep is interrupted, the interrupt flag is restored and the queue is
     * checked immediately.
     *
     * @param waitMillis time to sleep before checking, in milliseconds
     * @return {@code true} if no records are queued after the wait
     */
    public boolean isEmpty(long waitMillis) {
        try {
            Thread.sleep(waitMillis);
        }
        catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
        }
        return this.queue.isEmpty();
    }

    /**
     * Signals the polling thread to stop and waits until it has closed the
     * underlying consumer. If the wait is interrupted, a warning is logged and the
     * interrupt flag is restored.
     */
    @Override
    public void close() {
        this.keepRunning.set(false);
        try {
            this.closedLatch.await();
        }
        catch (InterruptedException e) {
            this.logger.warn("Interrupted", (Throwable)e);
            Thread.currentThread().interrupt();
        }
    }
}

