package org.molr.mole.core.tree;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.molr.commons.domain.Block;
import org.molr.commons.domain.Result;
import org.molr.commons.domain.RunState;
import org.molr.commons.domain.Strand;
import org.molr.commons.domain.StrandCommand;
import org.molr.mole.core.tree.exception.RejectedCommandException;
import org.molr.mole.core.tree.exception.StrandExecutorException;
import org.molr.mole.core.utils.Exceptions;
import org.molr.mole.core.utils.ThreadFactories;
import org.molr.mole.core.utils.Trees;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/molr/mole/core/tree/ConcurrentStrandExecutor.class */
public class ConcurrentStrandExecutor implements StrandExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentStrandExecutor.class);
    // Pause (ms) between lifecycle cycles while the strand is IDLE.
    private static final int EXECUTOR_SLEEP_MS_IDLE = 50;
    // Pause (ms) between lifecycle cycles in every state other than IDLE / WAITING_FOR_CHILDREN.
    private static final int EXECUTOR_SLEEP_MS_DEFAULT = 10;
    // Pause (ms) between lifecycle cycles while WAITING_FOR_CHILDREN.
    private static final int EXECUTOR_SLEEP_MS_WAITING_FOR_CHILDREN = 25;
    // Monitor taken by every lifecycle cycle, by moveTo(...) and by getChildrenStrandExecutors().
    private final Object cycleLock = new Object();
    // Single-threaded executor that runs lifecycle(); created and started in the constructor.
    private final ExecutorService executor;
    // Hand-over point between instruct(...) (offer) and lifecycle() (poll); created with capacity 1.
    private final LinkedBlockingQueue<StrandCommand> commandQueue;
    // Tree of blocks this strand walks through.
    private final TreeStructure structure;
    // The strand driven by this executor.
    private final Strand strand;
    // Creates the child strands needed when a parallel block is entered.
    private final StrandFactory strandFactory;
    // Creates the executors for those child strands.
    private final StrandExecutorFactory strandExecutorFactory;
    // Executes leaf blocks and reports a Result.
    private final LeafExecutor leafExecutor;
    // Receives every command consumed by lifecycle(); created with ReplayProcessor.cacheLast().
    private final ReplayProcessor<StrandCommand> lastCommandSink;
    // lastCommandSink published on its own single-threaded scheduler.
    private final Flux<StrandCommand> lastCommandStream;
    // Receives the RunState derived from each new ExecutorState; created with ReplayProcessor.cacheLast().
    private final ReplayProcessor<RunState> stateSink;
    // stateSink published on its own single-threaded scheduler.
    private final Flux<RunState> stateStream;
    // Receives every new actual block; created with ReplayProcessor.cacheLast().
    private final ReplayProcessor<Block> blockSink;
    // blockSink published on its own single-threaded scheduler.
    private final Flux<Block> blockStream;
    // Receives every exception passed to publishError(...).
    private final EmitterProcessor<Exception> errorSink;
    // errorSink published on its own single-threaded scheduler.
    private final Flux<Exception> errorStream;
    // Receives every new list of child executors.
    // NOTE(review): no accessor for this sink is visible in this file - confirm whether it is still needed.
    private final EmitterProcessor<ImmutableList<StrandExecutor>> childExecutorsSink;
    // Commands currently accepted; recomputed by updateAllowedCommands().
    private final AtomicReference<Set<StrandCommand>> allowedCommands;
    // Fine-grained internal state; exposed to the outside only as a RunState.
    private final AtomicReference<ExecutorState> actualState;
    // Block the strand currently points at; set to null once the strand has finished.
    private final AtomicReference<Block> actualBlock;
    // Block on which the running STEP_OVER was issued; null when no step-over is in progress.
    private Block currentStepOverSource;
    // Executors of the currently alive child strands; replaced as a whole by updateChildrenExecutors(...).
    private ImmutableList<StrandExecutor> childExecutors;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.molr.mole.core.tree.ConcurrentStrandExecutor$1, reason: invalid class name */
    /* loaded from: input_file:org/molr/mole/core/tree/ConcurrentStrandExecutor$1.class */
    /**
     * Decompiled form of the compiler-generated switch lookup tables: each array maps an
     * enum ordinal to the case label used by a {@code switch} over that enum.
     *
     * <p>The {@link ExecutorState} table was written to but never declared, which does not
     * compile; it is now declared and allocated like the {@link RunState} table.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$molr$mole$core$tree$ConcurrentStrandExecutor$ExecutorState;
        static final /* synthetic */ int[] $SwitchMap$org$molr$commons$domain$RunState;

        static {
            // Each assignment is guarded individually so that an enum constant missing at
            // runtime (NoSuchFieldError) only leaves its own slot at 0, i.e. "no case matches".
            $SwitchMap$org$molr$mole$core$tree$ConcurrentStrandExecutor$ExecutorState = new int[ExecutorState.values().length];
            try {
                $SwitchMap$org$molr$mole$core$tree$ConcurrentStrandExecutor$ExecutorState[ExecutorState.WAITING_FOR_CHILDREN.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: keep the default slot value
            }
            try {
                $SwitchMap$org$molr$mole$core$tree$ConcurrentStrandExecutor$ExecutorState[ExecutorState.IDLE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: keep the default slot value
            }
            try {
                $SwitchMap$org$molr$mole$core$tree$ConcurrentStrandExecutor$ExecutorState[ExecutorState.RESUMING.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: keep the default slot value
            }
            try {
                $SwitchMap$org$molr$mole$core$tree$ConcurrentStrandExecutor$ExecutorState[ExecutorState.RUNNING_LEAF.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: keep the default slot value
            }
            try {
                $SwitchMap$org$molr$mole$core$tree$ConcurrentStrandExecutor$ExecutorState[ExecutorState.STEPPING_OVER.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: keep the default slot value
            }
            try {
                $SwitchMap$org$molr$mole$core$tree$ConcurrentStrandExecutor$ExecutorState[ExecutorState.FINISHED.ordinal()] = 6;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: keep the default slot value
            }
            $SwitchMap$org$molr$commons$domain$RunState = new int[RunState.values().length];
            try {
                $SwitchMap$org$molr$commons$domain$RunState[RunState.PAUSED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: keep the default slot value
            }
            try {
                $SwitchMap$org$molr$commons$domain$RunState[RunState.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: keep the default slot value
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/molr/mole/core/tree/ConcurrentStrandExecutor$ExecutorState.class */
    /**
     * Fine-grained internal state of the executor. It is translated to the coarser
     * {@link RunState} by {@code runStateFrom(...)} before being published.
     * The declaration order fixes the ordinals and must not be changed casually.
     */
    public enum ExecutorState {
        /** Not executing anything; reported as {@code RunState.PAUSED}. */
        IDLE,
        /** Executing until the cursor leaves the subtree of the step-over source block; reported as {@code RunState.RUNNING}. */
        STEPPING_OVER,
        /** Reported as {@code RunState.RUNNING}. NOTE(review): never assigned in the visible code - confirm it is still needed. */
        RUNNING_LEAF,
        /** Executing block after block until paused or finished; reported as {@code RunState.RUNNING}. */
        RESUMING,
        /** No next block is left; the lifecycle loop terminates. Reported as {@code RunState.FINISHED}. */
        FINISHED,
        /** Child strands are alive and not all paused; reported as {@code RunState.RUNNING}. */
        WAITING_FOR_CHILDREN
    }

    public ConcurrentStrandExecutor(Strand strand, Block block, TreeStructure treeStructure, StrandFactory strandFactory, StrandExecutorFactory strandExecutorFactory, LeafExecutor leafExecutor) {
        Objects.requireNonNull(block, "actualBlock cannot be null");
        this.structure = (TreeStructure) Objects.requireNonNull(treeStructure, "structure cannot be null");
        this.strand = (Strand) Objects.requireNonNull(strand, "strand cannot be null");
        this.strandFactory = (StrandFactory) Objects.requireNonNull(strandFactory, "strandFactory cannot be null");
        this.strandExecutorFactory = (StrandExecutorFactory) Objects.requireNonNull(strandExecutorFactory, "strandExecutorFactory cannot be null");
        this.leafExecutor = (LeafExecutor) Objects.requireNonNull(leafExecutor, "leafExecutor cannot be null");
        this.lastCommandSink = ReplayProcessor.cacheLast();
        this.lastCommandStream = this.lastCommandSink.publishOn(publishingScheduler("last-command"));
        this.errorSink = EmitterProcessor.create();
        this.errorStream = this.errorSink.publishOn(publishingScheduler("errors"));
        this.stateSink = ReplayProcessor.cacheLast();
        this.stateStream = this.stateSink.publishOn(publishingScheduler("states"));
        this.blockSink = ReplayProcessor.cacheLast();
        this.blockStream = this.blockSink.publishOn(publishingScheduler("cursor"));
        this.childExecutorsSink = EmitterProcessor.create();
        this.allowedCommands = new AtomicReference<>();
        this.actualBlock = new AtomicReference<>();
        this.actualState = new AtomicReference<>();
        this.currentStepOverSource = null;
        updateActualBlock(block);
        updateState(ExecutorState.IDLE);
        updateChildrenExecutors(ImmutableList.of());
        this.commandQueue = new LinkedBlockingQueue<>(1);
        this.executor = Executors.newSingleThreadExecutor(ThreadFactories.namedThreadFactory("strand" + strand.id() + "-exec-%d"));
        this.executor.submit(this::lifecycle);
    }

    /**
     * Queues a command for the next lifecycle cycle. The queue holds a single command, so a
     * command arriving while another one is still pending is dropped with a warning.
     *
     * @param strandCommand the command to execute
     */
    @Override // org.molr.mole.core.tree.StrandExecutor
    public void instruct(StrandCommand strandCommand) {
        boolean accepted = this.commandQueue.offer(strandCommand);
        if (!accepted) {
            LOGGER.warn("Command {} cannot be accepted by strand {} because it is processing another command", strandCommand, this.strand);
        }
    }

    /**
     * Main loop of the strand, executed on the single thread of {@code executor}.
     * Runs one cycle at a time under {@code cycleLock} until the state becomes FINISHED.
     *
     * <p>The pause between cycles is taken outside the lock so that {@code moveTo(...)} and
     * {@code getChildrenStrandExecutors()} are not blocked while this thread merely sleeps.
     * The executor is shut down however the loop ends, and an unexpected exception is
     * logged because the future returned by {@code submit} would otherwise hide it.
     */
    private void lifecycle() {
        try {
            while (true) {
                synchronized (this.cycleLock) {
                    if (actualState() == ExecutorState.FINISHED) {
                        break;
                    }
                    runCycle();
                }
                cycleSleep();
            }
            LOGGER.debug("Executor for strand {} is finished", this.strand);
        } catch (RuntimeException e) {
            LOGGER.error("[{}] lifecycle terminated unexpectedly", this.strand, e);
            throw e;
        } finally {
            this.executor.shutdown();
        }
    }

    /** Performs a single cycle; must be called while holding {@code cycleLock}. */
    private void runCycle() {
        reconcileWithChildren();
        StrandCommand command = this.commandQueue.poll();
        dispatchCommand(command);
        advanceExecution();
        if (command != null) {
            LOGGER.debug("[{}] consumed command {}", this.strand, command);
            this.lastCommandSink.onNext(command);
        }
    }

    /** Drops finished children and aligns the own state with the state of the remaining ones. */
    private void reconcileWithChildren() {
        if (hasChildren() && actualState() == ExecutorState.WAITING_FOR_CHILDREN) {
            // removeChildExecutor replaces the (immutable) list, so iterating the old one is safe.
            this.childExecutors.stream()
                    .filter(child -> child.getActualState() == RunState.FINISHED)
                    .forEach(this::removeChildExecutor);
        }
        if (hasChildren() && actualState() != ExecutorState.WAITING_FOR_CHILDREN && actualState() != ExecutorState.IDLE) {
            publishError(Exceptions.exception(StrandExecutorException.class, "[{}] inconsistent state! There are children, so current state can only be IDLE or WAITING FOR CHILDREN, pausing! Current state is {}", this.strand, actualState()));
            updateState(ExecutorState.IDLE);
        }
        if (hasChildren()) {
            boolean allChildrenPaused = this.childExecutors.stream()
                    .map(StrandExecutor::getActualState)
                    .allMatch(RunState.PAUSED::equals);
            if (allChildrenPaused && actualState() != ExecutorState.IDLE) {
                LOGGER.debug("[{}] paused because all children are paused", this.strand);
                updateState(ExecutorState.IDLE);
            } else if (!allChildrenPaused && actualState() != ExecutorState.WAITING_FOR_CHILDREN) {
                LOGGER.debug("[{}] has some non-paused children. Setting the state to waiting", this.strand);
                updateState(ExecutorState.WAITING_FOR_CHILDREN);
            }
        }
    }

    /** Applies the command polled in this cycle, if any. */
    private void dispatchCommand(StrandCommand command) {
        if (command == null) {
            return;
        }
        if (command == StrandCommand.PAUSE) {
            pause();
        } else if (command == StrandCommand.SKIP) {
            if (hasChildren()) {
                publishError(new RejectedCommandException(command, "[{}] has children so skipping is not allowed", this.strand));
            } else {
                moveNext();
            }
        } else if (command == StrandCommand.STEP_INTO) {
            if (hasChildren()) {
                publishError(new RejectedCommandException(command, "[{}] has children so step into is not allowed", this.strand));
            } else {
                stepInto();
            }
        } else if (command == StrandCommand.STEP_OVER || command == StrandCommand.RESUME) {
            if (command == StrandCommand.STEP_OVER) {
                if (hasChildren()) {
                    publishError(new RejectedCommandException(command, "[{}] has children so step over is not allowed", this.strand));
                }
                // NOTE(review): the step-over is still recorded and acted upon below even when it
                // was just reported as rejected - confirm whether that is intended.
                this.currentStepOverSource = actualBlock();
            }
            if (this.structure.isParallel(actualBlock()) && hasChildren()) {
                updateState(ExecutorState.WAITING_FOR_CHILDREN);
                LOGGER.debug("[{}] instructing children to RESUME", this.strand);
                this.childExecutors.forEach(child -> child.instruct(StrandCommand.RESUME));
            } else if (command == StrandCommand.STEP_OVER) {
                updateState(ExecutorState.STEPPING_OVER);
            } else {
                updateState(ExecutorState.RESUMING);
            }
        }
    }

    /** Moves the cursor and executes the actual block according to the current state. */
    private void advanceExecution() {
        // All children are gone: continue after the parallel block.
        if (actualState() == ExecutorState.WAITING_FOR_CHILDREN && !hasChildren()) {
            if (this.currentStepOverSource != null) {
                updateState(ExecutorState.STEPPING_OVER);
            } else {
                updateState(ExecutorState.RESUMING);
            }
            moveNext();
        }
        // A step-over ends as soon as the cursor has left the subtree it was started on.
        if (actualState() == ExecutorState.STEPPING_OVER && !this.structure.isDescendantOf(actualBlock(), this.currentStepOverSource)) {
            updateState(ExecutorState.IDLE);
            this.currentStepOverSource = null;
        }
        if (actualState() == ExecutorState.RESUMING || actualState() == ExecutorState.STEPPING_OVER) {
            if (isLeaf(actualBlock())) {
                LOGGER.debug("[{}] executing {}", this.strand, actualBlock());
                // The leaf runs on this thread, i.e. while cycleLock is held.
                Result result = this.leafExecutor.execute(actualBlock());
                if (result == Result.SUCCESS) {
                    moveNext();
                } else {
                    LOGGER.warn("[{}] execution of {} returned {}. Pausing strand", this.strand, actualBlock(), result);
                    updateState(ExecutorState.IDLE);
                }
            } else if (this.structure.isParallel(actualBlock())) {
                for (Block child : this.structure.childrenOf(actualBlock())) {
                    createChildStrandExecutor(child).instruct(StrandCommand.RESUME);
                }
                LOGGER.debug("[{}] waiting for children strand to finish", this.strand);
                updateState(ExecutorState.WAITING_FOR_CHILDREN);
            } else {
                moveIntoFirstChild();
            }
        }
    }

    /**
     * Handles PAUSE: without children the strand becomes IDLE right away; with children the
     * command is only forwarded to them and the own state is left untouched here.
     */
    private void pause() {
        if (!hasChildren()) {
            LOGGER.debug("[{}] paused", this.strand);
            updateState(ExecutorState.IDLE);
            return;
        }
        LOGGER.debug("[{}] instructing children to pause", this.strand);
        for (StrandExecutor child : this.childExecutors) {
            child.instruct(StrandCommand.PAUSE);
        }
    }

    /**
     * Handles STEP_INTO. A leaf is left untouched; a parallel block gets one paused child
     * executor per child block; any other block moves the cursor to its first child.
     * In the two non-leaf cases the strand ends up IDLE.
     */
    private void stepInto() {
        Block current = actualBlock();
        if (isLeaf(current)) {
            LOGGER.debug("[{}] {} is a leaf, stepping into is not allowed", this.strand, current);
            return;
        }
        if (!this.structure.isParallel(current)) {
            moveIntoFirstChild();
        } else {
            for (Block child : this.structure.childrenOf(current)) {
                createChildStrandExecutor(child).instruct(StrandCommand.PAUSE);
            }
        }
        updateState(ExecutorState.IDLE);
    }

    /**
     * Moves the cursor to the first child of the actual block.
     *
     * @throws IllegalStateException if the actual block has no children
     */
    private void moveIntoFirstChild() {
        List<Block> children = this.structure.childrenOf(actualBlock());
        if (!children.isEmpty()) {
            updateActualBlock(children.get(0));
            return;
        }
        throw ((IllegalStateException) Exceptions.exception(IllegalStateException.class, "Strand {} cannot move into block {}, no children!", this.strand.id(), actualBlock()));
    }

    /**
     * Moves the cursor to the block following the actual one. When the structure reports
     * no next block the strand is marked FINISHED and the actual block is cleared.
     */
    private void moveNext() {
        Block successor = this.structure.nextBlock(actualBlock()).orElse(null);
        if (successor == null) {
            LOGGER.debug("[{}] {} is the last block. Finished", this.strand, actualBlock());
            updateState(ExecutorState.FINISHED);
            updateActualBlock(null);
        } else {
            updateActualBlock(successor);
        }
    }

    /**
     * Creates a child strand of this strand together with an executor working on the
     * substructure rooted at the given block, and registers that executor as a child.
     *
     * @param block root block of the child's substructure
     * @return the newly created child executor
     */
    private StrandExecutor createChildStrandExecutor(Block block) {
        Strand childStrand = this.strandFactory.createChildStrand(this.strand);
        TreeStructure childStructure = this.structure.substructure(block);
        StrandExecutor childExecutor = this.strandExecutorFactory.createStrandExecutor(childStrand, childStructure);
        addChildExecutor(childExecutor);
        LOGGER.debug("[{}] created child strand {}", this.strand, childStrand);
        return childExecutor;
    }

    /**
     * Appends the given executor to the list of children and publishes the new list.
     *
     * @param strandExecutor the child executor to register
     */
    private void addChildExecutor(StrandExecutor strandExecutor) {
        // The explicit type witness is required: a bare builder() in a call chain is inferred
        // as Builder<Object>, whose result is not an ImmutableList<StrandExecutor>.
        updateChildrenExecutors(ImmutableList.<StrandExecutor>builder().addAll(this.childExecutors).add(strandExecutor).build());
    }

    /**
     * Publishes a new child list containing every current child that is not equal to the
     * given executor; the relative order of the remaining children is kept.
     *
     * @param strandExecutor the child executor to drop
     */
    private void removeChildExecutor(StrandExecutor strandExecutor) {
        ImmutableList.Builder<StrandExecutor> remaining = ImmutableList.builder();
        for (StrandExecutor child : this.childExecutors) {
            if (!child.equals(strandExecutor)) {
                remaining.add(child);
            }
        }
        updateChildrenExecutors(remaining.build());
    }

    /**
     * Replaces the list of child executors, emits the new list on the children sink and
     * recomputes the allowed commands (which depend on whether children exist).
     *
     * @param immutableList the new, complete list of child executors
     */
    private void updateChildrenExecutors(ImmutableList<StrandExecutor> immutableList) {
        this.childExecutors = immutableList;
        this.childExecutorsSink.onNext(this.childExecutors);
        updateAllowedCommands();
    }

    /**
     * Stores the new actual block, emits it on the block sink and recomputes the allowed
     * commands.
     *
     * @param block the new actual block; {@code moveNext()} passes {@code null} when the
     *              strand has finished
     */
    private void updateActualBlock(Block block) {
        LOGGER.debug("[{}] block = {}", this.strand, block);
        // Store before emitting so that subscribers reading the getter see the new value.
        this.actualBlock.set(block);
        // NOTE(review): a null block is forwarded to onNext here; Reactive Streams forbids
        // null signals - confirm how the Reactor version in use reacts to it.
        this.blockSink.onNext(block);
        updateAllowedCommands();
    }

    /**
     * Stores the new internal state, emits the corresponding {@link RunState} on the state
     * sink and recomputes the allowed commands.
     *
     * @param executorState the new internal state
     */
    private void updateState(ExecutorState executorState) {
        LOGGER.debug("[{}] state = {}", this.strand, executorState);
        // Store before emitting so that subscribers reading the getter see the new value.
        this.actualState.set(executorState);
        this.stateSink.onNext(runStateFrom(executorState));
        updateAllowedCommands();
    }

    /**
     * Recomputes the set of commands that are currently accepted.
     *
     * <ul>
     *   <li>no actual block or no state yet: nothing is allowed;</li>
     *   <li>PAUSED: RESUME, plus STEP_OVER and SKIP when there are no children, plus
     *       STEP_INTO when in addition the actual block is not a leaf;</li>
     *   <li>RUNNING: PAUSE;</li>
     *   <li>any other run state: nothing.</li>
     * </ul>
     */
    private void updateAllowedCommands() {
        Block block = actualBlock();
        ExecutorState state = actualState();
        if (block == null || state == null) {
            this.allowedCommands.set(ImmutableSet.of());
            return;
        }
        // Compare the enum directly instead of going through the ordinal-indexed lookup
        // table left behind by the decompiler.
        ImmutableSet.Builder<StrandCommand> commands = ImmutableSet.builder();
        RunState runState = runStateFrom(state);
        if (runState == RunState.PAUSED) {
            commands.add(StrandCommand.RESUME);
            if (!hasChildren()) {
                commands.add(StrandCommand.STEP_OVER, StrandCommand.SKIP);
                if (!isLeaf(block)) {
                    commands.add(StrandCommand.STEP_INTO);
                }
            }
        } else if (runState == RunState.RUNNING) {
            commands.add(StrandCommand.PAUSE);
        }
        this.allowedCommands.set(commands.build());
    }

    /**
     * @return the stream of run states of this strand; it is backed by a sink that caches
     *         the last emitted value
     */
    @Override // org.molr.mole.core.tree.StrandExecutor
    public Flux<RunState> getStateStream() {
        return this.stateStream;
    }

    /**
     * @return the stream of blocks this strand points at; it is backed by a sink that
     *         caches the last emitted value
     */
    @Override // org.molr.mole.core.tree.StrandExecutor
    public Flux<Block> getBlockStream() {
        return this.blockStream;
    }

    /**
     * @return the stream of exceptions published by this executor
     */
    @Override // org.molr.mole.core.tree.StrandExecutor
    public Flux<Exception> getErrorsStream() {
        return this.errorStream;
    }

    /**
     * @return the stream of commands consumed by the lifecycle loop; it is backed by a
     *         sink that caches the last emitted value
     * @deprecated exposed for tests only
     */
    @VisibleForTesting
    @Deprecated
    public Flux<StrandCommand> getLastCommandStream() {
        return this.lastCommandStream;
    }

    /**
     * @return the strand driven by this executor
     */
    @Override // org.molr.mole.core.tree.StrandExecutor
    public Strand getStrand() {
        return this.strand;
    }

    /**
     * @return the current internal state translated to a {@link RunState}
     */
    @Override // org.molr.mole.core.tree.StrandExecutor
    public RunState getActualState() {
        return runStateFrom(actualState());
    }

    /**
     * @return the block the strand currently points at, or {@code null} once the strand
     *         has finished
     */
    @Override // org.molr.mole.core.tree.StrandExecutor
    public Block getActualBlock() {
        return actualBlock();
    }

    /**
     * @return the most recently computed set of commands this executor accepts
     */
    @Override // org.molr.mole.core.tree.StrandExecutor
    public Set<StrandCommand> getAllowedCommands() {
        return this.allowedCommands.get();
    }

    /**
     * Moves the cursor directly to the given block. Only possible while the strand is
     * IDLE; the whole check-and-move happens under the cycle lock.
     *
     * @param block the block to move to
     * @throws IllegalArgumentException if the block is not part of this tree structure or
     *                                  has a parallel parent
     * @throws IllegalStateException if the strand is not IDLE
     * @deprecated exposed for tests only
     */
    @VisibleForTesting
    @Deprecated
    public void moveTo(Block block) {
        synchronized (this.cycleLock) {
            if (!this.structure.contains(block)) {
                throw ((IllegalArgumentException) Exceptions.exception(IllegalArgumentException.class, "Strand {} cannot move to {} as is not part of this tree structure", this.strand.id(), block));
            } else if (Trees.doesBlockHaveAParallelParent(block, this.structure)) {
                throw ((IllegalArgumentException) Exceptions.exception(IllegalArgumentException.class, "Strand {} cannot move to {} as is descendant of a parallel block", this.strand.id(), block));
            } else if (actualState() != ExecutorState.IDLE) {
                throw ((IllegalStateException) Exceptions.exception(IllegalStateException.class, "Strand {} can move only while in paused state! Currently in {}", this.strand.id(), actualState()));
            } else {
                updateActualBlock(block);
            }
        }
    }

    /**
     * @return an immutable snapshot of the current child executors, taken under the
     *         cycle lock
     */
    @VisibleForTesting
    public Set<StrandExecutor> getChildrenStrandExecutors() {
        synchronized (this.cycleLock) {
            return ImmutableSet.copyOf(this.childExecutors);
        }
    }

    /** @return the current internal state as held by the atomic reference */
    private ExecutorState actualState() {
        return this.actualState.get();
    }

    /** @return the current block as held by the atomic reference; may be {@code null} */
    private Block actualBlock() {
        return this.actualBlock.get();
    }

    /** @return {@code true} if at least one child executor is registered */
    private boolean hasChildren() {
        return this.childExecutors != null && !this.childExecutors.isEmpty();
    }

    /**
     * @param block the block to check; may be {@code null}
     * @return {@code true} if the block is non-null and the structure reports it as a leaf
     */
    private boolean isLeaf(Block block) {
        return block != null && this.structure.isLeaf(block);
    }

    /**
     * Logs the given exception (type and message) and emits it on the error sink.
     *
     * @param exc the exception to publish
     */
    private void publishError(Exception exc) {
        String errorType = exc.getClass().getSimpleName();
        LOGGER.error("[{}] {}: {}", this.strand, errorType, exc.getMessage());
        this.errorSink.onNext(exc);
    }

    /**
     * Creates a new single-threaded scheduler named after this strand and the given suffix.
     *
     * @param str suffix identifying the stream the scheduler publishes
     * @return the new scheduler
     */
    private Scheduler publishingScheduler(String str) {
        String schedulerName = "strand-exec-" + this.strand.id() + "-" + str;
        return Schedulers.newSingle(schedulerName);
    }

    /**
     * Pauses the lifecycle thread between two cycles. The duration depends on the current
     * state: longest while IDLE, shorter while waiting for children, shortest otherwise.
     *
     * @throws IllegalStateException if the thread is interrupted while sleeping; the
     *                               interrupt flag is restored before throwing
     */
    private void cycleSleep() {
        final long sleepMillis;
        switch (actualState()) {
            case WAITING_FOR_CHILDREN:
                sleepMillis = EXECUTOR_SLEEP_MS_WAITING_FOR_CHILDREN;
                break;
            case IDLE:
                sleepMillis = EXECUTOR_SLEEP_MS_IDLE;
                break;
            default:
                sleepMillis = EXECUTOR_SLEEP_MS_DEFAULT;
                break;
        }
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again so that code further
            // up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw ((IllegalStateException) Exceptions.exception(IllegalStateException.class, "Strand {} thread interrupted!", this.strand.id(), e));
        }
    }

    /**
     * Translates the internal executor state to the public {@link RunState}.
     *
     * @param executorState the internal state
     * @return PAUSED for IDLE, FINISHED for FINISHED, RUNNING for every other known state
     * @throws IllegalArgumentException if the state has no mapping
     */
    private static RunState runStateFrom(ExecutorState executorState) {
        final RunState mapped;
        switch (executorState) {
            case IDLE:
                mapped = RunState.PAUSED;
                break;
            case FINISHED:
                mapped = RunState.FINISHED;
                break;
            case RESUMING:
            case STEPPING_OVER:
            case RUNNING_LEAF:
            case WAITING_FOR_CHILDREN:
                mapped = RunState.RUNNING;
                break;
            default:
                throw ((IllegalArgumentException) Exceptions.exception(IllegalArgumentException.class, "Strand state {} cannot be mapped to a RunState", executorState));
        }
        return mapped;
    }
}
