/*
 * Decompiled with CFR 0.152.
 */
package io.ryos.rhino.sdk.runners;

import com.google.common.collect.Streams;
import io.ryos.rhino.sdk.CyclicIterator;
import io.ryos.rhino.sdk.RampupInfo;
import io.ryos.rhino.sdk.SimulationConfig;
import io.ryos.rhino.sdk.SimulationMetadata;
import io.ryos.rhino.sdk.ThrottlingInfo;
import io.ryos.rhino.sdk.data.Context;
import io.ryos.rhino.sdk.data.UserSession;
import io.ryos.rhino.sdk.dsl.ConnectableDsl;
import io.ryos.rhino.sdk.dsl.HttpSpecMaterializer;
import io.ryos.rhino.sdk.dsl.SomeSpecMaterializer;
import io.ryos.rhino.sdk.dsl.WaitSpecMaterializer;
import io.ryos.rhino.sdk.exceptions.MaterializerNotFound;
import io.ryos.rhino.sdk.io.Out;
import io.ryos.rhino.sdk.monitoring.GrafanaGateway;
import io.ryos.rhino.sdk.runners.EventDispatcher;
import io.ryos.rhino.sdk.runners.Rampup;
import io.ryos.rhino.sdk.runners.ReactiveRunnerSimulationInjector;
import io.ryos.rhino.sdk.runners.SimulationRunner;
import io.ryos.rhino.sdk.runners.Throttler;
import io.ryos.rhino.sdk.specs.ConditionalSpecWrapper;
import io.ryos.rhino.sdk.specs.HttpSpec;
import io.ryos.rhino.sdk.specs.SomeSpec;
import io.ryos.rhino.sdk.specs.Spec;
import io.ryos.rhino.sdk.specs.WaitSpec;
import io.ryos.rhino.sdk.users.repositories.CyclicUserSessionRepositoryImpl;
import io.ryos.rhino.sdk.users.repositories.UserRepository;
import io.ryos.rhino.sdk.utils.ReflectionUtils;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactiveHttpSimulationRunner
implements SimulationRunner {
    private static final String JOB = "job";
    private static final long ONE_SEC = 1000L;
    private static final long MAX_WAIT_FOR_USER = 60L;
    private static final int CONNECT_TIMEOUT = 500;
    private final Context context;
    private SimulationMetadata simulationMetadata;
    private CyclicIterator<ConnectableDsl> dslIterator;
    private Disposable subscribe;
    private volatile boolean shutdownInitiated;
    private final EventDispatcher eventDispatcher;

    public ReactiveHttpSimulationRunner(Context context) {
        this.context = context;
        this.simulationMetadata = (SimulationMetadata)context.get(JOB).orElseThrow();
        this.dslIterator = new CyclicIterator(this.simulationMetadata.getSpecs().stream().filter(Objects::nonNull).map(spec -> (ConnectableDsl)spec).collect(Collectors.toList()));
        this.eventDispatcher = new EventDispatcher(this.simulationMetadata);
    }

    @Override
    public void start() {
        RampupInfo rampUpInfo;
        Out.info("Starting load test for " + this.simulationMetadata.getDuration().toMinutes() + " minutes ...");
        if (SimulationConfig.isGrafanaEnabled()) {
            this.setUpGrafanaDashboard();
        }
        UserRepository<UserSession> userRepository = this.simulationMetadata.getUserRepository();
        CyclicUserSessionRepositoryImpl userSessionProvider = new CyclicUserSessionRepositoryImpl(userRepository, "all");
        DefaultAsyncHttpClientConfig httpClientConfig = Dsl.config().setConnectTimeout(500).setMaxConnections(SimulationConfig.getMaxConnections()).setKeepAlive(true).build();
        AsyncHttpClient client = Dsl.asyncHttpClient((AsyncHttpClientConfig)httpClientConfig);
        ReactiveRunnerSimulationInjector injector = new ReactiveRunnerSimulationInjector(this.simulationMetadata, null);
        injector.injectOn(this.simulationMetadata.getTestInstance());
        this.prepareUserSessions();
        Flux flux = Flux.fromStream(Stream.generate(userSessionProvider::take));
        ThrottlingInfo throttlingInfo = this.simulationMetadata.getThrottlingInfo();
        if (throttlingInfo != null) {
            Throttler.Limit rpsLimit = Throttler.Limit.of(throttlingInfo.getNumberOfRequests(), throttlingInfo.getDuration());
            flux = flux.transform(Throttler.throttle(rpsLimit));
        }
        if ((rampUpInfo = this.simulationMetadata.getRampUpInfo()) != null) {
            flux = flux.transform(Rampup.rampup(rampUpInfo.getStartRps(), rampUpInfo.getTargetRps(), rampUpInfo.getDuration()));
        }
        this.subscribe = flux.take(this.simulationMetadata.getDuration()).zipWith((Publisher)Flux.fromStream((Stream)Streams.stream(this.dslIterator))).doOnError(t -> Out.error(t.getMessage())).doOnTerminate(this::terminate).doOnComplete(() -> {
            this.shutdownInitiated = true;
        }).flatMap(tuple -> {
            UserSession session = (UserSession)tuple.getT1();
            ConnectableDsl dsl = (ConnectableDsl)tuple.getT2();
            Iterator<Spec> specIt = dsl.getSpecs().iterator();
            if (!specIt.hasNext()) {
                throw new RuntimeException("No spec found in DSL.");
            }
            Mono acc = this.materialize(specIt.next(), client, session);
            while (specIt.hasNext()) {
                Spec next = specIt.next();
                acc = acc.flatMap(s -> {
                    Predicate<UserSession> predicate;
                    if (next instanceof ConditionalSpecWrapper && !(predicate = ((ConditionalSpecWrapper)next).getPredicate()).test((UserSession)s)) {
                        return Mono.just((Object)s);
                    }
                    return this.materialize(next, client, session);
                });
            }
            return acc.doOnError(System.out::println);
        }).subscribe();
        this.await();
        this.stop();
    }

    private void terminate() {
        this.cleanupUserSessions();
        this.notifyAwaiting();
    }

    private void setUpGrafanaDashboard() {
        Out.info("Grafana is enabled. Creating dashboard: " + SimulationConfig.getSimulationId());
        GrafanaGateway grafanaGateway = new GrafanaGateway();
        grafanaGateway.setUpDashboard(SimulationConfig.getSimulationId(), (String[])this.simulationMetadata.getSpecs().stream().map(dsl -> (ConnectableDsl)dsl).map(ConnectableDsl::getName).toArray(String[]::new));
    }

    private Mono<UserSession> materialize(Spec spec, AsyncHttpClient client, UserSession session) {
        if (spec instanceof HttpSpec) {
            return new HttpSpecMaterializer(client, this.eventDispatcher).materialize((HttpSpec)spec, session);
        }
        if (spec instanceof SomeSpec) {
            return new SomeSpecMaterializer(this.eventDispatcher).materialize((SomeSpec)spec, session);
        }
        if (spec instanceof WaitSpec) {
            return new WaitSpecMaterializer().materialize((WaitSpec)spec, session);
        }
        if (spec instanceof ConditionalSpecWrapper) {
            return this.materialize(((ConditionalSpecWrapper)spec).getSpec(), client, session);
        }
        throw new MaterializerNotFound("Materializer not found for spec: " + spec.getClass().getName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void await() {
        ReactiveHttpSimulationRunner reactiveHttpSimulationRunner = this;
        synchronized (reactiveHttpSimulationRunner) {
            try {
                while (!this.shutdownInitiated) {
                    this.wait(1000L);
                }
            }
            catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyAwaiting() {
        ReactiveHttpSimulationRunner reactiveHttpSimulationRunner = this;
        synchronized (reactiveHttpSimulationRunner) {
            this.notifyAll();
        }
    }

    @Override
    public void stop() {
        Out.info("Someone pushed the stop() button on runner.");
        this.terminate();
    }

    private void shutdown() {
        if (this.shutdownInitiated) {
            return;
        }
        this.shutdownInitiated = true;
        Out.info("Stopping the simulation...");
        this.subscribe.dispose();
        Out.info("Shutting down the system ...");
        this.eventDispatcher.stop();
        this.dslIterator.stop();
        Out.info("Shutting down completed ...");
        Out.info("Bye!");
    }

    private void waitForASec() {
        Out.info("Wait ...");
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            Out.info("Wait-Interrupted.");
        }
    }

    private void prepareUserSessions() {
        if (this.simulationMetadata.getPrepareMethod() != null) {
            ReflectionUtils.executeMethod(this.simulationMetadata.getPrepareMethod(), this.simulationMetadata.getTestInstance(), new Object[0]);
        }
    }

    private void cleanupUserSessions() {
        if (this.simulationMetadata.getCleanupMethod() != null) {
            ReflectionUtils.executeMethod(this.simulationMetadata.getCleanupMethod(), this.simulationMetadata.getTestInstance(), new Object[0]);
        }
    }
}

