package org.radarbase.schema.registration;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.constraints.NotNull;
import kafka.cluster.Broker;
import kafka.zk.KafkaZkClient;
import kafka.zookeeper.ZooKeeperClientException;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.utils.Time;
import org.radarbase.schema.CommandLineApp;
import org.radarbase.schema.util.SubCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters$;
import scala.collection.Seq;

/* loaded from: input_file:org/radarbase/schema/registration/KafkaTopics.class */
public class KafkaTopics extends AbstractTopicRegistrar {
    private static final Logger logger = LoggerFactory.getLogger(KafkaTopics.class);
    private final KafkaZkClient zkClient;
    private AdminClient kafkaClient;
    private String bootstrapServers = null;
    private boolean initialized = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/radarbase/schema/registration/KafkaTopics$KafkaTopicsCommand.class */
    public static class KafkaTopicsCommand implements SubCommand {
        private KafkaTopicsCommand() {
        }

        @Override // org.radarbase.schema.util.SubCommand
        public String getName() {
            return "create";
        }

        @Override // org.radarbase.schema.util.SubCommand
        public int execute(Namespace namespace, CommandLineApp commandLineApp) {
            int intValue = namespace.getInt("brokers").intValue();
            short shortValue = namespace.getShort("replication").shortValue();
            if (intValue < shortValue) {
                KafkaTopics.logger.error("Cannot assign a replication factor {} higher than number of brokers {}", Short.valueOf(shortValue), Integer.valueOf(intValue));
                return 1;
            }
            int intValue2 = namespace.getInt("partitions").intValue();
            try {
                KafkaTopics kafkaTopics = new KafkaTopics(namespace.getString("zookeeper"));
                try {
                    if (kafkaTopics.initialize(intValue)) {
                        int createTopics = kafkaTopics.createTopics(commandLineApp.getCatalogue(), intValue2, shortValue, namespace.getString("topic"), namespace.getString("match"));
                        kafkaTopics.close();
                        return createTopics;
                    }
                    KafkaTopics.logger.error("Kafka brokers not yet available. Aborting.");
                    kafkaTopics.close();
                    return 1;
                } catch (Throwable th) {
                    try {
                        kafkaTopics.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } catch (InterruptedException | ZooKeeperClientException e) {
                KafkaTopics.logger.error("Cannot retrieve number of addActive Kafka brokers. Please check that Zookeeper is running.");
                return 1;
            }
        }

        @Override // org.radarbase.schema.util.SubCommand
        public void addParser(ArgumentParser argumentParser) {
            argumentParser.description("Create all topics that are missing on the Kafka server.");
            argumentParser.addArgument(new String[]{"-p", "--partitions"}).help("number of partitions per topic").type(Integer.class).setDefault(3);
            argumentParser.addArgument(new String[]{"-r", "--replication"}).help("number of replicas per data packet").type(Short.class).setDefault((short) 3);
            argumentParser.addArgument(new String[]{"-b", "--brokers"}).help("number of brokers that are expected to be available.").type(Integer.class).setDefault(3);
            argumentParser.addArgument(new String[]{"-t", "--topic"}).help("register the schemas of one topic").type(String.class);
            argumentParser.addArgument(new String[]{"-m", "--match"}).help("register the schemas of all topics matching the given regex; does not do anything if --topic is specified").type(String.class);
            argumentParser.addArgument(new String[]{"zookeeper"}).help("zookeeper hosts and ports, comma-separated");
            SubCommand.addRootArgument(argumentParser);
        }
    }

    public KafkaTopics(@NotNull String str) {
        this.zkClient = KafkaZkClient.apply(str, false, 15000, 10000, 30, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.apply("radar-schemas"), Option.empty());
    }

    public boolean initialize(int i) throws InterruptedException {
        int i2 = 2;
        int i3 = 0;
        for (int i4 = 0; i4 < 20 && i3 < i; i4++) {
            List<Broker> currentBrokers = currentBrokers();
            i3 = currentBrokers.size();
            if (i3 >= i) {
                this.bootstrapServers = (String) currentBrokers.stream().map((v0) -> {
                    return v0.endPoints();
                }).flatMap(KafkaTopics::asStream).map((v0) -> {
                    return v0.connectionString();
                }).collect(Collectors.joining(","));
                logger.info("Creating Kafka client with bootstrap servers {}", this.bootstrapServers);
                this.kafkaClient = AdminClient.create(Map.of("bootstrap.servers", this.bootstrapServers));
            } else if (i4 < 20 - 1) {
                logger.warn("Only {} out of {} Kafka brokers available. Waiting {} seconds.", new Object[]{Integer.valueOf(i3), Integer.valueOf(i), Integer.valueOf(i2)});
                Thread.sleep(i2 * 1000);
                i2 = Math.min(32, i2 * 2);
            } else {
                logger.error("Only {} out of {} Kafka brokers available. Failed to wait on all brokers.", Integer.valueOf(i3), Integer.valueOf(i));
            }
        }
        this.initialized = i3 >= i;
        return this.initialized && refreshTopics();
    }

    @Override // org.radarbase.schema.registration.TopicRegistrar
    public void ensureInitialized() {
        if (!this.initialized) {
            throw new IllegalStateException("Manager is not initialized yet");
        }
    }

    private List<Broker> currentBrokers() {
        try {
            return (List) asStream(this.zkClient.getAllBrokersInCluster()).collect(Collectors.toList());
        } catch (ZooKeeperClientException e) {
            logger.warn("Failed to reach zookeeper");
            return List.of();
        }
    }

    private static <T> Stream<T> asStream(Seq<T> seq) {
        return JavaConverters$.MODULE$.seqAsJavaList(seq).stream();
    }

    public String getBootstrapServers() {
        ensureInitialized();
        return this.bootstrapServers;
    }

    public int getNumberOfBrokers() {
        return this.zkClient.getAllBrokersInCluster().length();
    }

    @NotNull
    public KafkaZkClient getZkClient() {
        return this.zkClient;
    }

    @Override // org.radarbase.schema.registration.AbstractTopicRegistrar
    @NotNull
    public AdminClient getKafkaClient() {
        ensureInitialized();
        return this.kafkaClient;
    }

    @Override // org.radarbase.schema.registration.AbstractTopicRegistrar, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        super.close();
        this.zkClient.close();
    }

    public static SubCommand command() {
        return new KafkaTopicsCommand();
    }
}
