package io.simplesource.saga.saga.app;

import io.simplesource.data.NonEmptyList;
import io.simplesource.data.Result;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.internal.util.Tuple2;
import io.simplesource.saga.model.action.ActionCommand;
import io.simplesource.saga.model.action.ActionStatus;
import io.simplesource.saga.model.messages.ActionRequest;
import io.simplesource.saga.model.messages.ActionResponse;
import io.simplesource.saga.model.messages.SagaRequest;
import io.simplesource.saga.model.messages.SagaResponse;
import io.simplesource.saga.model.messages.SagaStateTransition;
import io.simplesource.saga.model.saga.Saga;
import io.simplesource.saga.model.saga.SagaError;
import io.simplesource.saga.model.saga.SagaId;
import io.simplesource.saga.model.saga.SagaStatus;
import io.simplesource.saga.model.serdes.SagaSerdes;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/saga/saga/app/SagaStream.class */
public final class SagaStream {
    private static Logger logger = LoggerFactory.getLogger(SagaStream.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/simplesource/saga/saga/app/SagaStream$StatusWithError.class */
    public static final class StatusWithError {
        private final Sequence sequence;
        private final SagaStatus status;
        private final Optional<NonEmptyList<SagaError>> errors;

        static Optional<StatusWithError> of(Sequence sequence, SagaStatus sagaStatus) {
            return Optional.of(new StatusWithError(sequence, sagaStatus, Optional.empty()));
        }

        static Optional<StatusWithError> of(Sequence sequence, List<SagaError> list) {
            return Optional.of(new StatusWithError(sequence, SagaStatus.Failed, NonEmptyList.fromList(list)));
        }

        public StatusWithError(Sequence sequence, SagaStatus sagaStatus, Optional<NonEmptyList<SagaError>> optional) {
            this.sequence = sequence;
            this.status = sagaStatus;
            this.errors = optional;
        }

        public Sequence sequence() {
            return this.sequence;
        }

        public SagaStatus status() {
            return this.status;
        }

        public Optional<NonEmptyList<SagaError>> errors() {
            return this.errors;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof StatusWithError)) {
                return false;
            }
            StatusWithError statusWithError = (StatusWithError) obj;
            Sequence sequence = sequence();
            Sequence sequence2 = statusWithError.sequence();
            if (sequence == null) {
                if (sequence2 != null) {
                    return false;
                }
            } else if (!sequence.equals(sequence2)) {
                return false;
            }
            SagaStatus status = status();
            SagaStatus status2 = statusWithError.status();
            if (status == null) {
                if (status2 != null) {
                    return false;
                }
            } else if (!status.equals(status2)) {
                return false;
            }
            Optional<NonEmptyList<SagaError>> errors = errors();
            Optional<NonEmptyList<SagaError>> errors2 = statusWithError.errors();
            return errors == null ? errors2 == null : errors.equals(errors2);
        }

        public int hashCode() {
            Sequence sequence = sequence();
            int hashCode = (1 * 59) + (sequence == null ? 43 : sequence.hashCode());
            SagaStatus status = status();
            int hashCode2 = (hashCode * 59) + (status == null ? 43 : status.hashCode());
            Optional<NonEmptyList<SagaError>> errors = errors();
            return (hashCode2 * 59) + (errors == null ? 43 : errors.hashCode());
        }

        public String toString() {
            return "SagaStream.StatusWithError(sequence=" + sequence() + ", status=" + status() + ", errors=" + errors() + ")";
        }
    }

    static <K, V> ForeachAction<K, V> logValues(String str) {
        return (obj, obj2) -> {
            logger.info("{}: {}={}", new Object[]{str, obj.toString().substring(0, 6), obj2.toString()});
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <A> void addSubTopology(SagaContext<A> sagaContext, KStream<SagaId, SagaRequest<A>> kStream, KStream<SagaId, SagaStateTransition> kStream2, KStream<SagaId, Saga<A>> kStream3, KStream<SagaId, ActionResponse> kStream4) {
        KTable createStateTable = createStateTable(sagaContext, kStream3);
        Tuple2 validateSagaRequests = validateSagaRequests(sagaContext, kStream);
        KStream<SagaId, SagaStateTransition> addInitialState = addInitialState(sagaContext, (KStream) validateSagaRequests.v1(), createStateTable);
        Tuple2 addNextActions = addNextActions(kStream3);
        KStream<SagaId, SagaStateTransition> addActionResponses = addActionResponses(kStream4);
        Tuple2<KStream<SagaId, SagaStateTransition>, KStream<SagaId, SagaResponse>> addSagaResponse = addSagaResponse(kStream3);
        KStream applyStateTransitions = applyStateTransitions(sagaContext, kStream2);
        SagaProducer.publishActionRequests(sagaContext, (KStream) addNextActions.v2());
        SagaProducer.publishSagaStateTransitions(sagaContext, addInitialState);
        SagaProducer.publishSagaStateTransitions(sagaContext, (KStream) addNextActions.v1());
        SagaProducer.publishSagaStateTransitions(sagaContext, addActionResponses);
        SagaProducer.publishSagaStateTransitions(sagaContext, (KStream) addSagaResponse.v1());
        SagaProducer.publishSagaState(sagaContext, applyStateTransitions);
        SagaProducer.publishSagaResponses(sagaContext, (KStream) validateSagaRequests.v2());
        SagaProducer.publishSagaResponses(sagaContext, (KStream) addSagaResponse.v2());
    }

    static <A> KTable<SagaId, Saga<A>> createStateTable(SagaContext<A> sagaContext, KStream<SagaId, Saga<A>> kStream) {
        return kStream.groupByKey(Grouped.with(sagaContext.sSerdes.sagaId(), sagaContext.sSerdes.state())).reduce((saga, saga2) -> {
            return saga.sequence.getSeq() > saga2.sequence.getSeq() ? saga : saga2;
        }, Materialized.with(sagaContext.sSerdes.sagaId(), sagaContext.sSerdes.state()));
    }

    static <A> KStream<SagaId, Saga<A>> applyStateTransitions(SagaContext<A> sagaContext, KStream<SagaId, SagaStateTransition> kStream) {
        SagaSerdes<A> sagaSerdes = sagaContext.sSerdes;
        return kStream.groupByKey(Grouped.with(sagaSerdes.sagaId(), sagaSerdes.transition())).aggregate(() -> {
            return Saga.of(SagaId.random(), new HashMap(), SagaStatus.NotStarted, Sequence.first());
        }, (sagaId, sagaStateTransition, saga) -> {
            return SagaUtils.applyTransition(sagaStateTransition, saga);
        }, Materialized.with(sagaSerdes.sagaId(), sagaSerdes.state())).toStream();
    }

    private static <A> Tuple2<KStream<SagaId, SagaRequest<A>>, KStream<SagaId, SagaResponse>> validateSagaRequests(SagaContext<A> sagaContext, KStream<SagaId, SagaRequest<A>> kStream) {
        Set<String> keySet = sagaContext.actionTopicNamers.keySet();
        KStream[] branch = kStream.mapValues((sagaId, sagaRequest) -> {
            return Tuple2.of(sagaRequest, (List) sagaRequest.initialState.actions.values().stream().map(sagaAction -> {
                String lowerCase = sagaAction.actionType.toLowerCase();
                return !keySet.contains(lowerCase) ? SagaError.of(SagaError.Reason.InvalidSaga, String.format("Unknown action type '%s'", lowerCase)) : null;
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList()));
        }).branch(new Predicate[]{(sagaId2, tuple2) -> {
            return ((List) tuple2.v2()).isEmpty();
        }, (sagaId3, tuple22) -> {
            return !((List) tuple22.v2()).isEmpty();
        }});
        return Tuple2.of(branch[0].mapValues((v0) -> {
            return v0.v1();
        }), branch[1].mapValues((v0) -> {
            return v0.v2();
        }).mapValues((sagaId4, list) -> {
            return new SagaResponse(sagaId4, Result.failure((NonEmptyList) NonEmptyList.fromList(list).get()));
        }));
    }

    private static <A> KStream<SagaId, SagaStateTransition> addInitialState(SagaContext<A> sagaContext, KStream<SagaId, SagaRequest<A>> kStream, KTable<SagaId, Saga<A>> kTable) {
        SagaSerdes<A> sagaSerdes = sagaContext.sSerdes;
        return kStream.leftJoin(kTable, (sagaRequest, saga) -> {
            return Tuple2.of(sagaRequest, Boolean.valueOf(saga == null));
        }, Joined.with(sagaSerdes.sagaId(), sagaSerdes.request(), sagaSerdes.state())).filter((sagaId, tuple2) -> {
            return ((Boolean) tuple2.v2()).booleanValue();
        }).mapValues((sagaId2, tuple22) -> {
            return new SagaStateTransition.SetInitialState(((SagaRequest) tuple22.v1()).initialState);
        });
    }

    static <A> Tuple2<KStream<SagaId, SagaStateTransition>, KStream<SagaId, SagaResponse>> addSagaResponse(KStream<SagaId, Saga<A>> kStream) {
        KStream mapValues = kStream.mapValues((sagaId, saga) -> {
            if (saga.status == SagaStatus.InProgress && SagaUtils.sagaCompleted(saga)) {
                return StatusWithError.of(saga.sequence, SagaStatus.Completed);
            }
            if (saga.status == SagaStatus.InProgress && SagaUtils.sagaFailurePending(saga)) {
                return StatusWithError.of(saga.sequence, SagaStatus.FailurePending);
            }
            if ((saga.status == SagaStatus.InProgress || saga.status == SagaStatus.FailurePending) && SagaUtils.sagaInFailure(saga)) {
                return StatusWithError.of(saga.sequence, SagaStatus.InFailure);
            }
            if ((saga.status != SagaStatus.InFailure && saga.status != SagaStatus.InProgress) || !SagaUtils.sagaFailed(saga)) {
                return Optional.empty();
            }
            return StatusWithError.of(saga.sequence, (List<SagaError>) saga.actions.values().stream().filter(sagaAction -> {
                return sagaAction.status == ActionStatus.Failed && !sagaAction.error.isEmpty();
            }).flatMap(sagaAction2 -> {
                return sagaAction2.error.stream();
            }).collect(Collectors.toList()));
        }).filter((sagaId2, optional) -> {
            return optional.isPresent();
        }).mapValues((sagaId3, optional2) -> {
            return (StatusWithError) optional2.get();
        });
        return Tuple2.of(mapValues.mapValues((sagaId4, statusWithError) -> {
            return new SagaStateTransition.SagaStatusChanged(sagaId4, statusWithError.status, (List) statusWithError.errors.map((v0) -> {
                return v0.toList();
            }).orElse(Collections.emptyList()));
        }), mapValues.mapValues((sagaId5, statusWithError2) -> {
            SagaStatus sagaStatus = statusWithError2.status;
            return sagaStatus == SagaStatus.Completed ? Optional.of(Result.success(statusWithError2.sequence)) : sagaStatus == SagaStatus.Failed ? Optional.of(Result.failure((NonEmptyList) statusWithError2.errors.get())) : Optional.empty();
        }).filter((sagaId6, optional3) -> {
            return optional3.isPresent();
        }).mapValues((sagaId7, optional4) -> {
            return new SagaResponse(sagaId7, (Result) optional4.get());
        }));
    }

    private static <A> Tuple2<KStream<SagaId, SagaStateTransition>, KStream<SagaId, ActionRequest<A>>> addNextActions(KStream<SagaId, Saga<A>> kStream) {
        KStream mapValues = kStream.mapValues((sagaId, saga) -> {
            return SagaUtils.getNextActions(saga);
        });
        return Tuple2.of(mapValues.filter((sagaId2, list) -> {
            return !list.isEmpty();
        }).mapValues((sagaId3, list2) -> {
            return new SagaStateTransition.TransitionList((List) list2.stream().map(sagaActionExecution -> {
                return new SagaStateTransition.SagaActionStatusChanged(sagaId3, sagaActionExecution.actionId, sagaActionExecution.status, Collections.emptyList());
            }).collect(Collectors.toList()));
        }).peek(logValues("stateUpdateNewActions")), mapValues.flatMapValues((sagaId4, list3) -> {
            return list3;
        }).filter((sagaId5, sagaActionExecution) -> {
            return sagaActionExecution.command.isPresent();
        }).mapValues((sagaId6, sagaActionExecution2) -> {
            return ActionRequest.builder().sagaId(sagaId6).actionId(sagaActionExecution2.actionId).actionCommand((ActionCommand) sagaActionExecution2.command.get()).actionType(sagaActionExecution2.actionType).build();
        }).peek(logValues("publishActionRequests")));
    }

    private static KStream<SagaId, SagaStateTransition> addActionResponses(KStream<SagaId, ActionResponse> kStream) {
        return kStream.mapValues((sagaId, actionResponse) -> {
            Tuple2 tuple2 = (Tuple2) actionResponse.result.fold(nonEmptyList -> {
                return Tuple2.of(ActionStatus.Failed, nonEmptyList.toList());
            }, bool -> {
                return Tuple2.of(ActionStatus.Completed, Collections.emptyList());
            });
            return new SagaStateTransition.SagaActionStatusChanged(sagaId, actionResponse.actionId, (ActionStatus) tuple2.v1(), (List) tuple2.v2());
        }).peek(logValues("stateTransitionsActionResponse"));
    }
}
