/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.kafka;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.CancelledKeyException;
import java.util.Map;
import java.util.Properties;
import kafka.admin.TopicCommand;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOperatorTestBase {
    public static final String END_TUPLE = "END_TUPLE";
    public static final int[] TEST_ZOOKEEPER_PORT;
    public static final int[][] TEST_KAFKA_BROKER_PORT;
    public static final String TEST_TOPIC = "testtopic";
    static final Logger logger;
    private KafkaServerStartable[][] broker = new KafkaServerStartable[2][2];
    private ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
    private ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
    public String baseDir = "target";
    private final String zkBaseDir = "zookeeper-server-data";
    private final String kafkaBaseDir = "kafka-server-data";
    private final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"};
    private final String[][] kafkadir = new String[][]{{"kafka-server-data/1/1", "kafka-server-data/1/2"}, {"kafka-server-data/2/1", "kafka-server-data/2/2"}};
    protected boolean hasMultiPartition = false;
    protected boolean hasMultiCluster = false;

    public void startZookeeper(int clusterId) {
        try {
            int numConnections = 10;
            int tickTime = 2000;
            File dir = new File(this.baseDir, this.zkdir[clusterId]);
            this.zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
            this.zkFactory[clusterId] = new NIOServerCnxnFactory();
            this.zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
            this.zkFactory[clusterId].startup(this.zkServer[clusterId]);
            Thread.sleep(2000L);
        }
        catch (Exception ex) {
            logger.error(ex.getLocalizedMessage());
        }
    }

    public void stopZookeeper() {
        for (ZooKeeperServer zooKeeperServer : this.zkServer) {
            if (zooKeeperServer == null) continue;
            zooKeeperServer.shutdown();
        }
        for (ZooKeeperServer zooKeeperServer : this.zkFactory) {
            if (zooKeeperServer == null) continue;
            zooKeeperServer.closeAll();
            zooKeeperServer.shutdown();
        }
        this.zkServer = new ZooKeeperServer[2];
        this.zkFactory = new ServerCnxnFactory[2];
    }

    public void startKafkaServer(int clusterid, int brokerid) {
        Properties props = new Properties();
        props.setProperty("broker.id", "" + brokerid);
        props.setProperty("log.dirs", new File(this.baseDir, this.kafkadir[clusterid][brokerid]).toString());
        props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
        props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid][brokerid]);
        props.setProperty("default.replication.factor", "1");
        props.setProperty("log.flush.interval.messages", "50000");
        if (this.hasMultiPartition) {
            props.setProperty("num.partitions", "2");
        } else {
            props.setProperty("num.partitions", "1");
        }
        this.broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig((Map)props));
        this.broker[clusterid][brokerid].startup();
    }

    public void startKafkaServer() {
        FileUtils.deleteQuietly((File)new File(this.baseDir, "kafka-server-data"));
        boolean[][] startable = new boolean[][]{{true, this.hasMultiPartition}, {this.hasMultiCluster, this.hasMultiCluster && this.hasMultiPartition}};
        for (int i = 0; i < startable.length; ++i) {
            for (int j = 0; j < startable[i].length; ++j) {
                if (!startable[i][j]) continue;
                this.startKafkaServer(i, j);
            }
        }
    }

    public void stopKafkaServer() {
        for (int i = 0; i < this.broker.length; ++i) {
            for (int j = 0; j < this.broker[i].length; ++j) {
                if (this.broker[i][j] == null) continue;
                this.broker[i][j].shutdown();
                this.broker[i][j].awaitShutdown();
                this.broker[i][j] = null;
            }
        }
    }

    @Before
    public void beforeTest() {
        try {
            this.startZookeeper();
            this.startKafkaServer();
            this.createTopic(0, TEST_TOPIC);
            if (this.hasMultiCluster) {
                this.createTopic(1, TEST_TOPIC);
            }
        }
        catch (CancelledKeyException ex) {
            logger.debug("LSHIL {}", (Object)ex.getLocalizedMessage());
        }
    }

    public void startZookeeper() {
        FileUtils.deleteQuietly((File)new File(this.baseDir, "zookeeper-server-data"));
        this.startZookeeper(0);
        if (this.hasMultiCluster) {
            this.startZookeeper(1);
        }
    }

    public void createTopic(int clusterid, String topicName) {
        String[] args = new String[]{"--zookeeper", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid], "--replication-factor", "1", "--partitions", this.hasMultiPartition ? "2" : "1", "--topic", topicName, "--create"};
        ZkUtils zu = ZkUtils.apply((String)("localhost:" + TEST_ZOOKEEPER_PORT[clusterid]), (int)30000, (int)30000, (boolean)false);
        TopicCommand.createTopic((ZkUtils)zu, (TopicCommand.TopicCommandOptions)new TopicCommand.TopicCommandOptions(args));
        try {
            Thread.sleep(5000L);
            zu.close();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @After
    public void afterTest() {
        try {
            this.stopKafkaServer();
            this.stopZookeeper();
        }
        catch (Exception ex) {
            logger.debug("LSHIL {}", (Object)ex.getLocalizedMessage());
        }
    }

    public void setHasMultiPartition(boolean hasMultiPartition) {
        this.hasMultiPartition = hasMultiPartition;
    }

    public void setHasMultiCluster(boolean hasMultiCluster) {
        this.hasMultiCluster = hasMultiCluster;
    }

    static {
        ServerSocket[] listeners = new ServerSocket[6];
        int[] p = new int[6];
        try {
            int i;
            for (i = 0; i < 6; ++i) {
                listeners[i] = new ServerSocket(0);
                p[i] = listeners[i].getLocalPort();
            }
            for (i = 0; i < 6; ++i) {
                listeners[i].close();
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
        TEST_KAFKA_BROKER_PORT = new int[][]{{p[2], p[3]}, {p[4], p[5]}};
        logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
    }

    public static class TestZookeeperServer
    extends ZooKeeperServer {
        public TestZookeeperServer() {
        }

        public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
            super(snapDir, logDir, tickTime);
        }

        public TestZookeeperServer(FileTxnSnapLog txnLogFactory, ZooKeeperServer.DataTreeBuilder treeBuilder) throws IOException {
            super(txnLogFactory, treeBuilder);
        }

        public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, ZooKeeperServer.DataTreeBuilder treeBuilder) throws IOException {
            super(txnLogFactory, tickTime, treeBuilder);
        }

        public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, ZooKeeperServer.DataTreeBuilder treeBuilder, ZKDatabase zkDb) {
            super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
        }

        protected void registerJMX() {
        }

        protected void unregisterJMX() {
        }
    }
}

