/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.jet.processors;

import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.jet.DAGBuilder;
import org.apache.beam.runners.jet.JetPipelineOptions;
import org.apache.beam.runners.jet.Utils;
import org.apache.beam.runners.jet.metrics.JetMetricsContainer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

abstract class AbstractParDoP<InputT, OutputT>
implements Processor {
    private final SerializablePipelineOptions pipelineOptions;
    private final DoFn<InputT, OutputT> doFn;
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final DoFnSchemaInformation doFnSchemaInformation;
    private final Map<TupleTag<?>, int[]> outputCollToOrdinals;
    private final TupleTag<OutputT> mainOutputTag;
    private final Coder<InputT> inputCoder;
    private final Map<PCollectionView<?>, Coder<?>> sideInputCoders;
    private final Map<TupleTag<?>, Coder<?>> outputCoders;
    private final Coder<InputT> inputValueCoder;
    private final Map<TupleTag<?>, Coder<?>> outputValueCoders;
    private final Map<Integer, PCollectionView<?>> ordinalToSideInput;
    private final String ownerId;
    private final String stepId;
    private final boolean cooperative;
    private final long metricsFlushPeriod = TimeUnit.SECONDS.toMillis(1L) + ThreadLocalRandom.current().nextLong(500L);
    DoFnRunner<InputT, OutputT> doFnRunner;
    JetOutputManager outputManager;
    private DoFnInvoker<InputT, OutputT> doFnInvoker;
    private SideInputHandler sideInputHandler;
    private JetMetricsContainer metricsContainer;
    private SimpleInbox bufferedItems;
    private Set<Integer> completedSideInputs = new HashSet<Integer>();
    private SideInputReader sideInputReader;
    private Outbox outbox;
    private long lastMetricsFlushTime = System.currentTimeMillis();
    private Map<String, PCollectionView<?>> sideInputMapping;

    AbstractParDoP(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, DoFnSchemaInformation doFnSchemaInformation, Map<TupleTag<?>, int[]> outputCollToOrdinals, SerializablePipelineOptions pipelineOptions, TupleTag<OutputT> mainOutputTag, Coder<InputT> inputCoder, Map<PCollectionView<?>, Coder<?>> sideInputCoders, Map<TupleTag<?>, Coder<?>> outputCoders, Coder<InputT> inputValueCoder, Map<TupleTag<?>, Coder<?>> outputValueCoders, Map<Integer, PCollectionView<?>> ordinalToSideInput, Map<String, PCollectionView<?>> sideInputMapping, String ownerId, String stepId) {
        this.pipelineOptions = pipelineOptions;
        this.doFn = Utils.serde(doFn);
        this.windowingStrategy = windowingStrategy;
        this.doFnSchemaInformation = doFnSchemaInformation;
        this.outputCollToOrdinals = outputCollToOrdinals;
        this.mainOutputTag = mainOutputTag;
        this.inputCoder = inputCoder;
        this.sideInputCoders = sideInputCoders.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> Utils.deriveIterableValueCoder((WindowedValue.FullWindowedValueCoder)e.getValue())));
        this.outputCoders = outputCoders;
        this.inputValueCoder = inputValueCoder;
        this.outputValueCoders = outputValueCoders;
        this.ordinalToSideInput = ordinalToSideInput;
        this.sideInputMapping = sideInputMapping;
        this.ownerId = ownerId;
        this.stepId = stepId;
        this.cooperative = AbstractParDoP.isCooperativenessAllowed(pipelineOptions) != false && this.hasOutput();
    }

    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        this.outbox = outbox;
        this.metricsContainer = new JetMetricsContainer(this.stepId, this.ownerId, context);
        this.doFnInvoker = DoFnInvokers.invokerFor(this.doFn);
        this.doFnInvoker.invokeSetup();
        if (this.ordinalToSideInput.isEmpty()) {
            this.sideInputReader = NullSideInputReader.of(Collections.emptyList());
        } else {
            this.bufferedItems = new SimpleInbox();
            this.sideInputHandler = new SideInputHandler(this.ordinalToSideInput.values(), (StateInternals)InMemoryStateInternals.forKey(null));
            this.sideInputReader = this.sideInputHandler;
        }
        this.outputManager = new JetOutputManager(outbox, this.outputCoders, this.outputCollToOrdinals);
        this.doFnRunner = this.getDoFnRunner(this.pipelineOptions.get(), this.doFn, this.sideInputReader, this.outputManager, this.mainOutputTag, Lists.newArrayList(this.outputCollToOrdinals.keySet()), this.inputValueCoder, this.outputValueCoders, this.windowingStrategy, this.doFnSchemaInformation, this.sideInputMapping);
    }

    protected abstract DoFnRunner<InputT, OutputT> getDoFnRunner(PipelineOptions var1, DoFn<InputT, OutputT> var2, SideInputReader var3, JetOutputManager var4, TupleTag<OutputT> var5, List<TupleTag<?>> var6, Coder<InputT> var7, Map<TupleTag<?>, Coder<?>> var8, WindowingStrategy<?, ?> var9, DoFnSchemaInformation var10, Map<String, PCollectionView<?>> var11);

    public boolean isCooperative() {
        return this.cooperative;
    }

    public void close() {
        this.doFnInvoker.invokeTeardown();
    }

    public void process(int ordinal, @Nonnull Inbox inbox) {
        MetricsEnvironment.setCurrentContainer((MetricsContainer)this.metricsContainer);
        if (!this.outputManager.tryFlush()) {
            return;
        }
        PCollectionView<?> sideInputView = this.ordinalToSideInput.get(ordinal);
        if (sideInputView != null) {
            this.processSideInput(sideInputView, inbox);
        } else if (this.bufferedItems != null) {
            this.processBufferedRegularItems(inbox);
        } else {
            this.processNonBufferedRegularItems(inbox);
        }
        MetricsEnvironment.setCurrentContainer(null);
    }

    private void processSideInput(PCollectionView<?> sideInputView, Inbox inbox) {
        byte[] value;
        while ((value = (byte[])inbox.poll()) != null) {
            Coder<?> sideInputCoder = this.sideInputCoders.get(sideInputView);
            WindowedValue windowedValue = Utils.decodeWindowedValue(value, sideInputCoder);
            this.sideInputHandler.addSideInputValue(sideInputView, windowedValue);
        }
    }

    private void processNonBufferedRegularItems(Inbox inbox) {
        byte[] value;
        this.startRunnerBundle(this.doFnRunner);
        while ((value = (byte[])inbox.poll()) != null) {
            WindowedValue windowedValue = Utils.decodeWindowedValue(value, this.inputCoder);
            this.processElementWithRunner(this.doFnRunner, windowedValue);
            if (this.outputManager.tryFlush()) continue;
            break;
        }
        this.finishRunnerBundle(this.doFnRunner);
    }

    protected void startRunnerBundle(DoFnRunner<InputT, OutputT> runner) {
        runner.startBundle();
    }

    protected void processElementWithRunner(DoFnRunner<InputT, OutputT> runner, WindowedValue<InputT> windowedValue) {
        runner.processElement(windowedValue);
    }

    protected void finishRunnerBundle(DoFnRunner<InputT, OutputT> runner) {
        runner.finishBundle();
    }

    private void processBufferedRegularItems(Inbox inbox) {
        byte[] value;
        while ((value = (byte[])inbox.poll()) != null) {
            this.bufferedItems.add(value);
        }
    }

    public boolean tryProcess() {
        boolean successful = this.outputManager.tryFlush();
        if (successful && System.currentTimeMillis() > this.lastMetricsFlushTime + this.metricsFlushPeriod) {
            this.metricsContainer.flush(true);
            this.lastMetricsFlushTime = System.currentTimeMillis();
        }
        return successful;
    }

    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return this.outbox.offer((Object)watermark);
    }

    public boolean completeEdge(int ordinal) {
        if (this.ordinalToSideInput.get(ordinal) == null) {
            return true;
        }
        this.completedSideInputs.add(ordinal);
        if (this.completedSideInputs.size() != this.ordinalToSideInput.size()) {
            return true;
        }
        this.processNonBufferedRegularItems(this.bufferedItems);
        if (this.bufferedItems.isEmpty()) {
            this.bufferedItems = null;
            return true;
        }
        return false;
    }

    public boolean complete() {
        boolean successful = this.outputManager.tryFlush();
        if (successful) {
            this.metricsContainer.flush(false);
        }
        return successful;
    }

    private boolean hasOutput() {
        for (int[] value : this.outputCollToOrdinals.values()) {
            if (value.length <= 0) continue;
            return true;
        }
        return false;
    }

    private static Boolean isCooperativenessAllowed(SerializablePipelineOptions serializablePipelineOptions) {
        PipelineOptions pipelineOptions = serializablePipelineOptions.get();
        JetPipelineOptions jetPipelineOptions = (JetPipelineOptions)pipelineOptions.as(JetPipelineOptions.class);
        return jetPipelineOptions.getJetProcessorsCooperative();
    }

    private static class SimpleInbox
    implements Inbox {
        private Deque<Object> items = new ArrayDeque<Object>();

        private SimpleInbox() {
        }

        void add(Object item) {
            this.items.add(item);
        }

        public boolean isEmpty() {
            return this.items.isEmpty();
        }

        public Object peek() {
            return this.items.peek();
        }

        public Object poll() {
            return this.items.poll();
        }

        public void remove() {
            this.items.remove();
        }

        public int size() {
            return this.items.size();
        }
    }

    static abstract class AbstractSupplier<InputT, OutputT>
    implements SupplierEx<Processor>,
    DAGBuilder.WiringListener {
        protected final String ownerId;
        private final String stepId;
        private final SerializablePipelineOptions pipelineOptions;
        private final DoFn<InputT, OutputT> doFn;
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final DoFnSchemaInformation doFnSchemaInformation;
        private final TupleTag<OutputT> mainOutputTag;
        private final Map<TupleTag<?>, List<Integer>> outputCollToOrdinals;
        private final Coder<InputT> inputCoder;
        private final Map<PCollectionView<?>, Coder<?>> sideInputCoders;
        private final Map<TupleTag<?>, Coder<?>> outputCoders;
        private final Coder<InputT> inputValueCoder;
        private final Map<TupleTag<?>, Coder<?>> outputValueCoders;
        private final Collection<PCollectionView<?>> sideInputs;
        private final Map<String, PCollectionView<?>> sideInputMapping;
        private final Map<Integer, PCollectionView<?>> ordinalToSideInput = new HashMap();

        AbstractSupplier(String stepId, String ownerId, DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, DoFnSchemaInformation doFnSchemaInformation, SerializablePipelineOptions pipelineOptions, TupleTag<OutputT> mainOutputTag, Set<TupleTag<OutputT>> allOutputTags, Coder<InputT> inputCoder, Map<PCollectionView<?>, Coder<?>> sideInputCoders, Map<TupleTag<?>, Coder<?>> outputCoders, Coder<InputT> inputValueCoder, Map<TupleTag<?>, Coder<?>> outputValueCoders, Collection<PCollectionView<?>> sideInputs, Map<String, PCollectionView<?>> sideInputMapping) {
            this.stepId = stepId;
            this.ownerId = ownerId;
            this.pipelineOptions = pipelineOptions;
            this.doFn = doFn;
            this.windowingStrategy = windowingStrategy;
            this.doFnSchemaInformation = doFnSchemaInformation;
            this.outputCollToOrdinals = allOutputTags.stream().collect(Collectors.toMap(Function.identity(), t -> new ArrayList()));
            this.mainOutputTag = mainOutputTag;
            this.inputCoder = inputCoder;
            this.sideInputCoders = sideInputCoders;
            this.outputCoders = outputCoders;
            this.inputValueCoder = inputValueCoder;
            this.outputValueCoders = outputValueCoders;
            this.sideInputs = sideInputs;
            this.sideInputMapping = sideInputMapping;
        }

        public Processor getEx() {
            if (this.ordinalToSideInput.size() != this.sideInputs.size()) {
                throw new RuntimeException("Oops");
            }
            return this.getEx(this.doFn, this.windowingStrategy, this.doFnSchemaInformation, this.outputCollToOrdinals.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((List)e.getValue()).stream().mapToInt(i -> i).toArray())), this.pipelineOptions, this.mainOutputTag, this.inputCoder, Collections.unmodifiableMap(this.sideInputCoders), Collections.unmodifiableMap(this.outputCoders), this.inputValueCoder, Collections.unmodifiableMap(this.outputValueCoders), Collections.unmodifiableMap(this.ordinalToSideInput), this.sideInputMapping, this.ownerId, this.stepId);
        }

        abstract Processor getEx(DoFn<InputT, OutputT> var1, WindowingStrategy<?, ?> var2, DoFnSchemaInformation var3, Map<TupleTag<?>, int[]> var4, SerializablePipelineOptions var5, TupleTag<OutputT> var6, Coder<InputT> var7, Map<PCollectionView<?>, Coder<?>> var8, Map<TupleTag<?>, Coder<?>> var9, Coder<InputT> var10, Map<TupleTag<?>, Coder<?>> var11, Map<Integer, PCollectionView<?>> var12, Map<String, PCollectionView<?>> var13, String var14, String var15);

        @Override
        public void isOutboundEdgeOfVertex(Edge edge, String edgeId, String pCollId, String vertexId) {
            if (this.ownerId.equals(vertexId)) {
                List<Integer> ordinals = this.outputCollToOrdinals.get(new TupleTag(pCollId));
                if (ordinals == null) {
                    throw new RuntimeException("Oops");
                }
                ordinals.add(edge.getSourceOrdinal());
            }
        }

        @Override
        public void isInboundEdgeOfVertex(Edge edge, String edgeId, String pCollId, String vertexId) {
            if (this.ownerId.equals(vertexId)) {
                for (PCollectionView<?> pCollectionView : this.sideInputs) {
                    if (!edgeId.equals(Utils.getTupleTagId(pCollectionView))) continue;
                    this.ordinalToSideInput.put(edge.getDestOrdinal(), pCollectionView);
                    break;
                }
            }
        }
    }

    static class JetOutputManager
    implements DoFnRunners.OutputManager {
        private final Outbox outbox;
        private final Map<TupleTag<?>, Coder<?>> outputCoders;
        private final Map<TupleTag<?>, int[]> outputCollToOrdinals;
        private final List<Object>[] outputBuckets;
        private int currentBucket;
        private int currentItem;

        JetOutputManager(Outbox outbox, Map<TupleTag<?>, Coder<?>> outputCoders, Map<TupleTag<?>, int[]> outputCollToOrdinals) {
            this.outbox = outbox;
            this.outputCoders = outputCoders;
            this.outputCollToOrdinals = outputCollToOrdinals;
            assert (!outputCollToOrdinals.isEmpty());
            int maxOrdinal = outputCollToOrdinals.values().stream().flatMapToInt(IntStream::of).max().orElse(-1);
            this.outputBuckets = new List[maxOrdinal + 1];
            Arrays.setAll(this.outputBuckets, i -> new ArrayList());
        }

        public <T> void output(TupleTag<T> tag, WindowedValue<T> outputValue) {
            assert (this.currentBucket == 0 && this.currentItem == 0) : "adding output while flushing";
            Coder<?> coder = this.outputCoders.get(tag);
            byte[] output = Utils.encode(outputValue, coder);
            for (int ordinal : this.outputCollToOrdinals.get(tag)) {
                this.outputBuckets[ordinal].add(output);
            }
        }

        @CheckReturnValue
        boolean tryFlush() {
            while (this.currentBucket < this.outputBuckets.length) {
                List<Object> bucket = this.outputBuckets[this.currentBucket];
                while (this.currentItem < bucket.size()) {
                    if (!this.outbox.offer(this.currentBucket, bucket.get(this.currentItem))) {
                        return false;
                    }
                    ++this.currentItem;
                }
                bucket.clear();
                this.currentItem = 0;
                ++this.currentBucket;
            }
            this.currentBucket = 0;
            int sum = 0;
            for (List<Object> outputBucket : this.outputBuckets) {
                sum += outputBucket.size();
            }
            return sum == 0;
        }
    }
}

