package org.apache.beam.runners.apex.translation;

import com.datatorrent.api.DAG;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
import org.apache.beam.runners.apex.translation.utils.CollectionSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ContiguousSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.DiscreteDomain;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Range;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that read transforms run through {@link ApexRunner} are backed by an
 * {@link ApexReadUnboundedInputOperator} in the resulting Apex DAG and that every
 * element produced by the source reaches a downstream {@link ParDo}.
 */
public class ReadUnboundTranslatorTest {
    private static final Logger LOG = LoggerFactory.getLogger(ReadUnboundTranslatorTest.class);

    /** Upper bound on how long a test polls for the collected results. */
    private static final long RESULT_TIMEOUT_MILLIS = 30000L;

    /** Pause between two consecutive checks of the collected results. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /**
     * Terminal {@link DoFn} that records every element it processes in a static,
     * synchronized set so the test thread can inspect what the pipeline emitted.
     */
    private static class EmbeddedCollector extends DoFn<Object, Void> {
        private static final long serialVersionUID = 1L;

        /** Elements seen so far; static so the test methods can read and reset it. */
        private static final Set<Object> RESULTS = Collections.synchronizedSet(new HashSet<Object>());

        /**
         * Adds the current element to {@link #RESULTS}.
         *
         * @param context the processing context supplying the element
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            RESULTS.add(context.element());
        }
    }

    /**
     * Reads five strings from a {@link CollectionSource} and checks both the operator
     * type in the DAG and the set of elements that reached the collector.
     */
    @Test
    public void test() throws Exception {
        ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
        EmbeddedCollector.RESULTS.clear();
        options.setApplicationName("ReadUnbound");
        options.setRunner(ApexRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        ArrayList<String> expected = Lists.newArrayList("1", "2", "3", "4", "5");
        CollectionSource<String> source = new CollectionSource<>(expected, StringUtf8Coder.of());
        pipeline.apply(Read.from(source)).apply(ParDo.of(new EmbeddedCollector()));

        DAG dag = pipeline.run().getApexDAG();
        assertReadOperator(dag, "Read(CollectionSource)");

        awaitResults(expected);
        Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
    }

    /**
     * Reads the longs 0..9 via {@link GenerateSequence} and checks both the operator
     * type in the DAG and the set of elements that reached the collector.
     */
    @Test
    public void testReadBounded() throws Exception {
        ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
        EmbeddedCollector.RESULTS.clear();
        options.setApplicationName("ReadBounded");
        options.setRunner(ApexRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        // Half-open range [0, 10) mirrors GenerateSequence.from(0).to(10).
        Set<Long> expected = ContiguousSet.create(Range.closedOpen(0L, 10L), DiscreteDomain.longs());
        pipeline.apply(GenerateSequence.from(0L).to(10L)).apply(ParDo.of(new EmbeddedCollector()));

        DAG dag = pipeline.run().getApexDAG();
        assertReadOperator(dag, "GenerateSequence/Read(BoundedCountingSource)");

        awaitResults(expected);
        Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
    }

    /**
     * Asserts that the DAG contains an operator with the given name and that it is an
     * {@link ApexReadUnboundedInputOperator}.
     *
     * @param dag the translated Apex DAG
     * @param operatorName the name under which the read operator is registered
     */
    private static void assertReadOperator(DAG dag, String operatorName) {
        DAG.OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName);
        Assert.assertNotNull(operatorMeta);
        // JUnit convention: expected value first, so failure messages read correctly.
        Assert.assertEquals(ApexReadUnboundedInputOperator.class, operatorMeta.getOperator().getClass());
    }

    /**
     * Polls {@link EmbeddedCollector#RESULTS} until it contains every expected element
     * or {@link #RESULT_TIMEOUT_MILLIS} has elapsed. Returns normally in both cases;
     * the caller's subsequent assertion decides whether the test passes.
     *
     * @param expected the elements the collector is expected to receive
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void awaitResults(Collection<?> expected) throws InterruptedException {
        long deadline = System.currentTimeMillis() + RESULT_TIMEOUT_MILLIS;
        while (System.currentTimeMillis() < deadline && !EmbeddedCollector.RESULTS.containsAll(expected)) {
            LOG.info("Waiting for expected results.");
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }
}
