package io.simplesource.saga.saga.app;

import io.simplesource.saga.model.messages.ActionResponse;
import io.simplesource.saga.model.messages.SagaRequest;
import io.simplesource.saga.model.messages.SagaResponse;
import io.simplesource.saga.model.messages.SagaStateTransition;
import io.simplesource.saga.model.saga.Saga;
import io.simplesource.saga.model.saga.SagaId;
import io.simplesource.saga.model.specs.ActionProcessorSpec;
import io.simplesource.saga.model.specs.SagaSpec;
import io.simplesource.saga.shared.topics.TopicNamer;
import io.simplesource.saga.shared.topics.TopicTypes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

/* loaded from: input_file:io/simplesource/saga/saga/app/SagaConsumer.class */
/**
 * Static factories that open Kafka Streams {@link KStream}s over the saga and
 * action topics. Each method resolves the concrete topic name through the
 * supplied {@link TopicNamer} and reads records using the key/value serdes
 * carried by the given spec.
 */
final class SagaConsumer {
    SagaConsumer() {
    }

    /**
     * Streams saga requests.
     *
     * @param sagaSpec       supplies the saga id (key) and request (value) serdes
     * @param topicNamer     maps {@code TopicTypes.SagaTopic.request} to the topic name
     * @param streamsBuilder builder the source stream is registered on
     * @return a stream of saga requests keyed by saga id
     */
    public static <A> KStream<SagaId, SagaRequest<A>> sagaRequest(SagaSpec<A> sagaSpec, TopicNamer topicNamer, StreamsBuilder streamsBuilder) {
        final String topic = topicNamer.apply(TopicTypes.SagaTopic.request);
        final Consumed<SagaId, SagaRequest<A>> consumed =
                Consumed.with(sagaSpec.serdes.sagaId(), sagaSpec.serdes.request());
        return streamsBuilder.stream(topic, consumed);
    }

    /**
     * Streams saga responses.
     *
     * @param sagaSpec       supplies the saga id (key) and response (value) serdes
     * @param topicNamer     maps {@code TopicTypes.SagaTopic.response} to the topic name
     * @param streamsBuilder builder the source stream is registered on
     * @return a stream of saga responses keyed by saga id
     */
    public static <A> KStream<SagaId, SagaResponse> sagaResponse(SagaSpec<A> sagaSpec, TopicNamer topicNamer, StreamsBuilder streamsBuilder) {
        final String topic = topicNamer.apply(TopicTypes.SagaTopic.response);
        final Consumed<SagaId, SagaResponse> consumed =
                Consumed.with(sagaSpec.serdes.sagaId(), sagaSpec.serdes.response());
        return streamsBuilder.stream(topic, consumed);
    }

    /**
     * Streams saga state transitions.
     *
     * @param sagaSpec       supplies the saga id (key) and transition (value) serdes
     * @param topicNamer     maps {@code TopicTypes.SagaTopic.stateTransition} to the topic name
     * @param streamsBuilder builder the source stream is registered on
     * @return a stream of state transitions keyed by saga id
     */
    public static <A> KStream<SagaId, SagaStateTransition> stateTransition(SagaSpec<A> sagaSpec, TopicNamer topicNamer, StreamsBuilder streamsBuilder) {
        final String topic = topicNamer.apply(TopicTypes.SagaTopic.stateTransition);
        final Consumed<SagaId, SagaStateTransition> consumed =
                Consumed.with(sagaSpec.serdes.sagaId(), sagaSpec.serdes.transition());
        return streamsBuilder.stream(topic, consumed);
    }

    /**
     * Streams saga state records.
     *
     * @param sagaSpec       supplies the saga id (key) and state (value) serdes
     * @param topicNamer     maps {@code TopicTypes.SagaTopic.state} to the topic name
     * @param streamsBuilder builder the source stream is registered on
     * @return a stream of saga state keyed by saga id
     */
    public static <A> KStream<SagaId, Saga<A>> state(SagaSpec<A> sagaSpec, TopicNamer topicNamer, StreamsBuilder streamsBuilder) {
        final String topic = topicNamer.apply(TopicTypes.SagaTopic.state);
        final Consumed<SagaId, Saga<A>> consumed =
                Consumed.with(sagaSpec.serdes.sagaId(), sagaSpec.serdes.state());
        return streamsBuilder.stream(topic, consumed);
    }

    /**
     * Streams action responses.
     *
     * @param actionProcessorSpec supplies the saga id (key) and action response (value) serdes
     * @param topicNamer          maps {@code TopicTypes.ActionTopic.response} to the topic name
     * @param streamsBuilder      builder the source stream is registered on
     * @return a stream of action responses keyed by saga id
     */
    public static <A> KStream<SagaId, ActionResponse> actionResponse(ActionProcessorSpec<A> actionProcessorSpec, TopicNamer topicNamer, StreamsBuilder streamsBuilder) {
        final String topic = topicNamer.apply(TopicTypes.ActionTopic.response);
        final Consumed<SagaId, ActionResponse> consumed =
                Consumed.with(actionProcessorSpec.serdes.sagaId(), actionProcessorSpec.serdes.response());
        return streamsBuilder.stream(topic, consumed);
    }
}
