package org.apache.beam.runners.apex.translation;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Sink;
import com.datatorrent.lib.util.KryoCloneUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.runners.apex.TestApexRunner;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.PCollectionViewTesting;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/apex/translation/ParDoTranslatorTest.class */
public class ParDoTranslatorTest {
    /** Logger used to report progress while tests poll for pipeline output. */
    private static final Logger LOG = LoggerFactory.getLogger(ParDoTranslatorTest.class);
    /** Pause, in milliseconds, between successive checks of the collected results. */
    private static final long SLEEP_MILLIS = 500;
    /** Upper bound, in milliseconds, on how long a test keeps polling for the expected results. */
    private static final long TIMEOUT_MILLIS = 30000;

    /**
     * Test {@link DoFn} that adds a number to every input element. The addend is either the
     * value given at construction time or, when a side input view was given instead, the value
     * read from that view while processing each element.
     */
    private static class Add extends DoFn<Integer, Integer> {
        private static final long serialVersionUID = 1;
        private Integer number;
        private PCollectionView<Integer> sideInputView;

        /** Creates a function that adds the given constant to each element. */
        private Add(Integer addend) {
            this.number = addend;
        }

        /** Creates a function whose addend is read from the given side input view. */
        private Add(PCollectionView<Integer> view) {
            this.sideInputView = view;
        }

        /** Emits the current element increased by the addend. */
        @DoFn.ProcessElement
        public void processElement(DoFn<Integer, Integer>.ProcessContext c) throws Exception {
            if (sideInputView != null) {
                // Refresh the addend from the side input before it is used.
                number = c.sideInput(sideInputView);
            }
            int sum = c.element() + number;
            c.output(sum);
        }
    }

    /**
     * Terminal test {@link DoFn} that records every element it receives in a static,
     * synchronized set so the tests can inspect what the pipeline produced. Constructing a new
     * instance empties that shared set.
     */
    private static class EmbeddedCollector extends DoFn<Object, Void> {
        private static final long serialVersionUID = 1;
        private static final Set<Object> RESULTS = Collections.synchronizedSet(new HashSet<>());

        public EmbeddedCollector() {
            // Each newly built collector starts from an empty shared result set.
            RESULTS.clear();
        }

        /** Stores the current element in the shared result set. */
        @DoFn.ProcessElement
        public void processElement(DoFn<Object, Void>.ProcessContext c) throws Exception {
            Object element = c.element();
            RESULTS.add(element);
        }
    }

    /**
     * Test {@link DoFn} that turns each integer into a {@code "processing: <element>"} string,
     * appends the values of all configured side inputs (if any), and emits the result to the
     * main output as well as, prefixed with the tag id, to every additional output tag.
     */
    private static class TestMultiOutputWithSideInputsFn extends DoFn<Integer, String> {
        private static final long serialVersionUID = 1;
        final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
        final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>();

        /**
         * @param sideInputViews views whose values are appended to every emitted string
         * @param additionalOutputTupleTags extra output tags that also receive each string
         */
        public TestMultiOutputWithSideInputsFn(
                List<PCollectionView<Integer>> sideInputViews,
                List<TupleTag<String>> additionalOutputTupleTags) {
            // Copy the arguments so later changes to the caller's lists do not affect this function.
            this.sideInputViews.addAll(sideInputViews);
            this.additionalOutputTupleTags.addAll(additionalOutputTupleTags);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Integer, String>.ProcessContext c) throws Exception {
            outputToAllWithSideInputs(c, "processing: " + c.element());
        }

        /**
         * Emits {@code value}, extended with the list of side input values when side inputs are
         * configured, to the main output and to each additional output tag.
         */
        private void outputToAllWithSideInputs(DoFn<Integer, String>.ProcessContext c, String value) {
            String message = value;
            if (!sideInputViews.isEmpty()) {
                List<Integer> sideInputValues = new ArrayList<>();
                for (PCollectionView<Integer> view : sideInputViews) {
                    sideInputValues.add(c.sideInput(view));
                }
                message = message + ": " + sideInputValues;
            }
            c.output(message);
            for (TupleTag<String> tag : additionalOutputTupleTags) {
                c.output(tag, tag.getId() + ": " + message);
            }
        }
    }

    /**
     * Runs a small pipeline (create 1..5, add 5, collect) on the Apex runner, checks that the
     * translated DAG contains the expected operator types, and then polls the shared collector
     * until the expected results 6..10 arrive or the timeout elapses.
     *
     * <p>The launched application is always cancelled, even when an assertion fails or the wait
     * is interrupted, so that it does not keep running while later tests execute.
     */
    @Test
    public void test() throws Exception {
        ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
        options.setApplicationName("ParDoBound");
        options.setRunner(ApexRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        List<Integer> input = Lists.newArrayList(1, 2, 3, 4, 5);
        List<Integer> expected = Lists.newArrayList(6, 7, 8, 9, 10);
        pipeline
                .apply(Create.of(input).withCoder(SerializableCoder.of(Integer.class)))
                .apply(ParDo.of(new Add(5)))
                .apply(ParDo.of(new EmbeddedCollector()));

        // Pipeline.run() is declared to return PipelineResult; the Apex-specific result type is
        // needed to reach the translated DAG and to cancel the application.
        ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
        try {
            DAG dag = result.getApexDAG();

            DAG.OperatorMeta createMeta = dag.getOperatorMeta("Create.Values");
            Assert.assertNotNull(createMeta);
            Assert.assertEquals(
                    ApexReadUnboundedInputOperator.class, createMeta.getOperator().getClass());

            DAG.OperatorMeta parDoMeta = dag.getOperatorMeta("ParDo(Add)/ParMultiDo(Add)");
            Assert.assertNotNull(parDoMeta);
            Assert.assertEquals(ApexParDoOperator.class, parDoMeta.getOperator().getClass());

            // Poll until all expected values were collected or the deadline has passed.
            long deadline = System.currentTimeMillis() + TIMEOUT_MILLIS;
            while (System.currentTimeMillis() < deadline
                    && !EmbeddedCollector.RESULTS.containsAll(expected)) {
                LOG.info("Waiting for expected results.");
                Thread.sleep(SLEEP_MILLIS);
            }
        } finally {
            result.cancel();
        }
        Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
    }

    /**
     * Runs the pipeline and returns the {@link AssertionError} it is expected to raise.
     *
     * <p>If the pipeline completes without an assertion failure, this method itself fails the
     * test. That failure is raised outside the {@code try} block on purpose: inside it, the
     * {@code AssertionError} thrown by {@link Assert#fail(String)} would be caught by the
     * handler below and handed back to the caller as if it were the pipeline's own failure.
     *
     * @param pipeline the pipeline whose execution should fail an assertion
     * @return the assertion error thrown by {@code pipeline.run()}
     */
    private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
        try {
            pipeline.run();
        } catch (AssertionError expected) {
            return expected;
        }
        Assert.fail("assertion should have failed");
        // Assert.fail always throws; this only satisfies the compiler's return analysis.
        throw new RuntimeException("unreachable");
    }

    /**
     * Checks that a PAssert mismatch (an unexpected extra element 7) surfaces as an assertion
     * error whose message lists the expected elements. Currently ignored; see the linked issue.
     */
    @Test
    @Ignore("https://issues.apache.org/jira/browse/BEAM-3272")
    public void testAssertionFailure() throws Exception {
        ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
        options.setRunner(TestApexRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        PAssert.that(pipeline.apply(Create.of(1, 2, 3, 4))).containsInAnyOrder(2, 1, 4, 3, 7);

        Throwable failure = runExpectingAssertionFailure(pipeline);
        Pattern expectedPattern =
                Pattern.compile(
                        "Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order");
        String explanation =
                "Expected error message from PAssert with substring matching "
                        + expectedPattern
                        + " but the message was \""
                        + failure.getMessage()
                        + "\"";
        Assert.assertTrue(explanation, expectedPattern.matcher(failure.getMessage()).find());
    }

    /**
     * Runs a pipeline on the test runner whose PAssert expects exactly the created elements, in
     * a different order; the test passes when the run completes without throwing.
     */
    @Test
    public void testContainsInAnyOrder() throws Exception {
        ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
        options.setRunner(TestApexRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        PAssert.that(pipeline.apply(Create.of(1, 2, 3, 4))).containsInAnyOrder(2, 1, 4, 3);

        pipeline.run();
    }

    /**
     * Drives an {@link ApexParDoOperator} by hand and clones it with Kryo between steps to check
     * that the operator keeps working across serialization:
     *
     * <ol>
     *   <li>the original operator receives main input 1 while no side input has been supplied;
     *   <li>the first clone receives the side input value 22 and is expected to emit 23;
     *   <li>the second clone receives main input 2 and is expected to emit 24.
     * </ol>
     */
    @Test
    public void testSerialization() throws Exception {
        ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
        options.setRunner(TestApexRunner.class);
        Pipeline pipeline = Pipeline.create(options);
        PCollectionView singletonView =
                pipeline.apply(Create.of(1, new Integer[0]))
                        .apply(Sum.integersGlobally().asSingletonView());

        ApexParDoOperator operator =
                new ApexParDoOperator(
                        options,
                        new Add(singletonView),
                        new TupleTag(),
                        TupleTagList.empty().getAll(),
                        WindowingStrategy.globalDefault(),
                        Collections.singletonList(singletonView),
                        VarIntCoder.of(),
                        Collections.emptyMap(),
                        DoFnSchemaInformation.create(),
                        new ApexStateInternals.ApexStateBackend());
        operator.setup((Context.OperatorContext) null);
        operator.beginWindow(0L);

        WindowedValue firstElement = WindowedValue.valueInGlobalWindow(1);
        WindowedValue sideInputValue =
                WindowedValue.valueInGlobalWindow(
                        PCollectionViewTesting.materializeValuesFor(
                                View.asSingleton(), new Object[]{22}));
        // Main input is delivered before any side input value has been supplied.
        operator.input.process(ApexStreamTuple.DataTuple.of(firstElement));

        // Sink that captures everything the cloned operators emit.
        final List<Object> emitted = Lists.newArrayList();
        Sink<Object> capturingSink =
                new Sink<Object>() {
                    @Override
                    public void put(Object tuple) {
                        emitted.add(tuple);
                    }

                    @Override
                    public int getCount(boolean reset) {
                        return 0;
                    }
                };

        // First clone: supplying the side input should yield 1 + 22.
        ApexParDoOperator firstClone = (ApexParDoOperator) KryoCloneUtils.cloneObject(operator);
        Assert.assertNotNull("Serialization", firstClone);
        firstClone.output.setSink(capturingSink);
        firstClone.setup((Context.OperatorContext) null);
        firstClone.beginWindow(1L);
        firstClone.sideInput1.process(ApexStreamTuple.DataTuple.of(sideInputValue));
        Assert.assertEquals("number outputs", 1L, emitted.size());
        Assert.assertEquals(
                "result",
                WindowedValue.valueInGlobalWindow(23),
                ((ApexStreamTuple.DataTuple) emitted.get(0)).getValue());

        // Second clone: a further main input should yield 2 + 22.
        emitted.clear();
        ApexParDoOperator secondClone = (ApexParDoOperator) KryoCloneUtils.cloneObject(firstClone);
        Assert.assertNotNull("Serialization", secondClone);
        secondClone.output.setSink(capturingSink);
        secondClone.setup((Context.OperatorContext) null);
        secondClone.beginWindow(2L);
        WindowedValue secondElement = WindowedValue.valueInGlobalWindow(2);
        secondClone.input.process(ApexStreamTuple.DataTuple.of(secondElement));
        Assert.assertEquals("number outputs", 1L, emitted.size());
        Assert.assertEquals(
                "result",
                WindowedValue.valueInGlobalWindow(24),
                ((ApexStreamTuple.DataTuple) emitted.get(0)).getValue());
    }

    /**
     * Runs a multi-output ParDo with three registered side inputs (two of which the function
     * reads) on the Apex runner and checks that the main output carries, for each input element,
     * a string listing the two side input values that were read.
     *
     * <p>The launched application is cancelled in a {@code finally} block so that it is stopped
     * even when the polling loop is interrupted.
     */
    @Test
    public void testMultiOutputParDoWithSideInputs() throws Exception {
        ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
        options.setRunner(ApexRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        List<Integer> inputs = Arrays.asList(3, -42, 666);
        final TupleTag<String> mainOutputTag = new TupleTag<>("main");
        final TupleTag<Void> additionalOutputTag = new TupleTag<>("output");

        PCollectionView<Integer> sideInput1 =
                pipeline.apply("CreateSideInput1", Create.of(11))
                        .apply("ViewSideInput1", View.asSingleton());
        // Registered with the ParDo below but never read by the function.
        PCollectionView<Integer> sideInputUnread =
                pipeline.apply("CreateSideInputUnread", Create.of(-3333))
                        .apply("ViewSideInputUnread", View.asSingleton());
        PCollectionView<Integer> sideInput2 =
                pipeline.apply("CreateSideInput2", Create.of(222))
                        .apply("ViewSideInput2", View.asSingleton());

        PCollectionTuple outputs =
                pipeline.apply(Create.of(inputs))
                        .apply(
                                ParDo.of(
                                                new TestMultiOutputWithSideInputsFn(
                                                        Arrays.asList(sideInput1, sideInput2),
                                                        Arrays.<TupleTag<String>>asList()))
                                        .withSideInputs(sideInput1)
                                        .withSideInputs(sideInputUnread)
                                        .withSideInputs(sideInput2)
                                        .withOutputTags(
                                                mainOutputTag, TupleTagList.of(additionalOutputTag)));
        outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
        outputs.get(additionalOutputTag).setCoder(VoidCoder.of());

        // Pipeline.run() is declared to return PipelineResult; cancel() is invoked on the
        // Apex-specific result type.
        ApexRunnerResult result = (ApexRunnerResult) pipeline.run();

        Set<String> expected =
                Sets.newHashSet(
                        "processing: 3: [11, 222]",
                        "processing: -42: [11, 222]",
                        "processing: 666: [11, 222]");
        try {
            // Poll until all expected strings were collected or the deadline has passed.
            long deadline = System.currentTimeMillis() + TIMEOUT_MILLIS;
            while (System.currentTimeMillis() < deadline
                    && !EmbeddedCollector.RESULTS.containsAll(expected)) {
                LOG.info("Waiting for expected results.");
                Thread.sleep(SLEEP_MILLIS);
            }
        } finally {
            result.cancel();
        }
        Assert.assertEquals(expected, EmbeddedCollector.RESULTS);
    }
}
