package org.molr.mole.core.single;

import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.molr.commons.domain.Block;
import org.molr.commons.domain.MissionInput;
import org.molr.commons.domain.MissionOutput;
import org.molr.commons.domain.MissionRepresentation;
import org.molr.commons.domain.MissionState;
import org.molr.commons.domain.Placeholder;
import org.molr.commons.domain.Placeholders;
import org.molr.commons.domain.Result;
import org.molr.commons.domain.RunState;
import org.molr.commons.domain.Strand;
import org.molr.commons.domain.StrandCommand;
import org.molr.mole.core.tree.BlockOutputCollector;
import org.molr.mole.core.tree.ConcurrentMissionOutputCollector;
import org.molr.mole.core.tree.MissionExecutor;
import org.molr.mole.core.tree.MissionOutputCollector;
import org.molr.mole.core.utils.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/molr/mole/core/single/SingleNodeMissionExecutor.class */
public class SingleNodeMissionExecutor<R> implements MissionExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(SingleNodeMissionExecutor.class);
    private final SingleNodeMission<R> mission;
    private final MissionRepresentation representation;
    private final BlockOutputCollector output;
    private final MissionInput input;
    private final MissionOutputCollector outputCollector = new ConcurrentMissionOutputCollector();
    private final ReplayProcessor<MissionState> stateSink = ReplayProcessor.cacheLast();
    private final Flux<MissionState> stateStream = this.stateSink.publishOn(Schedulers.newSingle("MissionState publisher"));
    private final ReplayProcessor<MissionRepresentation> representations = ReplayProcessor.cacheLast();
    private final Strand singleStrand = Strand.ofId("0");
    private final AtomicReference<RunState> strandRunState = new AtomicReference<>(RunState.PAUSED);
    private final AtomicReference<Result> result = new AtomicReference<>(Result.UNDEFINED);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    public SingleNodeMissionExecutor(SingleNodeMission<R> singleNodeMission, Map<String, Object> map) {
        this.mission = (SingleNodeMission) Objects.requireNonNull(singleNodeMission, "mission must not be null");
        this.input = MissionInput.from(map);
        this.representation = SingleNodeMissions.representationFor(singleNodeMission);
        this.representations.onNext(this.representation);
        this.output = new BlockOutputCollector(this.outputCollector, this.representation.rootBlock());
        publishState();
    }

    private void resume() {
        if (this.started.getAndSet(true)) {
            LOGGER.warn("Already Running. Doing nothing.");
            return;
        }
        this.strandRunState.set(RunState.RUNNING);
        this.executorService.submit(() -> {
            execute();
            finish();
        });
        publishState();
    }

    private void execute() {
        try {
            R apply = this.mission.executable().apply(this.input, this.output);
            this.result.set(Result.SUCCESS);
            emitReturnValue(apply);
        } catch (Exception e) {
            this.result.set(Result.FAILED);
            LOGGER.warn("Mission {} failed with exception. ", this.mission, e);
            this.output.emit((Placeholder<Placeholder>) Placeholders.THROWN, (Placeholder) Exceptions.stackTraceFrom(e));
        }
    }

    private void emitReturnValue(R r) {
        Class<R> returnType = this.mission.returnType();
        if (Void.class.isAssignableFrom(returnType)) {
            LOGGER.debug("Mission {} has a void return type. Emitting no return value.");
            return;
        }
        Optional returned = Placeholders.returned(returnType);
        if (returned.isPresent()) {
            this.output.emit((Placeholder<Placeholder>) returned.get(), (Placeholder) r);
        } else {
            LOGGER.debug("Placeholder for type {} is not supported. Emitting the string representation of the return value instead.");
            this.output.emit((Placeholder<Placeholder>) Placeholders.RETURNED_STRING, (Placeholder) Objects.toString(r));
        }
    }

    private void finish() {
        this.strandRunState.set(RunState.FINISHED);
        publishState();
    }

    private void publishState() {
        Result result = this.result.get();
        RunState runState = this.strandRunState.get();
        MissionState.Builder builder = MissionState.builder(result);
        builder.add(this.singleStrand, runState, cursor(), allowedCommands());
        builder.blockRunState(rootBlock(), runState);
        builder.blockResult(rootBlock(), result);
        this.stateSink.onNext(builder.build());
    }

    private Block cursor() {
        if (RunState.FINISHED.equals(this.strandRunState.get())) {
            return null;
        }
        return rootBlock();
    }

    private Block rootBlock() {
        return this.representation.rootBlock();
    }

    private Set<StrandCommand> allowedCommands() {
        return !this.started.get() ? ImmutableSet.of(StrandCommand.RESUME) : ImmutableSet.of();
    }

    @Override // org.molr.mole.core.tree.MissionExecutor
    public void instruct(Strand strand, StrandCommand strandCommand) {
        if (!this.singleStrand.equals(strand)) {
            LOGGER.warn("given strand {} is not equal to strand {}. Doing nothing.", strand, this.singleStrand);
        } else if (StrandCommand.RESUME.equals(strandCommand)) {
            resume();
        } else {
            LOGGER.warn("given command {} is not supported. Doing nothing.", strandCommand);
        }
    }

    @Override // org.molr.mole.core.tree.MissionExecutor
    public void instructRoot(StrandCommand strandCommand) {
        instruct(this.singleStrand, strandCommand);
    }

    @Override // org.molr.mole.core.tree.MissionExecutor
    public Flux<MissionState> states() {
        return this.stateStream;
    }

    @Override // org.molr.mole.core.tree.MissionExecutor
    public Flux<MissionOutput> outputs() {
        return this.outputCollector.asStream();
    }

    @Override // org.molr.mole.core.tree.MissionExecutor
    public Flux<MissionRepresentation> representations() {
        return this.representations;
    }
}
