package io.simplesource.saga.saga.internal;

import io.simplesource.kafka.internal.util.Tuple2;
import io.simplesource.saga.model.messages.ActionResponse;
import io.simplesource.saga.model.messages.SagaRequest;
import io.simplesource.saga.model.messages.SagaStateTransition;
import io.simplesource.saga.model.saga.Saga;
import io.simplesource.saga.model.saga.SagaId;
import io.simplesource.saga.saga.internal.SagaTopologyBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/simplesource/saga/saga/internal/SagaStream.class */
/**
 * Static helpers that wire the saga coordinator's sub-topology together from the
 * request, state-transition, state and action-response streams.
 *
 * <p>Note: this source is decompiler output; the decompiler reports that the access
 * modifiers of the class and of its static methods were widened from package-private.
 */
public final class SagaStream {
    private static final Logger logger = LoggerFactory.getLogger(SagaStream.class);

    /** Maximum number of leading key characters included in a {@code logValues} log line. */
    private static final int KEY_PREFIX_LENGTH = 6;

    SagaStream() {
    }

    /**
     * Creates a {@link ForeachAction} that logs every record at INFO level in the form
     * {@code "<str>: <key prefix>=<value>"}.
     *
     * <p>Only the first {@value #KEY_PREFIX_LENGTH} characters of the key's string form are
     * logged. Keys whose string form is shorter than that are logged in full, and
     * {@code null} keys or values are rendered as {@code "null"} instead of throwing, so
     * that a logging helper can never fail the record being processed.
     *
     * @param str label written at the start of every log line
     * @param <K> record key type
     * @param <V> record value type
     * @return an action suitable for passing to a stream's {@code foreach}/{@code peek}
     */
    public static <K, V> ForeachAction<K, V> logValues(String str) {
        // The value is converted eagerly with String.valueOf so the rendered text is exactly
        // the value's toString() and the logger only ever receives plain strings.
        return (key, value) -> logger.info("{}: {}={}", str, keyPrefix(key), String.valueOf(value));
    }

    /** Returns at most the first {@link #KEY_PREFIX_LENGTH} characters of the key's string form. */
    private static String keyPrefix(Object key) {
        String keyText = String.valueOf(key);
        return keyText.length() > KEY_PREFIX_LENGTH ? keyText.substring(0, KEY_PREFIX_LENGTH) : keyText;
    }

    /**
     * Builds the saga sub-topology: derives the intermediate streams from the four input
     * streams and hands each of them to the matching {@code SagaProducer} publish method.
     *
     * @param sagaContext           context supplying, among other things, the serdes
     * @param delayedRetryPublisher passed through to {@code TransitionStream.applyStateTransitions}
     * @param kStream               incoming saga requests
     * @param kStream2              saga state transitions
     * @param kStream3              saga state updates; also the source of the state table
     * @param kStream4              action responses
     * @param <A>                   action command type
     */
    public static <A> void addSubTopology(SagaContext<A> sagaContext, SagaTopologyBuilder.DelayedRetryPublisher<A> delayedRetryPublisher, KStream<SagaId, SagaRequest<A>> kStream, KStream<SagaId, SagaStateTransition<A>> kStream2, KStream<SagaId, Saga<A>> kStream3, KStream<SagaId, ActionResponse<A>> kStream4) {
        // NOTE: the order of the statements below is kept exactly as in the original; each
        // call adds operators to the topology, so reordering is not assumed to be harmless.
        KTable<SagaId, Saga<A>> stateTable = createStateTable(sagaContext, kStream3);

        // The Tuple2 element types are declared by helper classes outside this file, so the
        // pairs are kept raw here and their halves are cast at the point of use.
        Tuple2 validatedRequests = SetupStream.validateSagaRequests(sagaContext, kStream);
        KStream initialStateTransitions = SetupStream.addInitialState(sagaContext, (KStream) validatedRequests.v1(), stateTable);
        Tuple2 nextActions = ActionStream.getNextActions(kStream3);
        KStream actionResponseTransitions = ActionStream.handleActionResponses(sagaContext, kStream4, stateTable);
        KStream updatedSagaState = TransitionStream.applyStateTransitions(sagaContext, delayedRetryPublisher, kStream2);
        Tuple2 sagaResponses = ResponseStream.addSagaResponse(kStream3);

        // Action requests come from the second half of the next-actions pair.
        SagaProducer.publishActionRequests(sagaContext, (KStream) nextActions.v2());

        // State transitions are published from four independent sources.
        SagaProducer.publishSagaStateTransitions(sagaContext, initialStateTransitions);
        SagaProducer.publishSagaStateTransitions(sagaContext, (KStream) nextActions.v1());
        SagaProducer.publishSagaStateTransitions(sagaContext, actionResponseTransitions);
        SagaProducer.publishSagaStateTransitions(sagaContext, (KStream) sagaResponses.v1());

        SagaProducer.publishSagaState(sagaContext, updatedSagaState);

        // Saga responses: the second half of the validation pair, plus the second half of
        // the pair produced by ResponseStream.addSagaResponse.
        SagaProducer.publishSagaResponses(sagaContext, (KStream) validatedRequests.v2());
        SagaProducer.publishSagaResponses(sagaContext, (KStream) sagaResponses.v2());
    }

    /**
     * Materializes the saga state stream as a table keyed by saga id.
     *
     * <p>For each key the reducer keeps whichever saga has the strictly greater sequence
     * number; when the two sequence numbers are equal the reducer's second argument is kept.
     *
     * @param sagaContext supplies the saga-id and saga-state serdes used for grouping and
     *                    for the materialized store
     * @param kStream     stream of saga state updates
     * @param <A>         action command type
     * @return table holding one saga state per saga id
     */
    private static <A> KTable<SagaId, Saga<A>> createStateTable(SagaContext<A> sagaContext, KStream<SagaId, Saga<A>> kStream) {
        return kStream
                .groupByKey(Grouped.with(sagaContext.sSerdes.sagaId(), sagaContext.sSerdes.state()))
                .reduce(
                        (first, second) -> first.sequence.getSeq() > second.sequence.getSeq() ? first : second,
                        Materialized.with(sagaContext.sSerdes.sagaId(), sagaContext.sSerdes.state()));
    }
}
