/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.jet;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.map.IMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.jet.FailedRunningPipelineResults;
import org.apache.beam.runners.jet.JetGraphVisitor;
import org.apache.beam.runners.jet.JetPipelineOptions;
import org.apache.beam.runners.jet.JetPipelineResult;
import org.apache.beam.runners.jet.JetTransformTranslator;
import org.apache.beam.runners.jet.JetTransformTranslators;
import org.apache.beam.runners.jet.metrics.JetMetricsContainer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JetRunner
extends PipelineRunner<PipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(JetRunner.class);
    private final JetPipelineOptions options;
    private final Function<ClientConfig, JetInstance> jetClientSupplier;
    private Function<PTransform<?, ?>, JetTransformTranslator<?>> translatorProvider;

    public static JetRunner fromOptions(PipelineOptions options) {
        return JetRunner.fromOptions(options, Jet::newJetClient);
    }

    public static JetRunner fromOptions(PipelineOptions options, Function<ClientConfig, JetInstance> jetClientSupplier) {
        return new JetRunner(options, jetClientSupplier);
    }

    private JetRunner(PipelineOptions options, Function<ClientConfig, JetInstance> jetClientSupplier) {
        this.options = JetRunner.validate((JetPipelineOptions)options.as(JetPipelineOptions.class));
        this.jetClientSupplier = jetClientSupplier;
        this.translatorProvider = JetTransformTranslators::getTranslator;
    }

    public PipelineResult run(Pipeline pipeline) {
        try {
            this.normalize(pipeline);
            DAG dag = this.translate(pipeline);
            return this.run(dag);
        }
        catch (UnsupportedOperationException uoe) {
            LOG.error("Failed running pipeline!", (Throwable)uoe);
            return new FailedRunningPipelineResults(uoe);
        }
    }

    void addExtraTranslators(Function<PTransform<?, ?>, JetTransformTranslator<?>> extraTranslatorProvider) {
        Function<PTransform<?, ?>, JetTransformTranslator<?>> initialTranslatorProvider = this.translatorProvider;
        this.translatorProvider = transform -> {
            JetTransformTranslator translator = (JetTransformTranslator)initialTranslatorProvider.apply((PTransform<?, ?>)transform);
            if (translator == null) {
                translator = (JetTransformTranslator)extraTranslatorProvider.apply((PTransform<?, ?>)transform);
            }
            return translator;
        };
    }

    private void normalize(Pipeline pipeline) {
        pipeline.replaceAll(JetRunner.getDefaultOverrides());
        UnconsumedReads.ensureAllReadsConsumed((Pipeline)pipeline);
    }

    private DAG translate(Pipeline pipeline) {
        JetGraphVisitor graphVisitor = new JetGraphVisitor(this.options, this.translatorProvider);
        pipeline.traverseTopologically((Pipeline.PipelineVisitor)graphVisitor);
        return graphVisitor.getDAG();
    }

    private JetPipelineResult run(DAG dag) {
        this.startClusterIfNeeded(this.options);
        JetInstance jet = this.getJetInstance(this.options);
        Job job = jet.newJob(dag, this.getJobConfig(this.options));
        IMap metricsAccumulator = jet.getMap(JetMetricsContainer.getMetricsMapName(job.getId()));
        JetPipelineResult pipelineResult = new JetPipelineResult(job, (IMap<String, MetricUpdates>)metricsAccumulator);
        CompletionStage completionFuture = job.getFuture().whenCompleteAsync((r, f) -> {
            pipelineResult.freeze((Throwable)f);
            metricsAccumulator.destroy();
            jet.shutdown();
            this.stopClusterIfNeeded(this.options);
        });
        pipelineResult.setCompletionFuture((CompletableFuture<Void>)completionFuture);
        return pipelineResult;
    }

    private void startClusterIfNeeded(JetPipelineOptions options) {
        Integer noOfLocalMembers = options.getJetLocalMode();
        if (noOfLocalMembers > 0) {
            ArrayList<JetInstance> jetInstances = new ArrayList<JetInstance>();
            for (int i = 0; i < noOfLocalMembers; ++i) {
                jetInstances.add(Jet.newJetInstance());
            }
            LOG.info("Started " + jetInstances.size() + " Jet cluster members");
        }
    }

    private void stopClusterIfNeeded(JetPipelineOptions options) {
        Integer noOfLocalMembers = options.getJetLocalMode();
        if (noOfLocalMembers > 0) {
            Jet.shutdownAll();
            LOG.info("Stopped all Jet cluster members");
        }
    }

    private JobConfig getJobConfig(JetPipelineOptions options) {
        String codeJarPathname;
        boolean hasNoLocalMembers;
        JobConfig jobConfig = new JobConfig();
        String jobName = options.getJobName();
        if (jobName != null) {
            jobConfig.setName(jobName);
        }
        boolean bl = hasNoLocalMembers = options.getJetLocalMode() <= 0;
        if (hasNoLocalMembers && (codeJarPathname = options.getCodeJarPathname()) != null && !codeJarPathname.isEmpty()) {
            jobConfig.addJar(codeJarPathname);
        }
        return jobConfig;
    }

    private JetInstance getJetInstance(JetPipelineOptions options) {
        boolean hasNoLocalMembers;
        String clusterName = options.getClusterName();
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setClusterName(clusterName);
        boolean bl = hasNoLocalMembers = options.getJetLocalMode() <= 0;
        if (hasNoLocalMembers) {
            clientConfig.getNetworkConfig().setAddresses(Arrays.asList(options.getJetServers().split(",")));
        }
        return this.jetClientSupplier.apply(clientConfig);
    }

    private static List<PTransformOverride> getDefaultOverrides() {
        return Arrays.asList(PTransformOverride.of((PTransformMatcher)PTransformMatchers.splittableParDo(), (PTransformOverrideFactory)new SplittableParDo.OverrideFactory()), PTransformOverride.of((PTransformMatcher)PTransformMatchers.splittableProcessKeyedBounded(), (PTransformOverrideFactory)new SplittableParDoNaiveBounded.OverrideFactory()), PTransformOverride.of((PTransformMatcher)PTransformMatchers.splittableProcessKeyedUnbounded(), (PTransformOverrideFactory)new SplittableParDoViaKeyedWorkItems.OverrideFactory()));
    }

    private static JetPipelineOptions validate(JetPipelineOptions options) {
        if (options.getClusterName() == null) {
            throw new IllegalArgumentException("Jet cluster name not set in options");
        }
        Integer localParallelism = options.getJetDefaultParallelism();
        if (localParallelism == null) {
            throw new IllegalArgumentException("Jet node local parallelism must be specified");
        }
        if (localParallelism != -1 && localParallelism < 1) {
            throw new IllegalArgumentException("Jet node local parallelism must be >1 or -1");
        }
        return options;
    }
}

