/*
 * Decompiled with CFR 0.152.
 */
package io.ryos.rhino.sdk.runners;

import io.ryos.rhino.sdk.RampupInfo;
import io.ryos.rhino.sdk.SimulationConfig;
import io.ryos.rhino.sdk.SimulationMetadata;
import io.ryos.rhino.sdk.ThrottlingInfo;
import io.ryos.rhino.sdk.data.Context;
import io.ryos.rhino.sdk.data.Scenario;
import io.ryos.rhino.sdk.data.UserSession;
import io.ryos.rhino.sdk.runners.AbstractSimulationRunner;
import io.ryos.rhino.sdk.runners.DefaultRunnerSimulationInjector;
import io.ryos.rhino.sdk.runners.DefaultSimulationCallable;
import io.ryos.rhino.sdk.runners.EventDispatcher;
import io.ryos.rhino.sdk.runners.Rampup;
import io.ryos.rhino.sdk.runners.ReactiveHttpSimulationRunner;
import io.ryos.rhino.sdk.runners.Throttler;
import io.ryos.rhino.sdk.users.repositories.CyclicUserSessionRepositoryImpl;
import io.ryos.rhino.sdk.users.repositories.UserRepository;
import io.ryos.rhino.sdk.utils.ReflectionUtils;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class DefaultSimulationRunner
extends AbstractSimulationRunner {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultSimulationRunner.class);
    private static final String JOB = "job";
    private static final long ONE_SEC = 1000L;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final EventDispatcher eventDispatcher = new EventDispatcher(this.getSimulationMetadata());
    private final Condition continueCondition;
    private final Lock masterLock = new ReentrantLock();
    private volatile boolean shutdownInitiated;
    private volatile Disposable subscribe;
    private volatile boolean isPipelineCompleted;

    public DefaultSimulationRunner(Context context) {
        super((SimulationMetadata)context.get(JOB).orElseThrow());
        this.continueCondition = this.masterLock.newCondition();
    }

    @Override
    public void start() {
        RampupInfo rampUpInfo;
        SimulationMetadata simulationMetadata = this.getSimulationMetadata();
        LOG.info("Starting load test for {} minutes ...", (Object)simulationMetadata.getDuration().toMinutes());
        UserRepository<UserSession> userRepository = simulationMetadata.getUserRepository();
        CyclicUserSessionRepositoryImpl userSessionProvider = new CyclicUserSessionRepositoryImpl(userRepository, simulationMetadata.getUserRegion(), simulationMetadata.getNumberOfUsers());
        if (simulationMetadata.getGrafanaInfo() != null) {
            this.setUpGrafanaDashboard();
        }
        this.prepareUserSessions(userSessionProvider.getUserList());
        Stream<UserSession> users = Stream.generate(userSessionProvider::take);
        Flux flux = Flux.fromStream(users);
        ThrottlingInfo throttlingInfo = simulationMetadata.getThrottlingInfo();
        if (throttlingInfo != null) {
            Throttler.Limit rpsLimit = Throttler.Limit.of(throttlingInfo.getRps(), throttlingInfo.getDuration());
            flux = flux.transform(Throttler.throttle(rpsLimit));
        }
        if ((rampUpInfo = simulationMetadata.getRampUpInfo()) != null) {
            flux = flux.transform(Rampup.rampup(rampUpInfo.getStartRps(), rampUpInfo.getTargetRps(), rampUpInfo.getDuration()));
        }
        this.subscribe = flux.onErrorResume(t -> Mono.empty()).take(simulationMetadata.getDuration()).parallel(SimulationConfig.getParallelisation()).runOn(Schedulers.elastic()).doOnTerminate(this::notifyAwaiting).doOnNext(userSession -> {
            Object instance = ReflectionUtils.instanceOf(simulationMetadata.getSimulationClass()).orElseThrow();
            new DefaultRunnerSimulationInjector(simulationMetadata, null).injectOn(instance);
            simulationMetadata.getScenarios().forEach(scenario -> new DefaultSimulationCallable(simulationMetadata, (UserSession)userSession, (Scenario)scenario, this.eventDispatcher, instance).call());
        }).doOnComplete(() -> this.signalCompletion(() -> {
            this.isPipelineCompleted = true;
        })).subscribe();
        this.awaitIf(!this.isPipelineCompleted);
        LOG.info("Cleaning up ...");
        this.cleanupUserSessions(userSessionProvider.getUserList());
        this.shutdown();
        LOG.info("Bye!");
    }

    private void signalCompletion(ReactiveHttpSimulationRunner.Action action) {
        try {
            this.masterLock.lock();
            action.execute();
            this.continueCondition.signal();
        }
        catch (IllegalMonitorStateException e) {
            LOG.debug("Await not called yet. The cleanup completed before the main thread got to be awaited. Main thread will continue.");
        }
        finally {
            this.masterLock.unlock();
        }
    }

    private void awaitIf(boolean conditional) {
        try {
            this.masterLock.lock();
            if (conditional) {
                this.continueCondition.await();
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted.", (Throwable)e);
        }
        finally {
            this.masterLock.unlock();
        }
    }

    private void prepareUserSessions(List<UserSession> userSessionList) {
        if (this.getSimulationMetadata().getPrepareMethod() != null) {
            LOG.info("Preparation started.");
            userSessionList.forEach(session -> ReflectionUtils.executeStaticMethod(this.getSimulationMetadata().getPrepareMethod(), session));
            LOG.info("Preparation completed.");
        }
    }

    private void cleanupUserSessions(List<UserSession> userSessionList) {
        if (this.getSimulationMetadata().getCleanupMethod() != null) {
            LOG.info("Clean-up started.");
            userSessionList.forEach(session -> {
                ReflectionUtils.executeStaticMethod(this.getSimulationMetadata().getCleanupMethod(), session);
                session.empty();
            });
            LOG.info("Clean-up completed.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyAwaiting() {
        DefaultSimulationRunner defaultSimulationRunner = this;
        synchronized (defaultSimulationRunner) {
            this.notify();
        }
    }

    @Override
    public void stop() {
        LOG.info("Someone pushed the stop() button on runner.");
        this.shutdown();
    }

    private void shutdown() {
        if (this.shutdownInitiated) {
            return;
        }
        this.shutdownInitiated = true;
        LOG.info("Stopping the simulation...");
        this.subscribe.dispose();
        LOG.info("Shutting down the system ...");
        this.eventDispatcher.stop();
        this.scheduler.shutdown();
        int retry = 0;
        while (!this.scheduler.isShutdown() && ++retry < 5) {
            this.waitForASec();
        }
        this.scheduler.shutdownNow();
        LOG.info("Shutting down completed ...");
    }

    private void waitForASec() {
        LOG.info("Wait ...");
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }
}

