/*
 * Decompiled with CFR 0.152.
 */
package io.kk.vertx.kafka.relay;

import io.kk.vertx.kafka.relay.BaseVerticle;
import io.kk.vertx.kafka.relay.VertxMessageFactory;
import io.vertx.core.Future;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verticle that relays Kafka records onto the Vert.x event bus.
 *
 * <p>Each configured Vert.x address is mapped to the Kafka topic
 * {@code <address>.kafka}. A single background thread polls those topics and
 * publishes every record value (converted by the configured
 * {@link VertxMessageFactory}) to the matching event-bus address. When the
 * record key is a JSON object whose values are arrays, every array item is
 * added as a delivery header under the entry's name.
 *
 * <p>Configuration keys read by this class: {@code messageFactory},
 * {@code vertx.addresses}, {@code interval} and {@code pullTimeout}.
 */
public class Consumer extends BaseVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
    private static final String RELAY_CONSUMER_TOPICS_UPDATE = "relay.consumer.topics.update";
    /** Suffix appended to a Vert.x address to obtain the Kafka topic name. */
    private static final String TOPIC_SUFFIX = ".kafka";

    private VertxMessageFactory<?> factory;
    private ExecutorService executors;
    private List<String> topics;
    /** Created, used and closed exclusively on the executor thread. */
    private KafkaConsumer<String, String> consumer;
    private final AtomicBoolean consuming = new AtomicBoolean(true);

    /**
     * @param bServers passed through unchanged to {@link BaseVerticle}
     */
    public Consumer(List<String> bServers) {
        super(bServers);
    }

    /**
     * Instantiates the message factory named by the {@code messageFactory}
     * config entry, reads {@code vertx.addresses} and starts consuming.
     *
     * <p>If no addresses are configured, consumption is deferred until a
     * {@link JsonArray} of addresses arrives on the local event-bus address
     * {@code relay.consumer.topics.update}.
     *
     * @param startFuture forwarded to {@code super.start}
     * @throws Exception if the message factory class cannot be loaded or instantiated
     */
    public void start(Future<Void> startFuture) throws Exception {
        this.factory = (VertxMessageFactory<?>) Class.forName(this.config().getString("messageFactory")).newInstance();
        this.executors = Executors.newSingleThreadExecutor();
        this.addresses = this.config().getJsonArray("vertx.addresses", new JsonArray()).getList();
        // Log after the addresses have been read so the message shows the configured values.
        LOG.debug("Topics {}", (Object) this.addresses);
        if (this.addresses.isEmpty()) {
            // NOTE(review): every update submits another job to the single-thread
            // executor; a job queued behind a running poll loop will not start
            // until that loop ends - confirm only one update is expected.
            this.vertx.eventBus().localConsumer(RELAY_CONSUMER_TOPICS_UPDATE, e -> {
                this.addresses.addAll(((JsonArray) e.body()).getList());
                this.submitConsumerJob();
            });
        } else {
            this.submitConsumerJob();
        }
        super.start(startFuture);
    }

    /**
     * Signals the polling loop to finish and shuts the executor down.
     *
     * <p>The Kafka consumer is deliberately NOT closed here: KafkaConsumer is
     * not safe for multi-threaded access, and it may never have been created
     * if no consumer job was submitted. The polling thread closes it once the
     * loop observes the cleared {@code consuming} flag.
     *
     * @param stopFuture forwarded to {@code super.stop}
     */
    public void stop(Future<Void> stopFuture) throws Exception {
        this.consuming.set(false);
        if (this.executors != null) {
            // Lets the running loop end on its own; no interrupt is sent.
            this.executors.shutdown();
        }
        super.stop(stopFuture);
    }

    /**
     * Derives the Kafka topic names from the current addresses and submits the
     * blocking consume loop to the executor.
     */
    private void submitConsumerJob() {
        final List<String> subscribed =
                this.addresses.stream().map(s -> s + TOPIC_SUFFIX).collect(Collectors.toList());
        this.topics = subscribed;
        this.executors.submit(() -> {
            try {
                this.consumer = new KafkaConsumer<>(this.updateConfig(this.loadConfig(this.config())));
                this.startConsume(subscribed);
            }
            catch (IOException ex) {
                LOG.error("Unable to start Consumer", (Throwable) ex);
                throw new RuntimeException(ex);
            }
            catch (RuntimeException ex) {
                // The Future returned by submit() is discarded, so log here or the failure is invisible.
                LOG.error("Consumer loop terminated unexpectedly", (Throwable) ex);
                throw ex;
            }
        });
    }

    /**
     * Subscribes to {@code topics} and relays records until {@code consuming}
     * is cleared or the thread is interrupted. The consumer is always closed
     * on exit, including when polling or relaying throws.
     *
     * @param topics Kafka topic names to subscribe to
     * @throws IllegalStateException if {@code interval} or {@code pullTimeout} is not configured
     */
    private void startConsume(List<String> topics) {
        try {
            LOG.debug("To subscript topics : {}", topics);
            this.consumer.subscribe(topics);
            long interval = this.requiredLong("interval");
            long pullTimeout = this.requiredLong("pullTimeout");
            LOG.debug("To run consumers with pulltimeout={}, interval={}", (Object) pullTimeout, (Object) interval);
            while (this.consuming.get()) {
                ConsumerRecords<String, String> records = this.consumer.poll(pullTimeout);
                if (records != null) {
                    records.forEach(record -> this.relay(record.topic(), record.key(), record.value()));
                }
                try {
                    Thread.sleep(interval);
                }
                catch (InterruptedException e) {
                    LOG.warn("Sleep was interrupted", (Throwable) e);
                    // Restore the flag for callers further up and stop polling.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        finally {
            this.closeConsumer();
        }
    }

    /**
     * Publishes one Kafka record to the event-bus address derived from its topic.
     *
     * @param topic Kafka topic the record came from
     * @param key   record key; when it is a JSON object of arrays, its entries become headers
     * @param value record value, converted by the message factory before publishing
     */
    private void relay(String topic, String key, String value) {
        // Strip only the trailing suffix so addresses that contain ".kafka" elsewhere stay intact.
        String address = StringUtils.removeEnd(topic, TOPIC_SUFFIX);
        LOG.debug("Relaying a kafka message: {}, publish to vertx address: {}", (Object) topic, (Object) address);
        DeliveryOptions deliveryOptions = new DeliveryOptions();
        if (StringUtils.isNotBlank((CharSequence) key)) {
            try {
                for (Map.Entry<String, Object> entry : new JsonObject(key)) {
                    Object headerValues = entry.getValue();
                    if (!(headerValues instanceof JsonArray)) {
                        // A malformed key must not abort the whole consumer loop.
                        LOG.warn("Ignoring header [{}]: value is not a JSON array", (Object) entry.getKey());
                        continue;
                    }
                    for (Object item : (JsonArray) headerValues) {
                        if (item != null) {
                            deliveryOptions.addHeader(entry.getKey(), item.toString());
                        }
                    }
                }
            }
            catch (DecodeException e) {
                LOG.warn("Unable to decode string [{}] to JSON", (Object) key);
            }
        }
        this.vertx.eventBus().publish(address, this.factory.message(value), deliveryOptions);
    }

    /**
     * Reads a mandatory numeric config entry.
     *
     * @param key configuration key
     * @return the configured value
     * @throws IllegalStateException if the key is absent
     */
    private long requiredLong(String key) {
        Long value = this.config().getLong(key);
        if (value == null) {
            throw new IllegalStateException("Missing required consumer configuration: " + key);
        }
        return value;
    }

    /** Closes the Kafka consumer, logging rather than propagating close failures. */
    private void closeConsumer() {
        try {
            this.consumer.close();
        }
        catch (RuntimeException ex) {
            // Runs from a finally block: do not mask the exception that ended the loop.
            LOG.warn("Error while closing Kafka consumer", (Throwable) ex);
        }
    }

    /**
     * Overrides {@code group.id} with the {@code CONSUMER_GROUP_ID} system
     * property when that property is set to a non-blank value, then delegates
     * to {@code super.updateConfig}.
     *
     * @param config consumer configuration; modified in place
     * @return the result of {@code super.updateConfig(config)}
     */
    @Override
    protected Map<String, Object> updateConfig(Map<String, Object> config) {
        String group = System.getProperty("CONSUMER_GROUP_ID");
        if (StringUtils.isNotBlank((CharSequence) group)) {
            config.put("group.id", group);
            LOG.info("Consumer group.id: {}", (Object) group);
        }
        return super.updateConfig(config);
    }
}

