/*
 * Decompiled with CFR 0.152.
 */
package io.ryos.rhino.sdk.runners;

import com.google.common.collect.Streams;
import io.ryos.rhino.sdk.CyclicIterator;
import io.ryos.rhino.sdk.RampupInfo;
import io.ryos.rhino.sdk.SimulationConfig;
import io.ryos.rhino.sdk.SimulationMetadata;
import io.ryos.rhino.sdk.ThrottlingInfo;
import io.ryos.rhino.sdk.data.Context;
import io.ryos.rhino.sdk.data.UserSession;
import io.ryos.rhino.sdk.dsl.ConnectableDsl;
import io.ryos.rhino.sdk.dsl.MaterializerFactory;
import io.ryos.rhino.sdk.runners.AbstractSimulationRunner;
import io.ryos.rhino.sdk.runners.EventDispatcher;
import io.ryos.rhino.sdk.runners.Rampup;
import io.ryos.rhino.sdk.runners.ReactiveRunnerSimulationInjector;
import io.ryos.rhino.sdk.runners.Throttler;
import io.ryos.rhino.sdk.specs.ConditionalSpecWrapper;
import io.ryos.rhino.sdk.specs.Spec;
import io.ryos.rhino.sdk.users.repositories.CyclicUserSessionRepositoryImpl;
import io.ryos.rhino.sdk.users.repositories.UserRepository;
import io.ryos.rhino.sdk.utils.ReflectionUtils;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.filter.RequestFilter;
import org.asynchttpclient.filter.ThrottleRequestFilter;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class ReactiveHttpSimulationRunner
extends AbstractSimulationRunner {
    private static final Logger LOG = LoggerFactory.getLogger(ReactiveHttpSimulationRunner.class);
    private static final String JOB = "job";
    private final Context context;
    private CyclicIterator<ConnectableDsl> dslIterator;
    private Disposable subscribe;
    private volatile boolean shutdownInitiated;
    private volatile boolean isCleanupCompleted;
    private volatile boolean isPrepareCompleted;
    private volatile boolean isPipelineCompleted;
    private final Condition continueCondition;
    private final Lock masterLock;
    private final EventDispatcher eventDispatcher;

    public ReactiveHttpSimulationRunner(Context context) {
        super((SimulationMetadata)context.get(JOB).orElseThrow());
        this.context = context;
        this.dslIterator = new CyclicIterator(this.getSimulationMetadata().getDsls().stream().filter(Objects::nonNull).map(spec -> (ConnectableDsl)spec).collect(Collectors.toList()));
        this.eventDispatcher = new EventDispatcher(this.getSimulationMetadata());
        this.masterLock = new ReentrantLock();
        this.continueCondition = this.masterLock.newCondition();
    }

    @Override
    public void start() {
        SimulationMetadata simulationMetadata = this.getSimulationMetadata();
        LOG.info("Starting load test for {} minutes ...", (Object)simulationMetadata.getDuration().toMinutes());
        if (simulationMetadata.getGrafanaInfo() != null) {
            this.setUpGrafanaDashboard();
        }
        UserRepository<UserSession> userRepository = simulationMetadata.getUserRepository();
        CyclicUserSessionRepositoryImpl userSessionProvider = new CyclicUserSessionRepositoryImpl(userRepository, "all");
        DefaultAsyncHttpClientConfig httpClientConfig = Dsl.config().setKeepAlive(true).setMaxConnections(SimulationConfig.getMaxConnections()).setConnectTimeout(SimulationConfig.getHttpConnectTimeout()).setHandshakeTimeout(SimulationConfig.getHttpHandshakeTimeout()).setReadTimeout(SimulationConfig.getHttpReadTimeout()).setRequestTimeout(SimulationConfig.getHttpRequestTimeout()).addRequestFilter((RequestFilter)new ThrottleRequestFilter(SimulationConfig.getMaxConnections())).build();
        AsyncHttpClient client = Dsl.asyncHttpClient((AsyncHttpClientConfig)httpClientConfig);
        ReactiveRunnerSimulationInjector injector = new ReactiveRunnerSimulationInjector(simulationMetadata, null);
        List<UserSession> userList = userSessionProvider.getUserList();
        injector.injectOn(simulationMetadata.getTestInstance());
        this.prepare(client, userList);
        Flux flux = Flux.fromStream(Stream.generate(userSessionProvider::take));
        flux = this.appendRampUp((Flux<UserSession>)flux);
        flux = this.appendThrottling(flux);
        flux = flux.take(simulationMetadata.getDuration()).zipWith((Publisher)Flux.fromStream((Stream)Streams.stream(this.dslIterator))).onErrorResume(this::handleError).doOnError(t -> LOG.error("Something unexpected happened", t)).doOnTerminate(this::shutdown).doOnComplete(() -> this.signalCompletion(() -> {
            this.isPrepareCompleted = true;
        })).flatMap(tuple -> this.getPublisher(client, (UserSession)tuple.getT1(), (ConnectableDsl)tuple.getT2()));
        this.subscribe = flux.subscribe();
        this.awaitIf(!this.isPipelineCompleted);
        this.cleanup(client, userList);
        this.shutdown();
    }

    private void cleanup(AsyncHttpClient client, List<UserSession> userList) {
        if (this.getSimulationMetadata().getCleanupMethod() != null) {
            LOG.info("Clean-up started.");
            this.cleanUpUserSessions(userList, client);
            this.awaitIf(!this.isCleanupCompleted);
        }
    }

    private void prepare(AsyncHttpClient client, List<UserSession> userList) {
        if (this.getSimulationMetadata().getPrepareMethod() != null) {
            LOG.info("Preparation started.");
            this.prepareUserSessions(userList, client);
            this.awaitIf(!this.isPrepareCompleted);
        }
    }

    private void awaitIf(boolean conditional) {
        try {
            this.masterLock.lock();
            if (conditional) {
                this.continueCondition.await();
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted.", (Throwable)e);
        }
        finally {
            this.masterLock.unlock();
        }
    }

    private Publisher<? extends UserSession> getPublisher(AsyncHttpClient client, UserSession session, ConnectableDsl dsl) {
        Iterator<Spec> specIt = dsl.getSpecs().iterator();
        MaterializerFactory materializerFactory = new MaterializerFactory(client, this.eventDispatcher);
        if (!specIt.hasNext()) {
            throw new RuntimeException("No spec found in DSL.");
        }
        Mono acc = materializerFactory.monoFrom(specIt.next(), session);
        while (specIt.hasNext()) {
            Spec next = specIt.next();
            acc = acc.flatMap(s -> {
                Predicate<UserSession> predicate;
                if (this.isConditionalSpec(next) && !(predicate = ((ConditionalSpecWrapper)next).getPredicate()).test((UserSession)s)) {
                    return Mono.just((Object)s);
                }
                return materializerFactory.monoFrom(next, session);
            });
        }
        return acc.doOnError(e -> LOG.error("Unexpected error: ", e));
    }

    private Publisher<? extends Tuple2<UserSession, ConnectableDsl>> handleError(Throwable t) {
        LOG.error("Skipping error", t);
        return Mono.empty();
    }

    private boolean isConditionalSpec(Spec next) {
        return next instanceof ConditionalSpecWrapper;
    }

    private Flux<UserSession> appendThrottling(Flux<UserSession> flux) {
        ThrottlingInfo throttlingInfo = this.getSimulationMetadata().getThrottlingInfo();
        if (throttlingInfo != null) {
            Throttler.Limit rpsLimit = Throttler.Limit.of(throttlingInfo.getRps(), throttlingInfo.getDuration());
            flux = flux.transform(Throttler.throttle(rpsLimit));
        }
        return flux;
    }

    private Flux<UserSession> appendRampUp(Flux<UserSession> flux) {
        RampupInfo rampUpInfo = this.getSimulationMetadata().getRampUpInfo();
        if (rampUpInfo != null) {
            flux = flux.transform(Rampup.rampup(rampUpInfo.getStartRps(), rampUpInfo.getTargetRps(), rampUpInfo.getDuration()));
        }
        return flux;
    }

    @Override
    public void stop() {
        LOG.info("Someone pushed the stop() button on runner.");
        this.shutdown();
    }

    private void shutdown() {
        if (this.shutdownInitiated) {
            return;
        }
        if (!this.isCleanupCompleted) {
            return;
        }
        this.shutdownInitiated = true;
        LOG.info("Stopping the simulation...");
        this.subscribe.dispose();
        this.dslIterator.stop();
        this.eventDispatcher.stop();
        LOG.info("Shutting down completed ...");
        LOG.info("Bye!");
    }

    private void prepareUserSessions(List<UserSession> userSessionList, AsyncHttpClient client) {
        if (this.getSimulationMetadata().getPrepareMethod() != null) {
            this.materializeMethod(this.getSimulationMetadata().getPrepareMethod(), userSessionList, client, () -> {
                this.isPrepareCompleted = true;
                LOG.info("Preparation completed.");
            });
        }
    }

    private void cleanUpUserSessions(List<UserSession> userSessionList, AsyncHttpClient client) {
        if (this.getSimulationMetadata().getCleanupMethod() != null) {
            this.materializeMethod(this.getSimulationMetadata().getCleanupMethod(), userSessionList, client, () -> {
                LOG.info("Clean-up completed.");
                this.isCleanupCompleted = true;
                this.eventDispatcher.stop();
            });
        }
    }

    private void materializeMethod(Method method, List<UserSession> userSessionList, AsyncHttpClient client, Action action) {
        if (method != null) {
            Flux.fromStream(userSessionList.stream()).onErrorResume(this::handleThrowable).flatMap(session -> this.getPublisher(client, (UserSession)session, (ConnectableDsl)ReflectionUtils.executeStaticMethod(method, session))).doOnError(throwable -> LOG.error("Something unexpected happened", throwable)).doOnComplete(() -> this.signalCompletion(action)).blockLast();
        }
    }

    private void signalCompletion(Action action) {
        try {
            this.masterLock.lock();
            action.execute();
            this.continueCondition.signal();
        }
        catch (IllegalMonitorStateException e) {
            LOG.debug("Await not called yet. The cleanup completed before the main thread got to be awaited. Main thread will continue.");
        }
        finally {
            this.masterLock.unlock();
        }
    }

    private Publisher<? extends UserSession> handleThrowable(Throwable throwable) {
        LOG.error("Skipping error. Pipeline continues.", throwable);
        return Mono.empty();
    }

    @FunctionalInterface
    public static interface Action {
        public void execute();
    }
}

