/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.gearpump.translators.functions;

import io.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Instant;

/**
 * Gearpump flat-map function that runs a Beam {@link DoFn} over batches of tagged union values.
 *
 * <p>Every call to {@link #flatMap(List)} receives a list that may mix main-input elements (union
 * tag {@code "0"}) with side-input elements (any other tag, resolved to a {@link PCollectionView}
 * through the tag mapping supplied at construction). Main-input elements are buffered and handed
 * to a {@link PushbackSideInputDoFnRunner}; whatever the runner hands back from
 * {@code processElementInReadyWindows} stays buffered and is offered again on the next call.
 *
 * <p>The invoker, runner, side-input handler and element buffer are {@code transient} and are only
 * created in {@link #setup()}, so {@code setup()} has to run on an instance before
 * {@link #flatMap(List)} is used.
 *
 * @param <InputT> element type consumed by the wrapped {@link DoFn}
 * @param <OutputT> main-output element type produced by the wrapped {@link DoFn}
 */
public class DoFnFunction<InputT, OutputT>
extends FlatMapFunction<List<TranslatorUtils.RawUnionValue>, TranslatorUtils.RawUnionValue> {
    private static final long serialVersionUID = -5701440128544343353L;

    /** Union tag that marks a value as belonging to the main input; any other tag is a side input. */
    private static final String MAIN_INPUT_TAG = "0";

    private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
    private final DoFn<InputT, OutputT> doFn;
    private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
    private transient PushbackSideInputDoFnRunner<InputT, OutputT> doFnRunner;
    private transient SideInputHandler sideInputReader;
    /** Main-input elements received so far that the runner has not consumed yet. */
    private transient List<WindowedValue<InputT>> pushedBackValues;
    private final Collection<PCollectionView<?>> sideInputs;
    private final Map<String, PCollectionView<?>> tagsToSideInputs;
    private final TupleTag<OutputT> mainOutput;
    private final List<TupleTag<?>> sideOutputs;
    private final DoFnOutputManager outputManager;

    /**
     * Captures the serializable configuration; no runner is created until {@link #setup()}.
     *
     * @param pipelineOptions options forwarded to the {@link DoFnRunnerFactory}
     * @param doFn user function to execute
     * @param windowingStrategy windowing strategy forwarded to the {@link DoFnRunnerFactory}
     * @param sideInputs views the function may read; also used to create the side-input handler
     * @param sideInputTagMapping maps a non-main union tag to the side-input view it feeds
     * @param mainOutput tag of the main output
     * @param outputCoders coders per output tag, forwarded to the {@link DoFnRunnerFactory}
     * @param sideOutputs tags of the additional outputs that should be emitted
     * @param doFnSchemaInformation schema information forwarded to the {@link DoFnRunnerFactory}
     */
    public DoFnFunction(
            GearpumpPipelineOptions pipelineOptions,
            DoFn<InputT, OutputT> doFn,
            WindowingStrategy<?, ?> windowingStrategy,
            Collection<PCollectionView<?>> sideInputs,
            Map<String, PCollectionView<?>> sideInputTagMapping,
            TupleTag<OutputT> mainOutput,
            Map<TupleTag<?>, Coder<?>> outputCoders,
            List<TupleTag<?>> sideOutputs,
            DoFnSchemaInformation doFnSchemaInformation) {
        this.doFn = doFn;
        this.outputManager = new DoFnOutputManager();
        this.doFnRunnerFactory = new DoFnRunnerFactory<InputT, OutputT>(
                pipelineOptions,
                doFn,
                sideInputs,
                this.outputManager,
                mainOutput,
                sideOutputs,
                new NoOpStepContext(),
                outputCoders,
                windowingStrategy,
                doFnSchemaInformation);
        this.sideInputs = sideInputs;
        this.tagsToSideInputs = sideInputTagMapping;
        this.mainOutput = mainOutput;
        this.sideOutputs = sideOutputs;
    }

    /**
     * Builds all transient state: the side-input handler (backed by in-memory state created for a
     * {@code null} key), the {@link DoFnInvoker} (whose setup hook is invoked), the push-back
     * runner, the pending-element buffer and the output manager's collections.
     */
    public void setup() {
        sideInputReader = new SideInputHandler(sideInputs, InMemoryStateInternals.<Void>forKey(null));
        doFnInvoker = DoFnInvokers.invokerFor(doFn);
        if (doFnInvoker != null) {
            doFnInvoker.invokeSetup();
        }
        doFnRunner = doFnRunnerFactory.createRunner(sideInputReader);
        pushedBackValues = new ArrayList<WindowedValue<InputT>>();
        outputManager.setup(mainOutput, sideOutputs);
    }

    /** Invokes the wrapped function's teardown hook, if an invoker was created by {@link #setup()}. */
    public void teardown() {
        if (doFnInvoker != null) {
            doFnInvoker.invokeTeardown();
        }
    }

    /**
     * Processes one batch of union values as a single bundle.
     *
     * <p>Steps, in order: clear the previous outputs, start a bundle, route every input to either
     * the pending main-input buffer or the side-input handler, register an empty side-input value
     * for every side-input window that is still not ready, run all pending main-input elements
     * through the runner, and finish the bundle.
     *
     * @param inputs tagged values to consume; tag {@code "0"} denotes main input
     * @return iterator over the outputs collected during this call; it is backed by a list that is
     *     cleared at the start of the next call
     */
    public Iterator<TranslatorUtils.RawUnionValue> flatMap(List<TranslatorUtils.RawUnionValue> inputs) {
        outputManager.clear();
        doFnRunner.startBundle();
        routeInputs(inputs);
        fillUnreadySideInputs();
        processPushedBackValues();
        doFnRunner.finishBundle();
        return outputManager.getOutputs();
    }

    /** Buffers main-input values and forwards side-input values to the side-input handler. */
    // The union value carries its payload as Object; the tag is the only type evidence available,
    // so the generic casts below cannot be checked by the compiler.
    @SuppressWarnings("unchecked")
    private void routeInputs(List<TranslatorUtils.RawUnionValue> inputs) {
        for (TranslatorUtils.RawUnionValue unionValue : inputs) {
            String tag = unionValue.getUnionTag();
            if (MAIN_INPUT_TAG.equals(tag)) {
                pushedBackValues.add((WindowedValue<InputT>) unionValue.getValue());
                continue;
            }
            PCollectionView<?> view = tagsToSideInputs.get(tag);
            WindowedValue<Iterable<?>> sideInputValue = (WindowedValue<Iterable<?>>) unionValue.getValue();
            sideInputReader.addSideInputValue(view, sideInputValue);
        }
    }

    /**
     * For every side input and every window of every pending element, registers an empty value in
     * the mapped side-input window when the handler does not report that window as ready yet.
     */
    private void fillUnreadySideInputs() {
        for (PCollectionView<?> view : sideInputs) {
            for (WindowedValue<InputT> pending : pushedBackValues) {
                for (BoundedWindow mainWindow : pending.getWindows()) {
                    BoundedWindow sideInputWindow = view.getWindowMappingFn().getSideInputWindow(mainWindow);
                    if (sideInputReader.isReady(view, sideInputWindow)) {
                        continue;
                    }
                    // The explicit type argument yields WindowedValue<Iterable<?>> directly, which
                    // is what the handler accepts; no raw type or intermediate cast is needed.
                    WindowedValue<Iterable<?>> emptyValue = WindowedValue.<Iterable<?>>of(
                            new ArrayList<Object>(),
                            pending.getTimestamp(),
                            sideInputWindow,
                            pending.getPane());
                    sideInputReader.addSideInputValue(view, emptyValue);
                }
            }
        }
    }

    /**
     * Offers every pending element to the runner and keeps only what the runner returns from
     * {@code processElementInReadyWindows} as the new pending buffer.
     */
    private void processPushedBackValues() {
        List<WindowedValue<InputT>> stillPending = new ArrayList<WindowedValue<InputT>>();
        for (WindowedValue<InputT> pending : pushedBackValues) {
            Iterables.addAll(stillPending, doFnRunner.processElementInReadyWindows(pending));
        }
        pushedBackValues = stillPending;
    }

    /**
     * Output manager that records emitted values as {@link TranslatorUtils.RawUnionValue}s keyed
     * by the output tag id. Values for tags that were not registered through
     * {@link #setup(TupleTag, List)} are dropped. Its collections are {@code transient} and exist
     * only after {@code setup} has been called.
     */
    private static class DoFnOutputManager
    implements DoFnRunners.OutputManager,
    Serializable {
        private static final long serialVersionUID = 4967375172737408160L;
        private transient List<TranslatorUtils.RawUnionValue> outputs;
        private transient Set<TupleTag<?>> outputTags;

        private DoFnOutputManager() {
        }

        /**
         * Records {@code output} under the id of {@code outputTag} when that tag is registered;
         * otherwise the value is ignored.
         */
        public <T> void output(TupleTag<T> outputTag, WindowedValue<T> output) {
            if (outputTags.contains(outputTag)) {
                outputs.add(new TranslatorUtils.RawUnionValue(outputTag.getId(), output));
            }
        }

        /** Creates fresh collections and registers the main and additional output tags. */
        void setup(TupleTag<?> mainOutput, List<TupleTag<?>> sideOutputs) {
            outputs = new ArrayList<TranslatorUtils.RawUnionValue>();
            outputTags = new HashSet<TupleTag<?>>();
            outputTags.add(mainOutput);
            outputTags.addAll(sideOutputs);
        }

        /** Discards all recorded outputs. */
        void clear() {
            outputs.clear();
        }

        /** Returns an iterator over the live list of recorded outputs (no copy is made). */
        Iterator<TranslatorUtils.RawUnionValue> getOutputs() {
            return outputs.iterator();
        }
    }
}

