package org.apache.beam.runners.apex.translation;

import com.datatorrent.api.DAG;
import com.datatorrent.stram.engine.PortContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.TestApexRunner;
import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/apex/translation/SideInputTranslationTest.class */
public class SideInputTranslationTest implements Serializable {
    private static final AtomicReference<Boolean> SIDE_INPUT_ACCESSED = new AtomicReference<>();

    @Test
    public void testMapAsEntrySetSideInput() {
        SIDE_INPUT_ACCESSED.set(false);
        ApexPipelineOptions as = PipelineOptionsFactory.as(ApexPipelineOptions.class);
        as.setApplicationName("SideInputTranslationTest");
        as.setRunner(TestApexRunner.class);
        Pipeline create = Pipeline.create(as);
        final PCollectionView apply = create.apply("CreateSideInput", Create.of(KV.of("a", 1), new KV[]{KV.of("b", 3)})).apply(View.asMap());
        PAssert.that(create.apply("CreateMainInput", Create.of(2, new Integer[0])).apply("OutputSideInputs", ParDo.of(new DoFn<Integer, KV<String, Integer>>() { // from class: org.apache.beam.runners.apex.translation.SideInputTranslationTest.1
            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, KV<String, Integer>>.ProcessContext processContext) {
                Assert.assertEquals(((Integer) processContext.element()).intValue(), ((Map) processContext.sideInput(apply)).size());
                Assert.assertEquals(((Integer) processContext.element()).intValue(), ((Map) processContext.sideInput(apply)).entrySet().size());
                for (Map.Entry entry : ((Map) processContext.sideInput(apply)).entrySet()) {
                    processContext.output(KV.of((String) entry.getKey(), (Integer) entry.getValue()));
                }
                SideInputTranslationTest.SIDE_INPUT_ACCESSED.set(true);
            }
        }).withSideInputs(new PCollectionView[]{apply}))).containsInAnyOrder(new KV[]{KV.of("a", 1), KV.of("b", 3)});
        create.run();
        Assert.assertTrue(SIDE_INPUT_ACCESSED.get().booleanValue());
    }

    @Test
    public void testListSideInputTranslation() throws Exception {
        Assert.assertEquals(ListCoder.of(KvCoder.of(VoidCoder.of(), VarIntCoder.of())), getTranslatedSideInputCoder(ImmutableList.of(11, 13, 17, 23), View.asList()));
    }

    @Test
    public void testMapSideInputTranslation() throws Exception {
        Assert.assertEquals(ListCoder.of(KvCoder.of(VoidCoder.of(), KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))), getTranslatedSideInputCoder(ImmutableList.of(KV.of("a", 1), KV.of("b", 3)), View.asMap()));
    }

    @Test
    public void testMultimapSideInputTranslation() throws Exception {
        Assert.assertEquals(ListCoder.of(KvCoder.of(VoidCoder.of(), KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))), getTranslatedSideInputCoder(ImmutableList.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)), View.asMultimap()));
    }

    private <T> Coder<?> getTranslatedSideInputCoder(List<T> list, PTransform<PCollection<T>, ? extends PCollectionView<?>> pTransform) throws Exception {
        Pipeline create = Pipeline.create();
        create.apply(Create.of(1, new Integer[0])).apply("ParDo", ParDo.of(new DoFn<Integer, KV<String, Integer>>() { // from class: org.apache.beam.runners.apex.translation.SideInputTranslationTest.2
            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, KV<String, Integer>>.ProcessContext processContext) {
            }
        }).withSideInputs(new PCollectionView[]{create.apply(Create.of(list)).apply(pTransform)}));
        DAG.OperatorMeta operatorMeta = TestApexRunner.translate(create, PipelineOptionsFactory.create().as(ApexPipelineOptions.class)).getOperatorMeta("ParDo/ParMultiDo(Anonymous)");
        Assert.assertNotNull(operatorMeta);
        Assert.assertEquals(2L, operatorMeta.getInputStreams().size());
        LogicalPlan.InputPortMeta inputPortMeta = null;
        for (LogicalPlan.InputPortMeta inputPortMeta2 : operatorMeta.getInputStreams().keySet()) {
            if ("sideInput1".equals(inputPortMeta2.getPortName())) {
                inputPortMeta = inputPortMeta2;
            }
        }
        Assert.assertNotNull("could not find stream for: sideInput1", inputPortMeta);
        return ((CoderAdapterStreamCodec) inputPortMeta.getAttributes().get(PortContext.STREAM_CODEC)).getCoder().getValueCoder().getValueCoder();
    }
}
