/*
 * Decompiled with CFR 0.152.
 */
package io.ryos.rhino.sdk;

import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import akka.actor.Terminated;
import akka.dispatch.OnComplete;
import akka.japi.function.Function;
import akka.japi.function.Predicate;
import akka.stream.ActorMaterializer;
import akka.stream.Graph;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.google.common.collect.Streams;
import io.ryos.rhino.sdk.Simulation;
import io.ryos.rhino.sdk.SimulationRunner;
import io.ryos.rhino.sdk.data.ContextImpl;
import io.ryos.rhino.sdk.data.Pair;
import io.ryos.rhino.sdk.data.Scenario;
import io.ryos.rhino.sdk.data.UserSession;
import io.ryos.rhino.sdk.io.CyclicIterator;
import io.ryos.rhino.sdk.users.UserRepository;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import scala.Function1;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

/**
 * {@link SimulationRunner} that drives a {@link Simulation} through an Akka stream.
 *
 * <p>{@link #start()} pairs user sessions taken from the simulation's user repository with
 * scenarios taken from a cyclic iterator, and submits one {@code simulation.run(...)} call per
 * pair for as long as the configured duration has not elapsed. A single-threaded scheduler ticks
 * once per {@link #PERIOD} second(s), prints a progress line every {@link #REPORTING_PERIOD}
 * ticks and shuts everything down (ending with {@code System.exit}) once the duration is exceeded.
 *
 * <p>The elapsed-seconds counter is only modified and compared while holding this instance's
 * monitor.
 */
public class SimulationRunnerImpl implements SimulationRunner {
    private static final Logger LOG = LogManager.getLogger(SimulationRunnerImpl.class);

    /** Context key under which the simulation to run is looked up. */
    private static final String JOB = "job";
    /** Sleep between two checks for logged-in users, in milliseconds. */
    private static final long ONE_SEC = 1000L;
    /** Size of the stream buffer placed in front of the duration check. */
    private static final int BUFFER_SIZE = 2000;
    /** Delay before the first scheduler tick, in seconds. */
    private static final long INITIAL_DELAY = 0L;
    /** Interval between two scheduler ticks, in seconds. */
    private static final long PERIOD = 1L;
    /** A progress line is printed whenever the elapsed counter is a multiple of this value. */
    private static final int REPORTING_PERIOD = 5;

    private final Simulation simulation;
    private final ActorSystem system = ActorSystem.create("rhino");
    private final CyclicIterator<Scenario> scenarioCyclicIterator;
    private final ScheduledExecutorService scheduler;
    /** Configured run time converted to seconds; computed in {@code long} to avoid int overflow. */
    private final long durationInSeconds;
    /** Number of scheduler ticks so far; written only by {@link #shutdownIfCompleted()}. */
    private volatile long elapsed;

    /**
     * Creates a runner for the simulation stored in the context under the key {@code "job"}.
     *
     * @param context context holding the simulation
     * @throws java.util.NoSuchElementException if the context has no entry for that key
     */
    SimulationRunnerImpl(ContextImpl context) {
        this.simulation = (Simulation) context.get(JOB).orElseThrow();
        this.scenarioCyclicIterator = new CyclicIterator<>(this.simulation.getRunnableScenarios());
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        // getDuration() is reported to the user in minutes (see start()); widen before scaling.
        this.durationInSeconds = TimeUnit.MINUTES.toSeconds(this.simulation.getDuration());
    }

    /**
     * Waits until enough users are available, prepares their sessions, starts the periodic
     * progress/shutdown tick and launches the stream that runs the scenarios.
     *
     * <p>This method returns after the stream has been started; completion is handled
     * asynchronously by {@link #onStreamCompleted(Throwable)}.
     */
    @Override
    public void start() {
        System.out.println("! Starting load test for " + simulation.getDuration() + " minutes ...");

        UserRepository<UserSession> userRepository = simulation.getUserRepository();
        waitUsers(userRepository);
        prepareUserSessions(userRepository.getUserSessions());

        Stream<UserSession> userStream = Stream.generate(userRepository::take);
        Stream<Scenario> scenarios = Stream.generate(scenarioCyclicIterator::next);
        Materializer materializer = ActorMaterializer.create(system);

        scheduler.scheduleAtFixedRate(this::tick, INITIAL_DELAY, PERIOD, TimeUnit.SECONDS);

        CompletionStage<?> completion = Source
                .from(Streams.zip(userStream, scenarios, Pair::new)::iterator)
                .viaMat(KillSwitches.single(), Keep.right())
                .buffer(BUFFER_SIZE, OverflowStrategy.backpressure())
                .takeWhile(pair -> checkDuration())
                // Each element becomes a future running on the materializer's execution context.
                // The stream itself does not wait for that future; Sink.ignore() discards it.
                .map(pair -> CompletableFuture.supplyAsync(
                        () -> simulation.run(pair.first, pair.second),
                        materializer.executionContext()))
                .async()
                .runWith(Sink.ignore(), materializer);

        completion.whenComplete((done, failure) -> onStreamCompleted(failure));
    }

    /**
     * Stops the scenario iterator.
     */
    @Override
    public void stop() {
        scenarioCyclicIterator.stop();
    }

    /**
     * One scheduler tick: prints a progress line every {@link #REPORTING_PERIOD} ticks, then
     * advances the elapsed counter and shuts down if the duration is exceeded.
     */
    private void tick() {
        long seconds = elapsed;
        if (seconds % REPORTING_PERIOD == 0L) {
            System.out.println("* Ping? Pong! Running ... " + seconds + " seconds.");
        }
        shutdownIfCompleted();
    }

    /**
     * Reacts to the end of the stream.
     *
     * @param failure {@code null} if the stream completed normally, otherwise the cause of failure
     */
    private void onStreamCompleted(Throwable failure) {
        if (failure == null) {
            system.terminate();
            return;
        }
        // Report the actual stream failure before exiting; it was previously dropped.
        LOG.error("Load test stream failed.", failure);
        terminateSystemAndExit(-1);
    }

    /**
     * Terminates the actor system and exits the JVM with {@code exitCode} once termination has
     * completed. If termination itself fails, the failure message is printed to standard error
     * before exiting.
     */
    private void terminateSystemAndExit(final int exitCode) {
        Future<Terminated> termination = system.terminate();
        termination.onComplete(new OnComplete<Terminated>() {
            @Override
            public void onComplete(Throwable failure, Terminated terminated) {
                if (failure != null) {
                    System.err.println(failure.getMessage());
                }
                System.exit(exitCode);
            }
        }, system.dispatcher());
    }

    /**
     * Advances the elapsed counter by one and, once it exceeds the configured duration, terminates
     * the actor system (exiting with status 0), stops the simulation and shuts the scheduler down.
     */
    private synchronized void shutdownIfCompleted() {
        elapsed = elapsed + 1;
        if (elapsed > durationInSeconds) {
            System.out.println("! Performance test is now completed. Shutting down the system ...");
            terminateSystemAndExit(0);
            simulation.stop();
            scheduler.shutdownNow();
        }
    }

    /**
     * @return {@code true} while the elapsed counter is still below the configured duration
     */
    private synchronized boolean checkDuration() {
        return elapsed < durationInSeconds;
    }

    /** Calls {@code simulation.prepare} for every given user session. */
    private void prepareUserSessions(List<UserSession> userSessions) {
        userSessions.forEach(simulation::prepare);
    }

    /**
     * Calls {@code simulation.cleanUp} for every given user session.
     *
     * <p>NOTE(review): not invoked anywhere in this class — confirm whether it should run on
     * shutdown.
     */
    private void cleanupUserSessions(List<UserSession> userSessions) {
        userSessions.forEach(simulation::cleanUp);
    }

    /**
     * Blocks, polling once per {@link #ONE_SEC} milliseconds, until the repository reports that the
     * number of users required by the simulation is available. A {@code null} repository is not
     * waited on.
     *
     * <p>An interrupt does not abort the wait (matching the previous behaviour), but the thread's
     * interrupt status is restored before returning so callers can still observe it.
     */
    private void waitUsers(UserRepository<UserSession> userRepository) {
        boolean interrupted = false;
        try {
            while (userRepository != null && !userRepository.has(simulation.getInjectUser())) {
                System.out.println("? Not sufficient user has been logged in. Required " + simulation.getInjectUser() + ". Waiting...");
                try {
                    Thread.sleep(ONE_SEC);
                } catch (InterruptedException e) {
                    // sleep() cleared the flag; remember it and re-assert it once waiting is done.
                    interrupted = true;
                    LOG.warn("Waiting for users interrupted.");
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("! User login completed.");
    }
}

