/*
 * Decompiled with CFR 0.152.
 */
package io.kk.vertx.kafka.relay;

import io.kk.vertx.kafka.relay.BaseVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer
extends BaseVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
    public static final String RELAY_PRODUCER_TOPICS_UPDATE = "relay.producer.topics.update";
    private KafkaProducer<String, String> producer;

    public Producer(List<String> bServers) {
        super(bServers);
    }

    public void start(Future<Void> startFuture) throws Exception {
        List addrs = this.config().getJsonArray("addresses", new JsonArray()).getList();
        this.addTopics(addrs);
        this.vertx.eventBus().localConsumer(RELAY_PRODUCER_TOPICS_UPDATE, e -> {
            this.addTopics(((JsonArray)e.body()).getList());
            e.reply((Object)"ok");
        });
        try {
            this.setProducer((KafkaProducer<String, String>)new KafkaProducer(this.updateConfig(this.loadConfig(this.config()))));
        }
        catch (ConfigException e2) {
            startFuture.fail((Throwable)e2);
            return;
        }
        super.start(startFuture);
    }

    public void stop(Future<Void> stopFuture) throws Exception {
        this.producer.close();
        super.stop(stopFuture);
    }

    private void addTopics(List<String> addrs) {
        addrs.forEach(add -> {
            if (!this.addresses.contains(add)) {
                this.vertx.eventBus().consumer(add, this::sendToKafka);
                this.addresses.add(add);
            }
        });
        LOG.debug("Addresses added - {}", (Object)this.addresses);
    }

    private <T> void sendToKafka(Message<T> message) {
        LOG.debug("Receive a Vertx message: address={}", (Object)message.address());
        JsonObject keyObj = new JsonObject();
        message.headers().forEach(e -> {
            JsonArray ary = keyObj.getJsonArray((String)e.getKey());
            if (ary == null) {
                ary = new JsonArray();
                keyObj.put((String)e.getKey(), ary);
            }
            ary.add((String)e.getValue());
        });
        ProducerRecord record = new ProducerRecord(message.address() + ".kafka", (Object)keyObj.toString(), (Object)message.body().toString());
        this.producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                LOG.error("Unable to send message :{}", message.body());
                LOG.error("Error", (Throwable)exception);
            }
            LOG.debug("Sent a message {}", (Object)metadata);
        });
    }

    void setProducer(KafkaProducer<String, String> producer) {
        this.producer = Objects.requireNonNull(producer);
    }
}

