/*
 * Decompiled with CFR 0.152.
 */
package io.thill.kafkalite;

import io.thill.kafkalite.client.KafkaLiteClient;
import io.thill.kafkalite.exception.KafkaLiteException;
import io.thill.kafkalite.internal.KafkaLiteConfig;
import java.io.File;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;

/**
 * Static facade that runs one Zookeeper {@link TestingServer} and one Kafka broker
 * ({@link KafkaServerStartable}) inside the current JVM, plus helpers to create and delete
 * topics and to build client {@link Properties} pointing at that broker.
 *
 * <p>All lifecycle and topic methods are {@code static synchronized}, so they are serialized
 * on the {@code KafkaLite} class monitor. Ports, directories and broker properties come from
 * {@link KafkaLiteConfig}.
 */
public class KafkaLite {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLite.class);

    /** Sleep between polls while waiting for a topic or directory to appear/disappear. */
    private static final long IDLE_MILLIS = 100L;

    /** Fixed pause after the broker has been started, before the first client connects. */
    private static final long STARTUP_SETTLE_MILLIS = 3000L;

    private static boolean started = false;
    private static boolean cleanOnShutdown = false;
    private static TestingServer zookeeper;
    private static KafkaServerStartable kafka;

    /**
     * Command-line entry point. Starts the servers and installs a {@code SIGINT} handler that
     * stops them.
     *
     * @param args if the first argument is exactly {@code "true"}, existing data is cleaned
     *             before startup and again from a JVM shutdown hook
     * @throws KafkaLiteException if Zookeeper or Kafka cannot be started
     */
    public static void main(String... args) throws KafkaLiteException {
        boolean clean = args.length > 0 && "true".equals(args[0]);
        LOGGER.info("Clean: {}", clean);
        if (clean) {
            clean();
            cleanOnShutdown();
        }
        start();
        Signal.handle(new Signal("INT"), signal -> stop());
    }

    /**
     * Cleans all data (see {@link #clean()}) and then makes sure the servers are running.
     *
     * @throws KafkaLiteException if the servers cannot be started
     */
    public static synchronized void reset() throws KafkaLiteException {
        LOGGER.debug("reset()");
        clean();
        start();
    }

    /**
     * Starts Zookeeper and then Kafka, waits briefly, and verifies the broker by listing its
     * topics. Does nothing if already started.
     *
     * <p>If any step fails, everything that was started so far is shut down again before the
     * exception is thrown, so a later call can retry from a clean state.
     *
     * @throws KafkaLiteException if Zookeeper or Kafka cannot be started, the calling thread is
     *                            interrupted during startup, or the broker cannot be queried
     */
    public static synchronized void start() throws KafkaLiteException {
        LOGGER.debug("start()");
        if (started) {
            return;
        }

        if (!KafkaLiteConfig.KAFKA_DIR.isDirectory()) {
            LOGGER.info("Creating {}", KafkaLiteConfig.KAFKA_DIR.getAbsolutePath());
            // mkdirs() returns false both on failure and when another process won the race,
            // so re-check before complaining.
            if (!KafkaLiteConfig.KAFKA_DIR.mkdirs() && !KafkaLiteConfig.KAFKA_DIR.isDirectory()) {
                LOGGER.warn("Could not create {}", KafkaLiteConfig.KAFKA_DIR.getAbsolutePath());
            }
        }

        LOGGER.info("Starting Zookeeper on port {}", KafkaLiteConfig.ZK_PORT);
        try {
            zookeeper = new TestingServer(
                    new InstanceSpec(KafkaLiteConfig.ZOOKEEPER_DIR, KafkaLiteConfig.ZK_PORT, -1, -1, false, -1),
                    true);
            zookeeper.start();
        } catch (Throwable t) {
            zookeeper = close(zookeeper);
            throw new KafkaLiteException("Could not start Zookeeper", t);
        }

        LOGGER.info("Starting Kafka on port {}", KafkaLiteConfig.KB_PORT);
        try {
            kafka = new KafkaServerStartable(new KafkaConfig(KafkaLiteConfig.KAFKA_PROPERTIES));
            kafka.startup();
        } catch (Throwable t) {
            shutdownServers();
            throw new KafkaLiteException("Could not start Kafka", t);
        }

        try {
            Thread.sleep(STARTUP_SETTLE_MILLIS);
        } catch (InterruptedException e) {
            // Tear down first: sleep() cleared the interrupt flag, and restoring it before the
            // shutdown would make any blocking wait inside the shutdown fail immediately.
            shutdownServers();
            Thread.currentThread().interrupt();
            throw new KafkaLiteException("Interrupted while waiting for Kafka to start", e);
        }

        try (KafkaLiteClient client = new KafkaLiteClient()) {
            LOGGER.info("Existing topics: {}", client.kafkaConsumer().listTopics().keySet());
        } catch (Throwable t) {
            // Without this the servers would keep running while 'started' stays false,
            // making stop() a no-op and the next start() collide with the live instances.
            shutdownServers();
            throw new KafkaLiteException("Could not connect to Kafka", t);
        }
        started = true;
    }

    /**
     * Stops Kafka and then Zookeeper. Does nothing if not started.
     */
    public static synchronized void stop() {
        LOGGER.debug("stop()");
        if (!started) {
            return;
        }
        LOGGER.info("Stopping Kafka");
        kafka = close(kafka);
        LOGGER.info("Stopping Zookeeper");
        zookeeper = close(zookeeper);
        started = false;
    }

    /**
     * Removes all data. While the servers are running every topic is deleted through the
     * broker; otherwise the Kafka and Zookeeper directories are deleted from disk and this
     * method blocks until both are gone.
     */
    public static synchronized void clean() {
        LOGGER.debug("clean()");
        if (started) {
            LOGGER.info("Deleting Topics");
            try (KafkaLiteClient client = new KafkaLiteClient()) {
                for (String topic : client.kafkaConsumer().listTopics().keySet()) {
                    deleteTopic(topic, client);
                }
            }
            return;
        }

        LOGGER.info("Deleting {}", KafkaLiteConfig.KAFKA_DIR.getAbsolutePath());
        deleteRecursively(KafkaLiteConfig.KAFKA_DIR);
        LOGGER.info("Deleting {}", KafkaLiteConfig.ZOOKEEPER_DIR.getAbsolutePath());
        deleteRecursively(KafkaLiteConfig.ZOOKEEPER_DIR);
        while (KafkaLiteConfig.KAFKA_DIR.exists() || KafkaLiteConfig.ZOOKEEPER_DIR.exists()) {
            idle();
            // Retry the delete on every pass; merely waiting can never make progress if the
            // first attempt failed (e.g. a file was still held open).
            deleteRecursively(KafkaLiteConfig.KAFKA_DIR);
            deleteRecursively(KafkaLiteConfig.ZOOKEEPER_DIR);
        }
    }

    /**
     * Creates a topic with a retention of one hour.
     *
     * @param topic      topic name
     * @param partitions number of partitions
     * @return {@code true} if the topic exists on return, {@code false} if the servers are not
     *         started
     * @see #createTopic(String, int, long)
     */
    public static synchronized boolean createTopic(String topic, int partitions) {
        return createTopic(topic, partitions, TimeUnit.HOURS.toMillis(1L));
    }

    /**
     * Creates a topic with replication factor 1 and blocks until both Zookeeper and the broker
     * report it. If the topic already exists with the requested partition count nothing is
     * changed; if it exists with a different count it is deleted and recreated.
     *
     * @param topic           topic name
     * @param partitions      number of partitions
     * @param retentionMillis value for the topic's {@code retention.ms} setting
     * @return {@code true} if the topic exists on return, {@code false} if the servers are not
     *         started
     */
    public static synchronized boolean createTopic(String topic, int partitions, long retentionMillis) {
        LOGGER.debug("createTopic({}, {}, {})", new Object[]{topic, partitions, retentionMillis});
        if (!started) {
            return false;
        }
        try (KafkaLiteClient client = new KafkaLiteClient()) {
            // One list entry per partition; only the size is needed here.
            List<?> existing = (List<?>) client.kafkaConsumer().listTopics().get(topic);
            if (existing != null) {
                if (existing.size() == partitions) {
                    LOGGER.debug("Topic {} already exists", topic);
                    return true;
                }
                LOGGER.info("Topic {} already exists with different number of partitions. Deleting and recreating...", topic);
                deleteTopic(topic);
            }

            Properties props = new Properties();
            props.setProperty("retention.ms", Long.toString(retentionMillis));
            props.setProperty("preallocate", "false");
            client.adminZkClient().createTopic(topic, partitions, 1, props, (RackAwareMode) new RackAwareMode.Disabled$());

            // Creation is asynchronous: wait for Zookeeper first, then for broker metadata.
            while (!client.kafkaZkClient().topicExists(topic)) {
                idle();
            }
            while (!client.kafkaConsumer().listTopics().containsKey(topic)) {
                idle();
            }
        }
        return true;
    }

    /**
     * Deletes a topic and blocks until neither Zookeeper nor the broker reports it any more.
     *
     * @param topic topic name
     * @return {@code true} if the servers are started (whether or not the topic existed),
     *         {@code false} otherwise
     */
    public static synchronized boolean deleteTopic(String topic) {
        LOGGER.debug("deleteTopic({})", topic);
        if (!started) {
            return false;
        }
        try (KafkaLiteClient client = new KafkaLiteClient()) {
            deleteTopic(topic, client);
            while (client.kafkaZkClient().topicExists(topic)) {
                idle();
            }
            while (client.kafkaConsumer().listTopics().containsKey(topic)) {
                idle();
            }
        }
        return true;
    }

    /**
     * Requests deletion of {@code topic} through {@code client} if Zookeeper knows the topic,
     * then waits until the broker no longer lists it.
     */
    private static synchronized void deleteTopic(String topic, KafkaLiteClient client) {
        if (!client.kafkaZkClient().topicExists(topic)) {
            return;
        }
        LOGGER.info("Deleting Topic {}", topic);
        client.adminZkClient().deleteTopic(topic);
        while (client.kafkaConsumer().listTopics().containsKey(topic)) {
            idle();
        }
    }

    /**
     * Registers, at most once, a JVM shutdown hook that stops the servers and then cleans all
     * data.
     */
    public static synchronized void cleanOnShutdown() {
        if (cleanOnShutdown) {
            return;
        }
        cleanOnShutdown = true;
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            stop();
            clean();
        }));
    }

    /** Shuts down Kafka first, then Zookeeper; the same order {@link #stop()} uses. */
    private static void shutdownServers() {
        kafka = close(kafka);
        zookeeper = close(zookeeper);
    }

    /** Deletes {@code f} and, if it is a directory, everything beneath it (best effort). */
    private static void deleteRecursively(File f) {
        if (f.isDirectory()) {
            // listFiles() returns null on an I/O error or if the directory vanished meanwhile.
            File[] children = f.listFiles();
            if (children != null) {
                for (File child : children) {
                    deleteRecursively(child);
                }
            }
        }
        if (!f.delete() && f.exists()) {
            LOGGER.debug("Could not delete {}", f.getAbsolutePath());
        }
    }

    /**
     * Sleeps for {@link #IDLE_MILLIS}.
     *
     * @throws RuntimeException if the thread is interrupted; the interrupt status is restored
     *                          before throwing
     */
    private static void idle() {
        try {
            Thread.sleep(IDLE_MILLIS);
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }
    }

    /**
     * Closes the given Zookeeper server if non-null, logging instead of propagating failures.
     *
     * @return always {@code null}, so callers can write {@code zookeeper = close(zookeeper)}
     */
    private static TestingServer close(TestingServer zookeeper) {
        if (zookeeper != null) {
            try {
                zookeeper.close();
            } catch (Throwable t) {
                LOGGER.error("Could not close " + zookeeper.getClass().getName(), t);
            }
        }
        return null;
    }

    /**
     * Shuts down the given Kafka broker if non-null and waits for the shutdown to finish,
     * logging instead of propagating failures.
     *
     * @return always {@code null}, so callers can write {@code kafka = close(kafka)}
     */
    private static KafkaServerStartable close(KafkaServerStartable kafka) {
        if (kafka != null) {
            try {
                kafka.shutdown();
                kafka.awaitShutdown();
            } catch (Throwable t) {
                LOGGER.error("Could not shutdown " + kafka.getClass().getName(), t);
            }
        }
        return null;
    }

    /**
     * Builds consumer properties that point at the local broker.
     *
     * @param keyDerserializerType  class whose name is used for {@code key.deserializer}
     * @param valueDeserializerType class whose name is used for {@code value.deserializer}
     * @return a new {@link Properties} with {@code bootstrap.servers} and both deserializers set
     */
    public static Properties consumerProperties(Class<?> keyDerserializerType, Class<?> valueDeserializerType) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:" + KafkaLiteConfig.KB_PORT);
        props.setProperty("key.deserializer", keyDerserializerType.getName());
        props.setProperty("value.deserializer", valueDeserializerType.getName());
        return props;
    }

    /**
     * Builds producer properties that point at the local broker.
     *
     * @param keySerializerType   class whose name is used for {@code key.serializer}
     * @param valueSerializerType class whose name is used for {@code value.serializer}
     * @return a new {@link Properties} with {@code bootstrap.servers} and both serializers set
     */
    public static Properties producerProperties(Class<?> keySerializerType, Class<?> valueSerializerType) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:" + KafkaLiteConfig.KB_PORT);
        props.setProperty("key.serializer", keySerializerType.getName());
        props.setProperty("value.serializer", valueSerializerType.getName());
        return props;
    }
}

