package org.radarbase.schema.registration;

import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.constraints.NotNull;
import kafka.zk.ZkVersion;
import net.sourceforge.argparse4j.impl.action.StoreConstArgumentAction;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.zookeeper.KeeperException;
import org.radarbase.schema.CommandLineApp;
import org.radarbase.schema.util.SubCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/radarbase/schema/registration/SchemaTopicManager.class */
public class SchemaTopicManager implements Closeable {
    /** Name of the Kafka topic that this manager reads, backs up and restores. */
    private static final String TOPIC_NAME = "_schemas";
    /** Wrapper giving access to the Kafka admin client, ZooKeeper client and topic listing. */
    private final KafkaTopics topics;
    /** Destination for backups read from the topic and source of backups to restore. */
    private final SchemaBackupStorage storage;
    /** Serializer handed to the backup when adding records read from the topic. */
    private final SchemaRegistrySerializer serializer = new SchemaRegistrySerializer();
    /** Config resource handle identifying the {@code _schemas} topic in admin-client calls. */
    private final ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
    /** Set to {@code true} once {@link #initialize(int)} has completed successfully. */
    private boolean isInitialized = false;
    private static final Logger logger = LoggerFactory.getLogger(SchemaTopicManager.class);
    /** Poll timeout used for every poll after the first one when reading the topic. */
    private static final Duration SECONDARY_TIMEOUT = Duration.ofSeconds(10);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/radarbase/schema/registration/SchemaTopicManager$SchemasCommand.class */
    /**
     * Command-line sub-command ({@code schema-topic}) that backs up, restores or
     * ensures the validity of the {@code _schemas} topic.
     */
    public static class SchemasCommand implements SubCommand {
        /** Namespace key under which the selected {@link SubAction} is stored. */
        private static final String SUBACTION = "subaction";

        /** Actions selectable through the {@code --backup}, {@code --restore} and {@code --ensure} flags. */
        private enum SubAction {
            BACKUP,
            RESTORE,
            ENSURE
        }

        private SchemasCommand() {
        }

        @Override
        public String getName() {
            return "schema-topic";
        }

        /**
         * Runs the selected action against the {@code _schemas} topic.
         *
         * @param namespace parsed command-line arguments
         * @param commandLineApp application context (unused)
         * @return {@code 0} on success, {@code 2} if the action failed with an exception,
         *         {@code 3} if no known action was selected
         */
        @Override
        public int execute(Namespace namespace, CommandLineApp commandLineApp) {
            SubAction action = (SubAction) namespace.get(SUBACTION);
            if (action == null) {
                // None of --backup/--restore/--ensure was given; switching on null
                // would only surface as an opaque NullPointerException.
                SchemaTopicManager.logger.error("Unknown action");
                return 3;
            }

            // try-with-resources guarantees the manager is closed on every path,
            // including failures during initialization or while running the action.
            try (SchemaTopicManager manager = new SchemaTopicManager(
                    namespace.getString("zookeeper"),
                    new JsonSchemaBackupStorage(Paths.get(namespace.getString("file"))))) {
                manager.initialize(namespace.getInt("brokers"));
                Duration timeout = Duration.ofSeconds(namespace.getInt("timeout"));
                switch (action) {
                    case BACKUP:
                        manager.makeBackup(timeout);
                        break;
                    case RESTORE:
                        manager.restoreBackup(namespace.getShort("replication"));
                        break;
                    case ENSURE:
                        manager.ensure(namespace.getShort("replication"), timeout);
                        break;
                    default:
                        SchemaTopicManager.logger.error("Unknown action");
                        return 3;
                }
                return 0;
            } catch (Exception e) {
                SchemaTopicManager.logger.error("Action failed: {}", e.toString());
                return 2;
            }
        }

        /**
         * Registers the arguments understood by this sub-command.
         *
         * @param argumentParser parser to add the arguments to
         */
        @Override
        public void addParser(ArgumentParser argumentParser) {
            argumentParser.description("Manage the _schemas topic");
            argumentParser.addArgument("--backup")
                    .help("back up schema topic data")
                    .action(new StoreConstArgumentAction())
                    .setConst(SubAction.BACKUP)
                    .dest(SUBACTION);
            argumentParser.addArgument("--restore")
                    .help("restore schema topic from backup")
                    .action(new StoreConstArgumentAction())
                    .setConst(SubAction.RESTORE)
                    .dest(SUBACTION);
            argumentParser.addArgument("--ensure")
                    .help("ensure that the schema topic is restored if needed")
                    .action(new StoreConstArgumentAction())
                    .setConst(SubAction.ENSURE)
                    .dest(SUBACTION);
            argumentParser.addArgument("-r", "--replication")
                    .help("number of replicas per data packet")
                    .type(Short.class)
                    .setDefault((short) 3);
            argumentParser.addArgument("-t", "--timeout")
                    .help("time (seconds) to wait for records in the _schemas topic to become available")
                    .setDefault(600)
                    .type(Integer.class);
            argumentParser.addArgument("-f", "--file")
                    .help("JSON file to load _schemas from")
                    .type(String.class)
                    .required(true);
            argumentParser.addArgument("-b", "--brokers")
                    .help("number of brokers that are expected to be available.")
                    .type(Integer.class)
                    .setDefault(3);
            argumentParser.addArgument("zookeeper")
                    .help("zookeeper hosts and ports, comma-separated");
            SubCommand.addRootArgument(argumentParser);
        }
    }

    /**
     * Creates a manager for the {@code _schemas} topic.
     *
     * @param str connection string passed on to {@link KafkaTopics}
     * @param schemaBackupStorage storage used to persist and load backups
     */
    public SchemaTopicManager(@NotNull String str, @NotNull SchemaBackupStorage schemaBackupStorage) {
        storage = schemaBackupStorage;
        topics = new KafkaTopics(str);
    }

    /**
     * Initializes the underlying {@link KafkaTopics} and marks this manager as ready.
     *
     * @param i value forwarded to {@code KafkaTopics.initialize}
     * @throws IllegalStateException if the underlying initialization reports failure
     * @throws InterruptedException propagated from the underlying initialization
     */
    public void initialize(int i) throws InterruptedException {
        boolean ready = topics.initialize(i);
        if (ready) {
            isInitialized = true;
            return;
        }
        throw new IllegalStateException("Brokers or topics not available.");
    }

    /**
     * Guards operations that require {@link #initialize(int)} to have succeeded.
     *
     * @throws IllegalStateException if the manager has not been initialized
     */
    private void ensureInitialized() {
        if (isInitialized) {
            return;
        }
        throw new IllegalStateException("Manager is not initialized yet");
    }

    /**
     * Reads all records and the topic configuration of the {@code _schemas} topic
     * into a new backup object.
     *
     * @param duration timeout for the first poll of the topic
     * @return the backup that was read
     * @throws SerializationException if a record could not be deserialized
     * @throws ExecutionException if the topic configuration could not be retrieved
     * @throws InterruptedException if interrupted while waiting; the interrupt flag is restored
     */
    @NotNull
    public SchemaTopicBackup readBackup(Duration duration) throws SerializationException, ExecutionException, InterruptedException {
        ensureInitialized();
        SchemaTopicBackup backup = new SchemaTopicBackup();
        try {
            readSchemas(getConsumerProps(), backup, duration);

            KafkaFuture<?> configFuture = (KafkaFuture<?>) this.topics.getKafkaClient()
                    .describeConfigs(List.of(this.topicResource))
                    .values()
                    .get(this.topicResource);
            backup.setConfig((Config) configFuture.get());
            return backup;
        } catch (SerializationException ex) {
            logger.error("Failed to deserialize the schema or config key", ex);
            throw ex;
        } catch (InterruptedException ex) {
            logger.error("Failed waiting for _schemas records", ex);
            Thread.currentThread().interrupt();
            throw ex;
        } catch (ExecutionException ex) {
            logger.error("Failed to get _schemas config", ex);
            throw ex;
        } catch (RuntimeException ex) {
            logger.error("Failed to store schemas", ex);
            throw ex;
        }
    }

    /**
     * Reads the {@code _schemas} topic and writes it to the backup storage. A backup
     * that starts at the first ID is stored as a regular backup; any other backup is
     * stored as invalid.
     *
     * @param duration timeout for the first poll of the topic
     * @throws IOException if the storage could not be written
     * @throws InterruptedException if interrupted while reading the topic
     * @throws ExecutionException if the topic configuration could not be retrieved
     * @throws SerializationException if a record could not be deserialized
     */
    public void makeBackup(Duration duration) throws IOException, InterruptedException, ExecutionException, SerializationException {
        SchemaTopicBackup backup = readBackup(duration);
        if (backup == null) {
            return;
        }
        try {
            if (!backup.startsAtFirstId()) {
                storage.storeInvalid(backup);
            } else {
                storage.store(backup);
            }
        } catch (IOException ex) {
            logger.error("Failed to store _schemas data", ex);
            throw ex;
        }
    }

    /**
     * Builds the Kafka consumer configuration used to read the {@code _schemas} topic.
     * Every call uses a fresh random consumer group, reads from the earliest offset
     * and has auto-commit disabled.
     *
     * @return mutable map of consumer properties
     */
    @NotNull
    private Map<String, Object> getConsumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", "schema-backup-" + UUID.randomUUID());
        props.put("client.id", "schema-backup");
        props.put("bootstrap.servers", topics.getBootstrapServers());
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        return props;
    }

    /**
     * Builds the Kafka producer configuration used to write records back to the
     * {@code _schemas} topic as raw byte arrays.
     *
     * @return mutable map of producer properties
     */
    @NotNull
    private Map<String, Object> getProducerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("client.id", "schema-backup");
        props.put("bootstrap.servers", topics.getBootstrapServers());
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        return props;
    }

    /**
     * Ensures that the {@code _schemas} topic holds valid data. An existing topic whose
     * backup starts at the first ID is left untouched. Otherwise the current contents
     * are stored as an invalid backup, and the topic is deleted, recreated and refilled
     * from the stored backup. When no stored backup is available, an error is logged
     * and nothing is changed.
     *
     * @param s replication factor used when the topic is recreated
     * @param duration timeout for the first poll when reading the existing topic
     * @throws IllegalStateException if not initialized, if the schema registry is
     *         running, or if the topic could not be created
     * @throws IOException if the backup storage could not be read or written
     */
    public void ensure(short s, Duration duration) throws InterruptedException, ExecutionException, SerializationException, IOException, KeeperException {
        ensureInitialized();
        ensureSchemaRegistryNotRunning();

        final boolean topicExists = this.topics.getTopics().contains(TOPIC_NAME);
        if (topicExists) {
            SchemaTopicBackup current = readBackup(duration);
            if (current.startsAtFirstId()) {
                logger.info("Existing topic is valid.");
                return;
            }
            // Keep a copy of the invalid contents before anything is replaced.
            try {
                this.storage.storeInvalid(current);
            } catch (IOException ex) {
                logger.error("Backup storage failure.", ex);
                throw ex;
            }
        }

        try {
            SchemaTopicBackup stored = this.storage.load();
            if (stored == null) {
                logger.error("No valid backup in storage.");
                return;
            }

            if (topicExists) {
                this.topics.getKafkaClient()
                        .deleteTopics(List.of(TOPIC_NAME))
                        .all()
                        .get();
                try {
                    this.topics.refreshTopics();
                } catch (InterruptedException ex) {
                    logger.info("Failed to wait to refresh topics");
                    Thread.currentThread().interrupt();
                    throw ex;
                }
            }

            boolean created = this.topics.createTopics(Stream.of(TOPIC_NAME), 1, s);
            if (!created) {
                throw new IllegalStateException("Failed to create _schemas topic");
            }
            commitBackup(stored);
        } catch (IOException ex) {
            logger.error("Backup storage loading failure.", ex);
            throw ex;
        }
    }

    /**
     * Reads every record currently in partition 0 of the {@code _schemas} topic into
     * the given backup. Polling stops at the first poll that returns no records.
     *
     * @param map consumer configuration
     * @param schemaTopicBackup backup that receives each record
     * @param duration timeout for the first poll; later polls use {@code SECONDARY_TIMEOUT}
     * @throws SerializationException if a record could not be added to the backup
     */
    private void readSchemas(Map<String, Object> map, SchemaTopicBackup schemaTopicBackup, Duration duration) throws SerializationException {
        // try-with-resources closes the consumer exactly once on every path.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(map)) {
            ensurePartitions(consumer);

            List<TopicPartition> assignment = List.of(new TopicPartition(TOPIC_NAME, 0));
            consumer.assign(assignment);
            consumer.seekToBeginning(assignment);
            logger.debug("Kafka store reader thread started");

            Duration pollTimeout = duration;
            int numRecords;
            do {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
                // Only the first poll uses the caller-supplied timeout.
                pollTimeout = SECONDARY_TIMEOUT;
                numRecords = records.count();
                for (ConsumerRecord<byte[], byte[]> schemaRecord : records) {
                    schemaTopicBackup.addSchemaRecord(this.serializer, schemaRecord);
                }
            } while (numRecords != 0);
        }
    }

    /**
     * Waits until partition metadata for the {@code _schemas} topic is available and
     * verifies that the topic has exactly one partition. The lookup is attempted up to
     * 10 times with a one-second pause after each failed attempt.
     *
     * @param consumer consumer used to look up the partitions
     * @throws IllegalArgumentException if no partitions were found after all attempts
     * @throws IllegalStateException if the topic has more than one partition, or if the
     *         thread was interrupted while waiting (the interrupt flag is restored)
     */
    private void ensurePartitions(Consumer<?, ?> consumer) {
        final int maxAttempts = 10;
        List<?> partitions = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            partitions = consumer.partitionsFor(TOPIC_NAME);
            if (partitions != null && !partitions.isEmpty()) {
                break;
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop retrying.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Interrupted while waiting for partitions of the _schemas topic", e);
            }
        }
        if (partitions == null || partitions.isEmpty()) {
            throw new IllegalArgumentException("Unable to subscribe to the Kafka topic _schemas backing this data store. Topic may not exist.");
        }
        if (partitions.size() > 1) {
            throw new IllegalStateException("Unexpected number of partitions in the _schemas topic. Expected 1 and instead got " + partitions.size());
        }
    }

    /**
     * Recreates the {@code _schemas} topic from the stored backup. When no backup is
     * available, an error is logged and nothing is changed.
     *
     * @param s replication factor of the topic that is created
     * @throws IllegalStateException if not initialized, if the schema registry is
     *         running, if the topic already exists, or if it could not be created
     * @throws IOException if the backup could not be loaded
     */
    public void restoreBackup(short s) throws IOException, ExecutionException, InterruptedException, KeeperException {
        ensureInitialized();
        ensureSchemaRegistryNotRunning();
        try {
            SchemaTopicBackup stored = this.storage.load();
            if (stored == null) {
                logger.error("Backup not available");
                return;
            }

            boolean alreadyPresent = this.topics.getTopics().contains(TOPIC_NAME);
            if (alreadyPresent) {
                throw new IllegalStateException("Topic _schemas already exists, cannot restore it from backup");
            }

            boolean created = this.topics.createTopics(Stream.of(TOPIC_NAME), 1, s);
            if (!created) {
                throw new IllegalStateException("Failed to create _schemas topic");
            }
            commitBackup(stored);
        } catch (IOException ex) {
            logger.error("Failed to load _schemas data", ex);
            throw ex;
        }
    }

    /**
     * Verifies that no schema registry master node is registered in ZooKeeper.
     *
     * @throws IllegalStateException if the schema registry master node exists
     * @throws KeeperException declared for callers; any failure of the ZooKeeper
     *         lookup is logged and rethrown unchanged
     * @throws InterruptedException declared for callers
     */
    private void ensureSchemaRegistryNotRunning() throws KeeperException, InterruptedException {
        boolean masterExists;
        try {
            masterExists = this.topics.getZkClient().pathExists("/schema_registry/schema_registry_master");
        } catch (Exception e) {
            // Log for diagnosis, then propagate the original exception as-is.
            logger.error("Cannot check whether schema registry master exists", e);
            throw e;
        }
        if (masterExists) {
            throw new IllegalStateException("Cannot restore schemas while the schema registry is running.");
        }
        logger.info("No zookeeper nodes for Schema Registry.");
    }

    /**
     * Best-effort removal of the schema registry ID node from ZooKeeper. Any failure
     * is reported at info level as the node being absent and is not propagated.
     */
    private void resetSchemaRegistryId() throws KeeperException, InterruptedException {
        final String idPath = "/schema_registry/schema_registry_id";
        try {
            this.topics.getZkClient().deletePath(idPath, ZkVersion.MatchAnyVersion(), false);
        } catch (Exception ignored) {
            // Deliberately ignored: deletion is best-effort.
            logger.info("No schema registry ID listed in zookeeper.");
        }
    }

    /**
     * Writes a backup to the {@code _schemas} topic: applies the backed-up topic
     * configuration, produces all backed-up records, and finally resets the schema
     * registry ID in ZooKeeper.
     *
     * @param schemaTopicBackup backup whose config and records are written
     * @throws ExecutionException if sending a record or altering the config failed
     * @throws InterruptedException if interrupted while waiting for completion
     */
    private void commitBackup(SchemaTopicBackup schemaTopicBackup) throws ExecutionException, InterruptedException, KeeperException {
        // Start the config update first; its result is awaited after the records are written.
        Collection<AlterConfigOp> configOps = schemaTopicBackup.getConfig().entries().stream()
                .map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.SET))
                .collect(Collectors.toList());
        AlterConfigsResult alterResult = this.topics.getKafkaClient()
                .incrementalAlterConfigs(Map.of(this.topicResource, configOps));

        // try-with-resources closes the producer exactly once on every path.
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(getProducerProps())) {
            // Submit every record before waiting, so the sends can be batched.
            List<? extends Future<?>> pending = schemaTopicBackup.getRecords().stream()
                    .map(schemaRecord -> new ProducerRecord<byte[], byte[]>(
                            TOPIC_NAME, schemaRecord.getKey(), schemaRecord.getValue()))
                    .map(producer::send)
                    .collect(Collectors.toList());
            for (Future<?> sendResult : pending) {
                sendResult.get();
            }
        }

        alterResult.all().get();
        resetSchemaRegistryId();
    }

    /**
     * Creates the {@code schema-topic} sub-command.
     *
     * @return a new {@link SchemasCommand}
     */
    public static SubCommand command() {
        SubCommand schemasCommand = new SchemasCommand();
        return schemasCommand;
    }

    /** Closes the underlying {@link KafkaTopics} instance. */
    @Override
    public void close() {
        topics.close();
    }
}
