/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.core.ElementAndRestriction;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class SplittableParDoTest {
    /**
     * Applies a {@code Create} of the integers 1, 2, 3 to {@code pipeline} under the name
     * "unbounded" and flags the resulting collection as {@code UNBOUNDED}.
     *
     * @param pipeline pipeline the source transform is applied to
     * @return the created collection after {@code setIsBoundedInternal(UNBOUNDED)}
     */
    private static PCollection<Integer> makeUnboundedCollection(Pipeline pipeline) {
        PTransform source = (PTransform)Create.of((Object[])new Integer[]{1, 2, 3});
        PCollection created = (PCollection)pipeline.apply("unbounded", source);
        return created.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
    }

    /**
     * Applies a {@code Create} of the integers 1, 2, 3 to {@code pipeline} under the name
     * "bounded" and flags the resulting collection as {@code BOUNDED}.
     *
     * @param pipeline pipeline the source transform is applied to
     * @return the created collection after {@code setIsBoundedInternal(BOUNDED)}
     */
    private static PCollection<Integer> makeBoundedCollection(Pipeline pipeline) {
        PTransform source = (PTransform)Create.of((Object[])new Integer[]{1, 2, 3});
        PCollection created = (PCollection)pipeline.apply("bounded", source);
        return created.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
    }

    /**
     * Wraps a {@link BoundedFakeFn} in a {@code SplittableParDo}, applies it first to a
     * bounded and then to an unbounded input, and asserts that in both cases the output
     * reports the same boundedness as its input.
     */
    @Test
    public void testBoundednessForBoundedFn() {
        TestPipeline pipeline = TestPipeline.create();
        BoundedFakeFn boundedFn = new BoundedFakeFn();

        // Bounded input.
        PCollection<Integer> boundedInput = SplittableParDoTest.makeBoundedCollection((Pipeline)pipeline);
        PCollection fromBounded = (PCollection)boundedInput.apply("bounded to bounded", (PTransform)new SplittableParDo((DoFn)boundedFn));
        Assert.assertEquals(
            (String)"Applying a bounded SDF to a bounded collection produces a bounded collection",
            (Object)PCollection.IsBounded.BOUNDED,
            (Object)fromBounded.isBounded());

        // Unbounded input; only built once the first assertion has passed.
        PCollection<Integer> unboundedInput = SplittableParDoTest.makeUnboundedCollection((Pipeline)pipeline);
        PCollection fromUnbounded = (PCollection)unboundedInput.apply("bounded to unbounded", (PTransform)new SplittableParDo((DoFn)boundedFn));
        Assert.assertEquals(
            (String)"Applying a bounded SDF to an unbounded collection produces an unbounded collection",
            (Object)PCollection.IsBounded.UNBOUNDED,
            (Object)fromUnbounded.isBounded());
    }

    /**
     * Wraps an {@link UnboundedFakeFn} in a {@code SplittableParDo}, applies it first to a
     * bounded and then to an unbounded input, and asserts that the output is reported as
     * {@code UNBOUNDED} in both cases.
     */
    @Test
    public void testBoundednessForUnboundedFn() {
        TestPipeline pipeline = TestPipeline.create();
        UnboundedFakeFn unboundedFn = new UnboundedFakeFn();

        // Bounded input: the expected value is UNBOUNDED, and the failure message now says so
        // (it previously claimed "a bounded collection", contradicting the assertion).
        PCollection<Integer> boundedInput = SplittableParDoTest.makeBoundedCollection((Pipeline)pipeline);
        PCollection fromBounded = (PCollection)boundedInput.apply("unbounded to bounded", (PTransform)new SplittableParDo((DoFn)unboundedFn));
        Assert.assertEquals(
            (String)"Applying an unbounded SDF to a bounded collection produces an unbounded collection",
            (Object)PCollection.IsBounded.UNBOUNDED,
            (Object)fromBounded.isBounded());

        // Unbounded input.
        PCollection<Integer> unboundedInput = SplittableParDoTest.makeUnboundedCollection((Pipeline)pipeline);
        PCollection fromUnbounded = (PCollection)unboundedInput.apply("unbounded to unbounded", (PTransform)new SplittableParDo((DoFn)unboundedFn));
        Assert.assertEquals(
            (String)"Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
            (Object)PCollection.IsBounded.UNBOUNDED,
            (Object)fromUnbounded.isBounded());
    }

    /**
     * Sends the element 42, assigned to three interval windows centred on {@code base},
     * through the process fn built for {@link ToStringFn}. Each of the three windows is
     * then expected to hold the outputs "42a", "42b", "42c", all timestamped {@code base}.
     */
    @Test
    public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Exception {
        ToStringFn fn = new ToStringFn();
        Instant base = Instant.now();
        ProcessFnTester tester = new ProcessFnTester(base, fn, (Coder<Integer>)BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));

        // Window i spans [base - (i + 1) min, base + (i + 1) min).
        IntervalWindow[] windows = new IntervalWindow[3];
        for (int i = 0; i < windows.length; ++i) {
            Duration halfWidth = Duration.standardMinutes((long)(i + 1));
            windows[i] = new IntervalWindow(base.minus((ReadableDuration)halfWidth), base.plus((ReadableDuration)halfWidth));
        }

        Object payload = ElementAndRestriction.of((Object)42, (Object)new SomeRestriction());
        tester.startElement(WindowedValue.of(payload, (Instant)base, Arrays.asList(windows), (PaneInfo)PaneInfo.ON_TIME_AND_ONLY_FIRING));

        // The same three timestamped outputs are expected in every window.
        List<TimestampedValue<Object>> expected = Arrays.asList(
            TimestampedValue.of((Object)"42a", (Instant)base),
            TimestampedValue.of((Object)"42b", (Instant)base),
            TimestampedValue.of((Object)"42c", (Instant)base));
        for (IntervalWindow window : windows) {
            Assert.assertEquals(expected, tester.peekOutputElementsInWindow((BoundedWindow)window));
        }
    }

    /**
     * Runs {@link SelfInitiatedResumeFn}, which outputs its element and asks to be resumed
     * after 5 seconds, and advances processing time in 3-second steps. The test expects no
     * timer and no output on the first step of each pair, and a fired timer plus a fresh
     * "42" output on the second step; this pattern is checked twice.
     */
    @Test
    public void testResumeSetsTimer() throws Exception {
        SelfInitiatedResumeFn fn = new SelfInitiatedResumeFn();
        Instant base = Instant.now();
        ProcessFnTester tester = new ProcessFnTester(base, fn, (Coder<Integer>)BigEndianIntegerCoder.of(), (Coder<SomeRestriction>)SerializableCoder.of(SomeRestriction.class));

        // Initial call produces the first output.
        tester.startElement(42, new SomeRestriction());
        Assert.assertThat(tester.takeOutputElements(), (Matcher)Matchers.contains((Object[])new String[]{"42"}));

        for (int cycle = 0; cycle < 2; ++cycle) {
            // 3 seconds after the previous call: nothing fires, nothing is output.
            Assert.assertFalse((boolean)tester.advanceProcessingTimeBy(Duration.standardSeconds((long)3L)));
            Assert.assertTrue((boolean)tester.takeOutputElements().isEmpty());
            // 6 seconds after the previous call: the resume timer fires and outputs again.
            Assert.assertTrue((boolean)tester.advanceProcessingTimeBy(Duration.standardSeconds((long)3L)));
            Assert.assertThat(tester.takeOutputElements(), (Matcher)Matchers.contains((Object[])new String[]{"42"}));
        }
    }

    /**
     * Runs a {@link CounterFn} limited to 3 outputs in total and 1 output per call, starting
     * from checkpoint index 0 with element 42. Each call is expected to emit exactly the next
     * value ("42", "43", "44") and to schedule a resume; the call after the third output emits
     * nothing, and no further timer is expected afterwards.
     */
    @Test
    public void testResumeCarriesOverState() throws Exception {
        CounterFn fn = new CounterFn(3, 1);
        Instant base = Instant.now();
        ProcessFnTester tester = new ProcessFnTester(base, fn, (Coder<Integer>)BigEndianIntegerCoder.of(), (Coder<SomeCheckpoint>)SerializableCoder.of(SomeCheckpoint.class));
        tester.startElement(42, new SomeCheckpoint(0));

        // One output per call, each followed by a resume timer one second later.
        for (String expectedOutput : new String[]{"42", "43", "44"}) {
            Assert.assertThat(tester.takeOutputElements(), (Matcher)Matchers.contains((Object[])new String[]{expectedOutput}));
            Assert.assertTrue((boolean)tester.advanceProcessingTimeBy(Duration.standardSeconds((long)1L)));
        }

        // The fourth call produced nothing and did not ask to be resumed.
        Assert.assertEquals((long)0L, (long)tester.takeOutputElements().size());
        Assert.assertFalse((boolean)tester.advanceProcessingTimeBy(Duration.standardSeconds((long)1L)));
    }

    /**
     * Runs a {@link CounterFn} that would emit up to {@code 2 * max} outputs per call and
     * {@code 2.5 * max} outputs overall, and checks that the outputs nevertheless arrive in
     * batches of {@code max}, {@code max} and {@code max / 2} elements across three calls.
     *
     * <p>NOTE(review): {@code max} is presumably the number of outputs after which the
     * process fn checkpoints the call; confirm against {@code SplittableParDo.ProcessFn}.
     */
    @Test
    public void testReactsToCheckpoint() throws Exception {
        int max = 10000;
        int totalOutputs = 2 * max + max / 2;
        CounterFn fn = new CounterFn(totalOutputs, 2 * max);
        Instant base = Instant.now();
        int baseIndex = 42;
        ProcessFnTester tester = new ProcessFnTester(base, fn, (Coder<Integer>)BigEndianIntegerCoder.of(), (Coder<SomeCheckpoint>)SerializableCoder.of(SomeCheckpoint.class));
        tester.startElement(baseIndex, new SomeCheckpoint(0));

        // First call: values baseIndex .. baseIndex + max - 1.
        List<String> batch = tester.takeOutputElements();
        Assert.assertEquals((long)max, (long)batch.size());
        Assert.assertThat(batch, (Matcher)Matchers.hasItem((Object)String.valueOf(baseIndex)));
        Assert.assertThat(batch, (Matcher)Matchers.hasItem((Object)String.valueOf(baseIndex + max - 1)));

        // Second call: values baseIndex + max .. baseIndex + 2 * max - 1.
        Assert.assertTrue((boolean)tester.advanceProcessingTimeBy(Duration.standardSeconds((long)1L)));
        batch = tester.takeOutputElements();
        Assert.assertEquals((long)max, (long)batch.size());
        Assert.assertThat(batch, (Matcher)Matchers.hasItem((Object)String.valueOf(baseIndex + max)));
        Assert.assertThat(batch, (Matcher)Matchers.hasItem((Object)String.valueOf(baseIndex + 2 * max - 1)));

        // Third call: the remaining max / 2 values, and nothing past the total.
        Assert.assertTrue((boolean)tester.advanceProcessingTimeBy(Duration.standardSeconds((long)1L)));
        batch = tester.takeOutputElements();
        Assert.assertEquals((long)(max / 2), (long)batch.size());
        Assert.assertThat(batch, (Matcher)Matchers.hasItem((Object)String.valueOf(baseIndex + 2 * max)));
        Assert.assertThat(batch, (Matcher)Matchers.hasItem((Object)String.valueOf(baseIndex + totalOutputs - 1)));
        Assert.assertThat(batch, (Matcher)Matchers.not((Matcher)Matchers.hasItem((Object)String.valueOf(baseIndex + totalOutputs))));
    }

    /**
     * Splittable fn that outputs {@code element + index} as a string for consecutive indices,
     * starting at the restriction's {@code firstUnprocessedIndex}. It emits at most
     * {@code numOutputsPerCall} values per call and stops once the index reaches
     * {@code numTotalOutputs}.
     */
    private static class CounterFn extends DoFn<Integer, String> {
        private final int numTotalOutputs;
        private final int numOutputsPerCall;

        private CounterFn(int numTotalOutputs, int numOutputsPerCall) {
            this.numTotalOutputs = numTotalOutputs;
            this.numOutputsPerCall = numOutputsPerCall;
        }

        /**
         * Emits outputs until the per-call quota is used up, the tracker refuses a checkpoint
         * update, or the total is reached.
         *
         * @return {@code stop()} when the index has reached {@code numTotalOutputs};
         *     {@code resume()} when the tracker refused the update or the quota ran out
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(DoFn.ProcessContext c, SomeCheckpointTracker tracker) {
            int start = tracker.currentRestriction().firstUnprocessedIndex;
            int emitted = 0;
            while (emitted < this.numOutputsPerCall) {
                int index = start + emitted;
                // Claim the index first; a refused claim means the tracker was checkpointed.
                boolean claimed = tracker.tryUpdateCheckpoint(index + 1);
                if (!claimed) {
                    return DoFn.ProcessContinuation.resume();
                }
                if (index >= this.numTotalOutputs) {
                    return DoFn.ProcessContinuation.stop();
                }
                Integer element = (Integer)c.element();
                c.output((Object)String.valueOf(element + index));
                ++emitted;
            }
            return DoFn.ProcessContinuation.resume();
        }

        /** Always throws: tests using this fn pass the initial restriction in themselves. */
        @DoFn.GetInitialRestriction
        public SomeCheckpoint getInitialRestriction(Integer elem) {
            throw new UnsupportedOperationException("Expected to be supplied explicitly in this test");
        }

        /** Creates a fresh tracker positioned at {@code restriction}. */
        @DoFn.NewTracker
        public SomeCheckpointTracker newTracker(SomeCheckpoint restriction) {
            return new SomeCheckpointTracker(restriction);
        }
    }

    /**
     * Tracker over a {@link SomeCheckpoint}. It accepts position updates until
     * {@link #checkpoint()} is called, after which every update is refused.
     */
    private static class SomeCheckpointTracker implements RestrictionTracker<SomeCheckpoint> {
        private SomeCheckpoint current;
        private boolean isActive = true;

        private SomeCheckpointTracker(SomeCheckpoint current) {
            this.current = current;
        }

        /** Returns the most recently recorded checkpoint. */
        public SomeCheckpoint currentRestriction() {
            return this.current;
        }

        /**
         * Records {@code firstUnprocessedIndex} as the new position while the tracker is active.
         *
         * @return {@code true} if the position was recorded, {@code false} if
         *     {@link #checkpoint()} has already been called
         */
        public boolean tryUpdateCheckpoint(int firstUnprocessedIndex) {
            if (this.isActive) {
                this.current = new SomeCheckpoint(firstUnprocessedIndex);
                return true;
            }
            return false;
        }

        /** Deactivates the tracker and returns the last recorded checkpoint. */
        public SomeCheckpoint checkpoint() {
            this.isActive = false;
            return this.current;
        }
    }

    /** Serializable restriction holding the index of the first not-yet-processed output. */
    private static class SomeCheckpoint implements Serializable {
        private int firstUnprocessedIndex;

        private SomeCheckpoint(int firstUnprocessedIndex) {
            this.firstUnprocessedIndex = firstUnprocessedIndex;
        }
    }

    /**
     * Splittable fn that outputs its element as a string on every call and always asks to be
     * resumed 5 seconds later, reporting the element's timestamp as the watermark.
     */
    private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
        private SelfInitiatedResumeFn() {
        }

        /** Emits the element's string form and returns a delayed-resume continuation. */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(DoFn.ProcessContext c, SomeRestrictionTracker tracker) {
            String text = ((Integer)c.element()).toString();
            c.output((Object)text);
            DoFn.ProcessContinuation continuation = DoFn.ProcessContinuation.resume();
            continuation = continuation.withResumeDelay(Duration.standardSeconds((long)5L));
            return continuation.withWatermark(c.timestamp());
        }

        /** Returns a new, empty restriction regardless of the element. */
        @DoFn.GetInitialRestriction
        public SomeRestriction getInitialRestriction(Integer elem) {
            return new SomeRestriction();
        }

        /** Returns a new tracker; the supplied restriction is not consulted. */
        @DoFn.NewTracker
        public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
            return new SomeRestrictionTracker();
        }
    }

    /**
     * Splittable fn that outputs three strings per element: the element's string form
     * followed by "a", "b" and "c", in that order.
     */
    private static class ToStringFn extends DoFn<Integer, String> {
        private ToStringFn() {
        }

        /** Emits the three suffixed strings for the current element. */
        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext c, SomeRestrictionTracker tracker) {
            for (String suffix : new String[]{"a", "b", "c"}) {
                c.output((Object)(((Integer)c.element()).toString() + suffix));
            }
        }

        /** Returns a new, empty restriction regardless of the element. */
        @DoFn.GetInitialRestriction
        public SomeRestriction getInitialRestriction(Integer elem) {
            return new SomeRestriction();
        }

        /** Returns a new tracker; the supplied restriction is not consulted. */
        @DoFn.NewTracker
        public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
            return new SomeRestrictionTracker();
        }
    }

    /**
     * Test harness around a {@code DoFnTester} for the {@code SplittableParDo.ProcessFn} built
     * from a splittable fn. It feeds elements and fired timers to the process fn as keyed work
     * items under the fixed key "key" and tracks the current processing time.
     *
     * <p>Arguments passed to the typed {@code tester} field and to the
     * {@code WindowedValue}-taking overload are intentionally left uncast, so that type
     * inference produces the parameterized types those signatures require.
     */
    private static class ProcessFnTester<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> {
        private final DoFnTester<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> tester;
        private Instant currentProcessingTime;

        /**
         * Builds the process fn for {@code fn}, starts a bundle, and moves the tester's
         * processing time to {@code currentProcessingTime}.
         *
         * @param currentProcessingTime initial processing time
         * @param fn splittable fn under test
         * @param inputCoder coder for input elements
         * @param restrictionCoder coder for restrictions
         */
        ProcessFnTester(Instant currentProcessingTime, DoFn<InputT, OutputT> fn, Coder<InputT> inputCoder, Coder<RestrictionT> restrictionCoder) throws Exception {
            SplittableParDo.ProcessFn processFn = new SplittableParDo.ProcessFn(fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
            this.tester = DoFnTester.of((OldDoFn)processFn);
            this.tester.startBundle();
            this.tester.advanceProcessingTime(currentProcessingTime);
            this.currentProcessingTime = currentProcessingTime;
        }

        /**
         * Starts processing {@code element} with {@code restriction} in the global window,
         * timestamped with the current processing time.
         */
        void startElement(InputT element, RestrictionT restriction) throws Exception {
            this.startElement(WindowedValue.of(ElementAndRestriction.of(element, restriction), this.currentProcessingTime, (BoundedWindow)GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING));
        }

        /** Delivers {@code windowedValue} to the process fn as a single-element work item. */
        void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue) throws Exception {
            this.tester.processElement(KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
        }

        /**
         * Advances processing time by {@code duration} and delivers any timers that fired.
         *
         * @return {@code true} if at least one timer fired and was delivered,
         *     {@code false} if none fired
         */
        boolean advanceProcessingTimeBy(Duration duration) throws Exception {
            this.currentProcessingTime = this.currentProcessingTime.plus((ReadableDuration)duration);
            List timers = this.tester.advanceProcessingTime(this.currentProcessingTime);
            if (timers.isEmpty()) {
                return false;
            }
            // The timer element type is not visible here, so the list is passed on raw.
            this.tester.processElement(KeyedWorkItems.timersWorkItem("key", (Iterable)timers));
            return true;
        }

        /** Returns the outputs recorded for {@code window} via the underlying tester. */
        List<TimestampedValue<OutputT>> peekOutputElementsInWindow(BoundedWindow window) {
            return this.tester.peekOutputElementsInWindow(window);
        }

        /** Returns the result of the underlying tester's {@code takeOutputElements()}. */
        List<OutputT> takeOutputElements() {
            return this.tester.takeOutputElements();
        }
    }

    /**
     * Placeholder splittable fn whose process method returns a {@code ProcessContinuation}.
     * It is only used by {@code testBoundednessForUnboundedFn}, which inspects the boundedness
     * of the applied transform's output; its restriction methods return {@code null}.
     */
    private static class UnboundedFakeFn extends DoFn<Integer, String> {
        private UnboundedFakeFn() {
        }

        /** Does no work and returns {@code stop()}. */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation processElement(DoFn.ProcessContext context, SomeRestrictionTracker tracker) {
            return DoFn.ProcessContinuation.stop();
        }

        /** Returns {@code null}. */
        @DoFn.GetInitialRestriction
        public SomeRestriction getInitialRestriction(Integer element) {
            return null;
        }

        /** Returns {@code null}. */
        @DoFn.NewTracker
        public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
            return null;
        }
    }

    /**
     * Placeholder splittable fn whose process method returns {@code void}. It is only used by
     * {@code testBoundednessForBoundedFn}, which inspects the boundedness of the applied
     * transform's output; its restriction methods return {@code null}.
     */
    private static class BoundedFakeFn extends DoFn<Integer, String> {
        private BoundedFakeFn() {
        }

        /** Intentionally empty. */
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context, SomeRestrictionTracker tracker) {
        }

        /** Returns {@code null}. */
        @DoFn.GetInitialRestriction
        public SomeRestriction getInitialRestriction(Integer element) {
            return null;
        }

        /** Returns {@code null}. */
        @DoFn.NewTracker
        public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
            return null;
        }
    }

    /**
     * Minimal tracker that owns one {@link SomeRestriction} instance and returns that same
     * instance as both the current restriction and the checkpoint.
     */
    private static class SomeRestrictionTracker implements RestrictionTracker<SomeRestriction> {
        private final SomeRestriction someRestriction = new SomeRestriction();

        private SomeRestrictionTracker() {
        }

        /** Returns the tracker's single restriction instance. */
        public SomeRestriction currentRestriction() {
            return this.someRestriction;
        }

        /** Returns the tracker's single restriction instance; no state is changed. */
        public SomeRestriction checkpoint() {
            return this.someRestriction;
        }
    }

    /** Empty serializable restriction used by the fns in this test that carry no state. */
    private static class SomeRestriction implements Serializable {
        private SomeRestriction() {
        }
    }
}

