package io.simplesource.saga.saga;

import io.simplesource.saga.model.specs.ActionProcessorSpec;
import io.simplesource.saga.model.specs.SagaSpec;
import io.simplesource.saga.saga.app.SagaContext;
import io.simplesource.saga.saga.app.SagaStream;
import io.simplesource.saga.saga.app.SagaTopologyBuilder;
import io.simplesource.saga.shared.topics.TopicConfig;
import io.simplesource.saga.shared.topics.TopicConfigBuilder;
import io.simplesource.saga.shared.topics.TopicCreation;
import io.simplesource.saga.shared.topics.TopicTypes;
import io.simplesource.saga.shared.utils.StreamAppConfig;
import io.simplesource.saga.shared.utils.StreamAppUtils;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/saga/saga/SagaApp.class */
/**
 * Assembles a saga Kafka Streams application: it collects the topics to create, lets callers
 * register action processors as sub-topologies, and finally creates any missing topics and
 * starts the stream application.
 *
 * @param <A> type parameter shared by the {@link SagaSpec} and every {@link ActionProcessorSpec}
 *            registered with this app
 */
public final class SagaApp<A> {
    private static final Logger logger = LoggerFactory.getLogger(SagaApp.class);

    /** Maximum time, in seconds, {@link #run} waits for missing topics to be created. */
    private static final long TOPIC_CREATION_TIMEOUT_SECONDS = 30L;

    private final SagaSpec<A> sagaSpec;
    private final TopicConfig sagaTopicConfig;
    private final SagaTopologyBuilder<A> topologyBuilder;
    // Grows as action processors are added; consumed by run() when creating missing topics.
    private final List<TopicCreation> topics;

    /**
     * Creates the app for the given saga spec.
     *
     * <p>The saga topic configuration is built for {@code TopicTypes.SagaTopic.all}, passing
     * {@code cleanup.policy=compact} for the saga state topic, and the resulting topics become
     * the initial contents of the topic list.
     *
     * @param sagaSpec   saga specification handed to the topology builder and to each saga context
     * @param buildSteps topic configuration build steps applied to the saga topics
     */
    public SagaApp(SagaSpec<A> sagaSpec, TopicConfigBuilder.BuildSteps buildSteps) {
        this.sagaSpec = sagaSpec;
        this.sagaTopicConfig = TopicConfigBuilder.buildTopics(TopicTypes.SagaTopic.all, Collections.emptyMap(), Collections.singletonMap(TopicTypes.SagaTopic.state, Collections.singletonMap("cleanup.policy", "compact")), buildSteps);
        this.topologyBuilder = new SagaTopologyBuilder<>(sagaSpec, this.sagaTopicConfig);
        this.topics = TopicCreation.allTopics(this.sagaTopicConfig);
    }

    /**
     * Registers an action processor with this saga app.
     *
     * <p>The action topics are built for {@code TopicTypes.ActionTopic.all} and appended to the
     * topic list, and a callback is registered with the topology builder that adds the
     * corresponding sub-topology via {@link SagaStream#addSubTopology}.
     *
     * @param actionProcessorSpec specification of the action processor
     * @param buildSteps          topic configuration build steps applied to the action topics
     * @return this instance, so calls can be chained
     */
    public SagaApp<A> addActionProcessor(ActionProcessorSpec<A> actionProcessorSpec, TopicConfigBuilder.BuildSteps buildSteps) {
        TopicConfig buildTopics = TopicConfigBuilder.buildTopics(TopicTypes.ActionTopic.all, Collections.emptyMap(), Collections.emptyMap(), buildSteps);
        this.topics.addAll(TopicCreation.allTopics(buildTopics));
        this.topologyBuilder.onBuildTopology(sagaTopologyContext -> {
            SagaStream.addSubTopology(sagaTopologyContext, new SagaContext(this.sagaSpec, actionProcessorSpec, this.sagaTopicConfig.namer, buildTopics.namer));
        });
        return this;
    }

    /**
     * Creates any missing topics, builds the topology, logs its description and runs the
     * stream application.
     *
     * @param streamAppConfig configuration from which the Kafka properties are derived
     * @throws RuntimeException if topic creation, topology building or application start-up
     *                          fails; the original exception is preserved as the cause
     */
    public void run(StreamAppConfig streamAppConfig) {
        Properties config = StreamAppConfig.getConfig(streamAppConfig);
        try {
            // The admin client is only needed for topic creation; close it as soon as that is
            // done so its resources are not leaked.
            try (AdminClient adminClient = AdminClient.create(config)) {
                StreamAppUtils.createMissingTopics(adminClient, this.topics).all().get(TOPIC_CREATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
            Topology buildTopology = buildTopology();
            logger.info("Topology description {}", buildTopology.describe());
            StreamAppUtils.runStreamApp(config, buildTopology);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to add missing topics", e);
        } catch (Exception e) {
            throw new RuntimeException("Unable to add missing topics", e);
        }
    }

    /**
     * Builds the topology from the topology builder, including every registered
     * action-processor callback.
     *
     * @return the built topology
     */
    Topology buildTopology() {
        return this.topologyBuilder.build();
    }
}
