package io.simplesource.saga.saga.internal;

import io.simplesource.api.CommandId;
import io.simplesource.kafka.internal.util.Tuple2;
import io.simplesource.saga.model.action.ActionCommand;
import io.simplesource.saga.model.action.ActionStatus;
import io.simplesource.saga.model.action.SagaAction;
import io.simplesource.saga.model.messages.ActionRequest;
import io.simplesource.saga.model.messages.ActionResponse;
import io.simplesource.saga.model.messages.SagaStateTransition;
import io.simplesource.saga.model.saga.Saga;
import io.simplesource.saga.model.saga.SagaId;
import io.simplesource.saga.model.saga.SagaStatus;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;

/* loaded from: input_file:io/simplesource/saga/saga/internal/ActionStream.class */
/**
 * Static helpers that build the action-related parts of the saga stream topology:
 * deriving the next actions to run from saga state, and turning action responses
 * into saga state transitions.
 *
 * <p>NOTE(review): this source appears to be decompiler output. The intermediate
 * streams are held as raw {@code KStream} and several values are cast without type
 * arguments; restoring full generics needs the element type returned by
 * {@code ActionResolver.getNextActions}, which is not visible in this file.
 */
final class ActionStream {
    ActionStream() {
    }

    /**
     * Derives, from a stream of saga states, the state transitions and the action
     * requests for the actions that {@code ActionResolver.getNextActions} reports.
     *
     * <p>The DSL calls are issued in a fixed order (state-transition branch first,
     * then the request branch); keep that order when editing, since Kafka Streams
     * numbers auto-named processors in call order.
     *
     * @param kStream saga states keyed by saga id
     * @param <A> the saga's action command payload type
     * @return a pair whose first element carries one {@code TransitionList} per saga
     *         update that produced at least one next action, and whose second element
     *         carries one {@code ActionRequest} per next action that has a command present
     */
    public static <A> Tuple2<KStream<SagaId, SagaStateTransition<A>>, KStream<SagaId, ActionRequest<A>>> getNextActions(KStream<SagaId, Saga<A>> kStream) {
        // One list of pending action executions per incoming saga state.
        KStream pendingActions = kStream.mapValues((id, saga) -> ActionResolver.getNextActions(saga));

        // Non-empty lists become a single TransitionList of per-action state changes.
        KStream stateTransitions = pendingActions
                .filter((id, executions) -> !executions.isEmpty())
                .mapValues((id, executions) -> SagaStateTransition.TransitionList.of((List) executions.stream()
                        .map(execution -> SagaStateTransition.SagaActionStateChanged.of(id, execution.actionId, execution.status, Collections.emptyList(), Optional.empty(), execution.isUndo.booleanValue()))
                        .collect(Collectors.toList())))
                .peek(SagaStream.logValues("stateUpdateNewActions"));

        // Each execution that carries a command becomes an ActionRequest.
        KStream actionRequests = pendingActions
                .flatMapValues((id, executions) -> executions)
                .filter((id, execution) -> execution.command.isPresent())
                .mapValues((id, execution) -> ActionRequest.of(id, execution.actionId, (ActionCommand) execution.command.get(), execution.isUndo))
                .peek(SagaStream.logValues("publishActionRequests"));

        return Tuple2.of(stateTransitions, actionRequests);
    }

    /**
     * Converts action responses into {@code SagaActionStateChanged} transitions.
     *
     * <p>Successful responses map to {@code Undone} (for undo responses) or
     * {@code Completed}. Failed responses are joined with the current saga state to
     * decide whether a retry is available: if so the action becomes
     * {@code RetryAwaiting}, otherwise {@code UndoFailed} (for undo responses) or
     * {@code Failed}. No retry is looked up while the saga is {@code FailurePending}.
     *
     * @param sagaContext supplies {@code retryStrategies}, queried by the action type of the matched command
     * @param kStream action responses keyed by saga id
     * @param kTable current saga state keyed by saga id, joined against failed responses
     * @param <A> the saga's action command payload type
     * @return the merged stream of transitions for successful and failed responses
     */
    public static <A> KStream<SagaId, SagaStateTransition<A>> handleActionResponses(SagaContext<A> sagaContext, KStream<SagaId, ActionResponse<A>> kStream, KTable<SagaId, Saga<A>> kTable) {
        // Index 0: successful responses; index 1: failed responses.
        KStream[] outcomes = kStream.branch(new Predicate[]{
            (id, response) -> response.result.isSuccess(),
            (id, response) -> response.result.isFailure()
        });

        KStream succeeded = outcomes[0].mapValues((id, response) -> SagaStateTransition.SagaActionStateChanged.of(
                id,
                response.actionId,
                response.isUndo.booleanValue() ? ActionStatus.Undone : ActionStatus.Completed,
                Collections.emptyList(),
                (Optional) response.result.getOrElse(Optional.empty()),
                response.isUndo.booleanValue()));

        KStream failed = outcomes[1].join(kTable, (response, saga) -> {
            // A saga that is already failing never schedules a retry.
            if (saga.status == SagaStatus.FailurePending) {
                return Tuple2.of(Optional.empty(), response);
            }
            SagaAction action = (SagaAction) saga.actions.get(response.actionId);
            CommandId commandId = response.commandId;
            // Find which of the action's commands (main or undo) this response answers.
            ActionCommand matched;
            if (commandId.equals(action.command.commandId)) {
                matched = action.command;
            } else {
                matched = (ActionCommand) action.undoCommand.filter(candidate -> candidate.commandId.equals(commandId)).orElse(null);
            }
            // Present only when a command matched and its retry strategy yields a next retry.
            return Tuple2.of(
                    Optional.ofNullable(matched).flatMap(cmd -> sagaContext.retryStrategies.get(cmd.actionType).nextRetry(action.retryCount)),
                    response);
        }).mapValues(retryAndResponse -> {
            ActionResponse response = (ActionResponse) retryAndResponse.v2();
            ActionStatus nextStatus;
            if (((Optional) retryAndResponse.v1()).isPresent()) {
                nextStatus = ActionStatus.RetryAwaiting;
            } else if (response.isUndo.booleanValue()) {
                nextStatus = ActionStatus.UndoFailed;
            } else {
                nextStatus = ActionStatus.Failed;
            }
            return SagaStateTransition.SagaActionStateChanged.of(response.sagaId, response.actionId, nextStatus, (List) response.result.failureReasons().get(), Optional.empty(), response.isUndo.booleanValue());
        });

        return succeeded.merge(failed).peek(SagaStream.logValues("stateTransitionsActionResponse"));
    }
}
