package org.apache.beam.runners.apex.translation;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.repackaged.com.google.common.collect.Lists;
import org.apache.beam.runners.apex.repackaged.com.google.common.collect.Sets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.class */
public class GroupByKeyTranslatorTest {

    /* loaded from: input_file:org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest$EmbeddedCollector.class */
    private static class EmbeddedCollector extends DoFn<Object, Void> {
        private static final Set<Object> RESULTS = Collections.synchronizedSet(new HashSet());

        private EmbeddedCollector() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Object, Void>.ProcessContext processContext) throws Exception {
            RESULTS.add(processContext.element());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest$KeyedByTimestamp.class */
    private static class KeyedByTimestamp<T> extends DoFn<T, KV<Instant, T>> {
        private KeyedByTimestamp() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<T, KV<Instant, T>>.ProcessContext processContext) throws Exception {
            processContext.output(KV.of(processContext.timestamp(), processContext.element()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest$TestSource.class */
    public static class TestSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
        private final List<KV<String, Instant>> data;
        private final Instant watermark;

        /* loaded from: input_file:org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest$TestSource$TestReader.class */
        private static class TestReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
            private static final long serialVersionUID = 7526472295622776147L;
            private final List<KV<String, Instant>> data;
            private final TestSource source;
            private Iterator<KV<String, Instant>> iterator;
            private String currentRecord;
            private Instant currentTimestamp;
            private Instant watermark;
            private boolean collected;

            public TestReader(List<KV<String, Instant>> list, Instant instant, TestSource testSource) {
                this.data = list;
                this.source = testSource;
                this.watermark = instant;
            }

            public boolean start() throws IOException {
                this.iterator = this.data.iterator();
                return advance();
            }

            public boolean advance() throws IOException {
                if (!this.iterator.hasNext()) {
                    return false;
                }
                KV<String, Instant> next = this.iterator.next();
                this.collected = false;
                this.currentRecord = (String) next.getKey();
                this.currentTimestamp = (Instant) next.getValue();
                return true;
            }

            public byte[] getCurrentRecordId() throws NoSuchElementException {
                return new byte[0];
            }

            /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
            public String m7getCurrent() throws NoSuchElementException {
                this.collected = true;
                return this.currentRecord;
            }

            public Instant getCurrentTimestamp() throws NoSuchElementException {
                return this.currentTimestamp;
            }

            public void close() throws IOException {
            }

            public Instant getWatermark() {
                return (this.iterator.hasNext() || !this.collected) ? new Instant(0L) : this.watermark;
            }

            public UnboundedSource.CheckpointMark getCheckpointMark() {
                return null;
            }

            /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
            public UnboundedSource<String, ?> m6getCurrentSource() {
                return this.source;
            }
        }

        public TestSource(List<KV<String, Instant>> list, Instant instant) {
            this.data = list;
            this.watermark = instant;
        }

        public List<? extends UnboundedSource<String, UnboundedSource.CheckpointMark>> split(int i, PipelineOptions pipelineOptions) throws Exception {
            return Collections.singletonList(this);
        }

        public UnboundedSource.UnboundedReader<String> createReader(PipelineOptions pipelineOptions, @Nullable UnboundedSource.CheckpointMark checkpointMark) {
            return new TestReader(this.data, this.watermark, this);
        }

        @Nullable
        public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
            return null;
        }

        public void validate() {
        }

        public Coder<String> getDefaultOutputCoder() {
            return StringUtf8Coder.of();
        }
    }

    @Test
    public void test() throws Exception {
        ApexPipelineOptions as = PipelineOptionsFactory.as(ApexPipelineOptions.class);
        as.setApplicationName("GroupByKey");
        as.setRunner(ApexRunner.class);
        Pipeline create = Pipeline.create(as);
        ArrayList newArrayList = Lists.newArrayList(new KV[]{KV.of("foo", new Instant(1000L)), KV.of("foo", new Instant(1000L)), KV.of("foo", new Instant(2000L)), KV.of("bar", new Instant(1000L)), KV.of("bar", new Instant(2000L)), KV.of("bar", new Instant(2000L))});
        ArrayList newArrayList2 = Lists.newArrayList(new KV[]{KV.of(new Instant(1000L), KV.of("foo", 2L)), KV.of(new Instant(1000L), KV.of("bar", 1L)), KV.of(new Instant(2000L), KV.of("foo", 1L)), KV.of(new Instant(2000L), KV.of("bar", 2L))});
        create.apply(Read.from(new TestSource(newArrayList, new Instant(5000L)))).apply(Window.into(FixedWindows.of(Duration.standardSeconds(1L))).withTimestampCombiner(TimestampCombiner.LATEST)).apply(Count.perElement()).apply(ParDo.of(new KeyedByTimestamp())).apply(ParDo.of(new EmbeddedCollector()));
        create.run().getApexDAG();
        long currentTimeMillis = System.currentTimeMillis() + 30000;
        while (System.currentTimeMillis() < currentTimeMillis && !EmbeddedCollector.RESULTS.containsAll(newArrayList2)) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals(Sets.newHashSet(newArrayList2), EmbeddedCollector.RESULTS);
    }
}
