/*
 * Decompiled with CFR 0.152.
 */
package io.kk.vertx.kafka.relay;

import io.kk.vertx.kafka.relay.Consumer;
import io.kk.vertx.kafka.relay.Producer;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Verticle;
import io.vertx.core.json.JsonObject;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.RetryForever;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RelayService
extends AbstractVerticle {
    public static final String DEPLOY_RELAY = "deploy.relay";
    public static final String LAZY_DEPLOY = "lazyDeploy";
    private static final Logger LOG = LoggerFactory.getLogger(RelayService.class);
    private static final String CONSUMER = "consumer";
    private static final String PRODUCER = "producer";

    public void start(Future<Void> startFuture) throws Exception {
        if (this.config().getBoolean(LAZY_DEPLOY, Boolean.valueOf(false)).booleanValue()) {
            this.vertx.eventBus().consumer(DEPLOY_RELAY, message -> this.doDeploy(startFuture));
        } else {
            this.doDeploy(startFuture);
        }
    }

    public void stop(Future<Void> stopFuture) throws Exception {
        super.stop(stopFuture);
    }

    private <T> void doDeploy(Future<Void> startFuture) {
        ArrayList fs = new ArrayList();
        Future<List<String>> future = this.getBootServers();
        future.setHandler(h -> {
            if (h.succeeded()) {
                List bServers = (List)h.result();
                this.deployInner(fs, bServers, startFuture);
            } else {
                startFuture.fail(h.cause());
            }
        });
    }

    private void deployInner(List<Future> fs, List<String> bServers, Future<Void> startFuture) {
        LOG.debug("Bootstrap servers are {}", bServers);
        fs.add(this.deploy((Verticle)new Consumer(bServers), CONSUMER));
        fs.add(this.deploy((Verticle)new Producer(bServers), PRODUCER));
        CompositeFuture.all(fs).setHandler(r -> {
            if (r.succeeded()) {
                startFuture.complete();
                LOG.info("Relay were deployed");
            } else {
                startFuture.fail(r.cause());
            }
        });
    }

    private Future<String> deploy(Verticle verticle, String name) {
        Future f = Future.future();
        this.vertx.deployVerticle(verticle, new DeploymentOptions(this.config().getJsonObject(name)), f.completer());
        return f;
    }

    private String getZkConnectionString() {
        String zkConnectString = System.getenv("ZK_CONNECT_STRING");
        if (StringUtils.isBlank((CharSequence)zkConnectString)) {
            zkConnectString = this.config().getString("zkConnectString");
        }
        return zkConnectString;
    }

    private boolean getBootstrapsFromZookeeper() {
        String value = System.getenv("BS_FROM_ZOOKEEPER");
        if (StringUtils.isBlank((CharSequence)value)) {
            value = this.config().getString("bsFromZookeeper");
        }
        if (StringUtils.isNotBlank((CharSequence)value)) {
            return Boolean.parseBoolean(value);
        }
        return false;
    }

    private Future<List<String>> getBootServers() {
        Future future = Future.future();
        ArrayList bServers = new ArrayList();
        if (this.getBootstrapsFromZookeeper()) {
            String zkConnectionString = this.getZkConnectionString();
            if (StringUtils.isNotBlank((CharSequence)zkConnectionString)) {
                LOG.info("To connect zookeeper at {}", (Object)zkConnectionString);
                CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().connectString(zkConnectionString).retryPolicy((RetryPolicy)new RetryForever(100)).build();
                zookeeperClient.start();
                String root = "/brokers/ids";
                PathChildrenCache cache = new PathChildrenCache(zookeeperClient, root, true);
                cache.getListenable().addListener((client, event) -> {
                    LOG.debug("Event: {}-size={}", (Object)event.getType(), (Object)cache.getCurrentData().size());
                    if (event.getType().equals((Object)PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                        LOG.info("Got kafka brokers, number={}", (Object)cache.getCurrentData().size());
                        cache.getCurrentData().forEach(data -> {
                            JsonObject obj = new JsonObject(new String(data.getData()));
                            bServers.add(obj.getString("host") + ":" + obj.getInteger("port"));
                        });
                        future.complete((Object)bServers);
                        cache.close();
                        zookeeperClient.close();
                    }
                });
                try {
                    cache.start();
                }
                catch (Exception e) {
                    LOG.error("Unable to start cache", (Throwable)e);
                    future.fail((Throwable)e);
                }
            } else {
                future.complete(bServers);
            }
        } else {
            future.complete(bServers);
        }
        return future;
    }
}

