/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.repackaged.com.google.common.base.Function;
import org.apache.beam.sdk.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.repackaged.com.google.common.collect.FluentIterable;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.ExecutableTrigger;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.PaneInfoTracker;
import org.apache.beam.sdk.util.ReduceFn;
import org.apache.beam.sdk.util.ReduceFnRunner;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemReduceFn;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TriggerRunner;
import org.apache.beam.sdk.util.WatermarkHold;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Assert;

public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
    // Key handed to every ReduceFnRunner built by createRunner(); the state internals
    // below are constructed with the same literal value.
    private static final String KEY = "TEST_KEY";
    // In-memory state backing the runner; also inspected by the assertion helpers and
    // by getWatermarkHold().
    private final TestInMemoryStateInternals<String> stateInternals = new TestInMemoryStateInternals<String>("TEST_KEY");
    // Test timer implementation that tracks the input/output watermarks and processing times.
    private final TestTimerInternals timerInternals = new TestTimerInternals();
    // Window function taken from the windowing strategy supplied at construction.
    private final WindowFn<Object, W> windowFn;
    // Collects everything the runner outputs (see extractOutput()).
    private final TestWindowingInternals windowingInternals;
    // Coder for output values; each output is passed through
    // SerializableUtils.ensureSerializableByCoder with it before being recorded.
    private final Coder<OutputT> outputCoder;
    // The windowing strategy under test.
    private final WindowingStrategy<Object, W> objectStrategy;
    // The reduce function handed to every runner.
    private final ReduceFn<String, InputT, OutputT, W> reduceFn;
    // Pipeline options handed to every runner.
    private final PipelineOptions options;
    // When true, advancing the input watermark also advances the output watermark.
    private boolean autoAdvanceOutputWatermark;
    // Trigger obtained from the windowing strategy at construction; exposed by getTrigger().
    private ExecutableTrigger executableTrigger;
    // Aggregator handed to every runner; its running sum is exposed by
    // getElementsDroppedDueToClosedWindow().
    private final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator("DroppedDueToClosedWindow");

    /**
     * Creates a tester for {@code windowingStrategy} that uses
     * {@code SystemReduceFn.buffering} over {@code Integer} inputs and emits
     * {@code Iterable<Integer>} outputs.
     *
     * <p>Pipeline options come from {@code PipelineOptionsFactory.create()} and the side
     * input reader is {@code NullSideInputReader.empty()}.
     */
    public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(WindowingStrategy<?, W> windowingStrategy) throws Exception {
        SystemReduceFn bufferingFn = SystemReduceFn.buffering((Coder)VarIntCoder.of());
        IterableCoder iterableCoder = IterableCoder.of((Coder)VarIntCoder.of());
        PipelineOptions defaultOptions = PipelineOptionsFactory.create();
        SideInputReader noSideInputs = (SideInputReader)NullSideInputReader.empty();
        return new ReduceFnTester(windowingStrategy, bufferingFn, iterableCoder, defaultOptions, noSideInputs);
    }

    /**
     * Builds a windowing strategy from the individual pieces (output time function fixed to
     * {@code OutputTimeFns.outputAtEarliestInputTimestamp()}) and delegates to
     * {@link #nonCombining(WindowingStrategy)}.
     */
    public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(WindowFn<?, W> windowFn, Trigger trigger, WindowingStrategy.AccumulationMode mode, Duration allowedDataLateness, Window.ClosingBehavior closingBehavior) throws Exception {
        // Built step by step; each with* call yields the strategy used by the next one.
        WindowingStrategy configured = WindowingStrategy.of(windowFn);
        configured = configured.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
        configured = configured.withTrigger(trigger);
        configured = configured.withMode(mode);
        configured = configured.withAllowedLateness(allowedDataLateness);
        configured = configured.withClosingBehavior(closingBehavior);
        return ReduceFnTester.nonCombining(configured);
    }

    /**
     * Creates a tester whose reduce function is {@code SystemReduceFn.combining} applied to
     * {@code combineFn}, with {@code String} keys and {@code Integer} inputs.
     *
     * <p>Pipeline options come from {@code PipelineOptionsFactory.create()} and the side
     * input reader is {@code NullSideInputReader.empty()}.
     */
    public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W> combining(WindowingStrategy<?, W> strategy, Combine.KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder) throws Exception {
        CoderRegistry coderRegistry = new CoderRegistry();
        coderRegistry.registerStandardCoders();

        KvCoder inputCoder = (KvCoder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of());
        AppliedCombineFn appliedFn = AppliedCombineFn.withInputCoder(combineFn, (CoderRegistry)coderRegistry, inputCoder);

        SystemReduceFn combiningFn = SystemReduceFn.combining((Coder)StringUtf8Coder.of(), (AppliedCombineFn)appliedFn);
        PipelineOptions defaultOptions = PipelineOptionsFactory.create();
        SideInputReader noSideInputs = (SideInputReader)NullSideInputReader.empty();
        return new ReduceFnTester(strategy, combiningFn, outputCoder, defaultOptions, noSideInputs);
    }

    /**
     * Creates a tester whose reduce function is {@code SystemReduceFn.combining} applied to a
     * context-aware {@code combineFn}, using the caller-supplied pipeline options and side
     * input reader.
     */
    public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W> combining(WindowingStrategy<?, W> strategy, CombineWithContext.KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder, PipelineOptions options, SideInputReader sideInputReader) throws Exception {
        CoderRegistry coderRegistry = new CoderRegistry();
        coderRegistry.registerStandardCoders();

        KvCoder inputCoder = (KvCoder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of());
        AppliedCombineFn appliedFn = AppliedCombineFn.withInputCoder(combineFn, (CoderRegistry)coderRegistry, inputCoder);

        SystemReduceFn combiningFn = SystemReduceFn.combining((Coder)StringUtf8Coder.of(), (AppliedCombineFn)appliedFn);
        return new ReduceFnTester(strategy, combiningFn, outputCoder, options, sideInputReader);
    }

    /**
     * Builds a windowing strategy from the individual pieces (output time function fixed to
     * {@code OutputTimeFns.outputAtEarliestInputTimestamp()}) and delegates to
     * {@link #combining(WindowingStrategy, Combine.KeyedCombineFn, Coder)}.
     */
    public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W> combining(WindowFn<?, W> windowFn, Trigger trigger, WindowingStrategy.AccumulationMode mode, Combine.KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder, Duration allowedDataLateness) throws Exception {
        // Built step by step; each with* call yields the strategy used by the next one.
        WindowingStrategy configured = WindowingStrategy.of(windowFn);
        configured = configured.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
        configured = configured.withTrigger(trigger);
        configured = configured.withMode(mode);
        configured = configured.withAllowedLateness(allowedDataLateness);
        return ReduceFnTester.combining(configured, combineFn, outputCoder);
    }

    /**
     * Wires up a tester around the given strategy and reduce function.
     *
     * @param wildcardStrategy windowing strategy under test; its input type is irrelevant to
     *     the tester, so it is viewed as a strategy over {@code Object}
     * @param reduceFn reduce function handed to every runner
     * @param outputCoder coder used to check that outputs are serializable
     * @param options pipeline options handed to every runner
     * @param sideInputReader source of side inputs for the windowing internals
     */
    @SuppressWarnings("unchecked")
    private ReduceFnTester(WindowingStrategy<?, W> wildcardStrategy, ReduceFn<String, InputT, OutputT, W> reduceFn, Coder<OutputT> outputCoder, PipelineOptions options, SideInputReader sideInputReader) throws Exception {
        // Unchecked but harmless: only the first type argument changes, and it is erased at
        // runtime. Without this cast neither assignment to the <Object, W> fields type-checks.
        WindowingStrategy<Object, W> objectStrategy = (WindowingStrategy<Object, W>)wildcardStrategy;
        this.objectStrategy = objectStrategy;
        this.reduceFn = reduceFn;
        this.windowFn = objectStrategy.getWindowFn();
        this.windowingInternals = new TestWindowingInternals(sideInputReader);
        this.outputCoder = outputCoder;
        this.autoAdvanceOutputWatermark = true;
        this.executableTrigger = wildcardStrategy.getTrigger();
        this.options = options;
    }

    /**
     * Sets whether advancing the input watermark should also advance the output watermark.
     * The flag starts out {@code true}.
     */
    public void setAutoAdvanceOutputWatermark(boolean autoAdvanceOutputWatermark) {
        this.autoAdvanceOutputWatermark = autoAdvanceOutputWatermark;
    }

    /**
     * Asks the test timer internals for the timestamp of the next pending timer in
     * {@code domain}.
     */
    @Nullable
    public Instant getNextTimer(TimeDomain domain) {
        Instant nextTimestamp = this.timerInternals.getNextTimer(domain);
        return nextTimestamp;
    }

    /**
     * Builds a fresh {@link ReduceFnRunner} over this tester's state, timers, windowing
     * internals, dropped-element aggregator, reduce function and options.
     */
    ReduceFnRunner<String, InputT, OutputT, W> createRunner() {
        ReduceFnRunner runner = new ReduceFnRunner(
            (Object)KEY,
            this.objectStrategy,
            this.stateInternals,
            (TimerInternals)this.timerInternals,
            (WindowingInternals)this.windowingInternals,
            (Aggregator)this.droppedDueToClosedWindow,
            this.reduceFn,
            this.options);
        return runner;
    }

    /** Returns the trigger that was read from the windowing strategy at construction. */
    public ExecutableTrigger getTrigger() {
        return executableTrigger;
    }

    /** Returns what a freshly created runner reports from {@code isFinished(window)}. */
    public boolean isMarkedFinished(W window) {
        ReduceFnRunner<String, InputT, OutputT, W> runner = this.createRunner();
        return runner.isFinished(window);
    }

    /** Returns what a freshly created runner reports from {@code hasNoActiveWindows()}. */
    public boolean hasNoActiveWindows() {
        ReduceFnRunner<String, InputT, OutputT, W> runner = this.createRunner();
        return runner.hasNoActiveWindows();
    }

    /**
     * Asserts that, apart from global state, the only non-empty window state belongs to
     * exactly {@code expectedWindows} and consists solely of the trigger finished-bits tag.
     */
    @SafeVarargs
    public final void assertHasOnlyGlobalAndFinishedSetsFor(W ... expectedWindows) {
        Set<W> windows = ImmutableSet.copyOf(expectedWindows);
        // Explicit type witness: the tags are declared over Object keys, so the element type
        // has to be stated rather than cast after the fact.
        Set<StateTag<? super String, ?>> allowedTags =
            ImmutableSet.<StateTag<? super String, ?>>of(TriggerRunner.FINISHED_BITS_TAG);
        this.assertHasOnlyGlobalAndAllowedTags(windows, allowedTags);
    }

    /**
     * Asserts that, apart from global state, the only non-empty window state belongs to
     * exactly {@code expectedWindows} and is limited to the finished-bits tag, the pane-info
     * tag, the watermark-hold tag for the strategy's output time function, and the extra
     * hold tag.
     */
    @SafeVarargs
    public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W ... expectedWindows) {
        Set<W> windows = ImmutableSet.copyOf(expectedWindows);
        // Explicit type witness: the tags are declared over Object keys, so the element type
        // has to be stated rather than cast after the fact.
        Set<StateTag<? super String, ?>> allowedTags =
            ImmutableSet.<StateTag<? super String, ?>>of(
                TriggerRunner.FINISHED_BITS_TAG,
                PaneInfoTracker.PANE_INFO_TAG,
                WatermarkHold.watermarkHoldTagForOutputTimeFn((OutputTimeFn)this.objectStrategy.getOutputTimeFn()),
                WatermarkHold.EXTRA_HOLD_TAG);
        this.assertHasOnlyGlobalAndAllowedTags(windows, allowedTags);
    }

    /** Asserts that no window namespace holds any non-empty state. */
    public final void assertHasOnlyGlobalState() {
        Set<W> noWindows = Collections.emptySet();
        Set<StateTag<? super String, ?>> noTags = Collections.emptySet();
        this.assertHasOnlyGlobalAndAllowedTags(noWindows, noTags);
    }

    /**
     * Asserts that, apart from global state, the only non-empty window state belongs to
     * exactly {@code expectedWindows} and is limited to the pane-info tag, the watermark-hold
     * tag for the strategy's output time function, and the extra hold tag.
     */
    @SafeVarargs
    public final void assertHasOnlyGlobalAndPaneInfoFor(W ... expectedWindows) {
        Set<W> windows = ImmutableSet.copyOf(expectedWindows);
        // Explicit type witness: the tags are declared over Object keys, so the element type
        // has to be stated rather than cast after the fact.
        Set<StateTag<? super String, ?>> allowedTags =
            ImmutableSet.<StateTag<? super String, ?>>of(
                PaneInfoTracker.PANE_INFO_TAG,
                WatermarkHold.watermarkHoldTagForOutputTimeFn((OutputTimeFn)this.objectStrategy.getOutputTimeFn()),
                WatermarkHold.EXTRA_HOLD_TAG);
        this.assertHasOnlyGlobalAndAllowedTags(windows, allowedTags);
    }

    /**
     * Verifies the shape of the state currently held by {@code stateInternals}.
     *
     * <ul>
     *   <li>Global namespaces are ignored.
     *   <li>Window namespaces with non-empty state must use only tags from
     *       {@code allowedTags}, and the set of such namespaces must equal the namespaces
     *       of {@code expectedWindows}.
     *   <li>Window-and-trigger namespaces must hold no non-empty state.
     *   <li>Any other namespace kind fails the assertion.
     * </ul>
     *
     * @param expectedWindows windows that are expected to still have state
     * @param allowedTags tags a window namespace is permitted to have in use
     */
    private void assertHasOnlyGlobalAndAllowedTags(Set<W> expectedWindows, Set<StateTag<? super String, ?>> allowedTags) {
        Set<StateNamespace> expectedNamespaces = new HashSet<StateNamespace>();
        for (W expectedWindow : expectedWindows) {
            expectedNamespaces.add(this.windowNamespace(expectedWindow));
        }

        Map<StateNamespace, Set<StateTag<? super String, ?>>> actualWindows =
            new HashMap<StateNamespace, Set<StateTag<? super String, ?>>>();
        for (StateNamespace namespace : this.stateInternals.getNamespacesInUse()) {
            if (namespace instanceof StateNamespaces.GlobalNamespace) {
                continue;
            }
            if (namespace instanceof StateNamespaces.WindowNamespace) {
                Set<StateTag<? super String, ?>> tagsInUse = this.stateInternals.getTagsInUse(namespace);
                if (tagsInUse.isEmpty()) {
                    continue;
                }
                actualWindows.put(namespace, tagsInUse);
                Set<StateTag<? super String, ?>> unexpected = Sets.difference(tagsInUse, allowedTags);
                if (!unexpected.isEmpty()) {
                    Assert.fail(namespace + " has unexpected states: " + tagsInUse);
                }
            } else if (namespace instanceof StateNamespaces.WindowAndTriggerNamespace) {
                Set<StateTag<? super String, ?>> tagsInUse = this.stateInternals.getTagsInUse(namespace);
                Assert.assertTrue(namespace + " contains " + tagsInUse, tagsInUse.isEmpty());
            } else {
                Assert.fail("Unrecognized namespace " + namespace);
            }
        }

        Assert.assertEquals("Still in use: " + actualWindows.toString(), expectedNamespaces, actualWindows.keySet());
    }

    /** Returns the state namespace for {@code window}, encoded with the window function's coder. */
    private StateNamespace windowNamespace(W window) {
        Coder windowCoder = (Coder)this.windowFn.windowCoder();
        return StateNamespaces.window(windowCoder, window);
    }

    /**
     * Returns the earliest watermark hold currently recorded in state, as computed by the
     * state internals; {@code null} when no hold state has a value.
     */
    public Instant getWatermarkHold() {
        Instant earliestHold = this.stateInternals.earliestWatermarkHold();
        return earliestHold;
    }

    /** Returns the current output watermark tracked by the test timer internals. */
    public Instant getOutputWatermark() {
        Instant outputWatermark = this.timerInternals.currentOutputWatermarkTime();
        return outputWatermark;
    }

    /** Returns the running sum accumulated by the "DroppedDueToClosedWindow" aggregator. */
    public long getElementsDroppedDueToClosedWindow() {
        long dropped = this.droppedDueToClosedWindow.getSum();
        return dropped;
    }

    /** Returns how many outputs have been recorded and not yet extracted. */
    public int getOutputSize() {
        List<WindowedValue<KV<String, OutputT>>> pending = this.windowingInternals.outputs;
        return pending.size();
    }

    /**
     * Returns an immutable snapshot of the recorded outputs with the key stripped from each
     * value, then clears the recorded outputs.
     */
    public List<WindowedValue<OutputT>> extractOutput() {
        Function<WindowedValue<KV<String, OutputT>>, WindowedValue<OutputT>> dropKey =
            new Function<WindowedValue<KV<String, OutputT>>, WindowedValue<OutputT>>(){

                @Override
                public WindowedValue<OutputT> apply(WindowedValue<KV<String, OutputT>> input) {
                    KV<String, OutputT> keyed = input.getValue();
                    return input.withValue(keyed.getValue());
                }
            };
        // toList() copies eagerly, so the snapshot is unaffected by the clear() below.
        List<WindowedValue<OutputT>> snapshot =
            FluentIterable.from(this.windowingInternals.outputs).transform(dropKey).toList();
        this.windowingInternals.outputs.clear();
        return snapshot;
    }

    /**
     * Advances the input watermark to {@code newInputWatermark} using a fresh runner, then
     * persists the runner.
     */
    public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
        ReduceFnRunner<String, InputT, OutputT, W> freshRunner = this.createRunner();
        this.timerInternals.advanceInputWatermark(freshRunner, newInputWatermark);
        freshRunner.persist();
    }

    /**
     * Advances the output watermark via the test timer internals; no runner is involved and
     * no timers fire.
     */
    public void advanceOutputWatermark(Instant newOutputWatermark) throws Exception {
        TestTimerInternals timers = this.timerInternals;
        timers.advanceOutputWatermark(newOutputWatermark);
    }

    /**
     * Advances processing time to {@code newProcessingTime} using a fresh runner, then
     * persists the runner.
     */
    public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
        ReduceFnRunner<String, InputT, OutputT, W> freshRunner = this.createRunner();
        this.timerInternals.advanceProcessingTime(freshRunner, newProcessingTime);
        freshRunner.persist();
    }

    /**
     * Advances synchronized processing time to {@code newProcessingTime} using a fresh
     * runner, then persists the runner.
     */
    public void advanceSynchronizedProcessingTime(Instant newProcessingTime) throws Exception {
        ReduceFnRunner<String, InputT, OutputT, W> freshRunner = this.createRunner();
        this.timerInternals.advanceSynchronizedProcessingTime(freshRunner, newProcessingTime);
        freshRunner.persist();
    }

    /**
     * Traces each value, assigns windows to it with the tester's window function, feeds the
     * resulting windowed values to a fresh runner, and persists the runner.
     *
     * <p>Window assignment happens lazily while the runner iterates the elements; any
     * exception it raises is rethrown wrapped in a {@link RuntimeException}.
     */
    @SafeVarargs
    public final void injectElements(TimestampedValue<InputT> ... values) throws Exception {
        for (TimestampedValue<InputT> value : values) {
            WindowTracing.trace("TriggerTester.injectElements: {}", value);
        }
        ReduceFnRunner<String, InputT, OutputT, W> freshRunner = this.createRunner();

        Function<TimestampedValue<InputT>, WindowedValue<InputT>> toWindowedValue =
            new Function<TimestampedValue<InputT>, WindowedValue<InputT>>(){

                @Override
                public WindowedValue<InputT> apply(TimestampedValue<InputT> input) {
                    try {
                        InputT element = input.getValue();
                        Instant timestamp = input.getTimestamp();
                        TestAssignContext<W> assignContext = new TestAssignContext<W>(
                            ReduceFnTester.this.windowFn, element, timestamp, GlobalWindow.INSTANCE);
                        Collection<W> assigned = ReduceFnTester.this.windowFn.assignWindows(assignContext);
                        return WindowedValue.of(element, timestamp, assigned, PaneInfo.NO_FIRING);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            };

        freshRunner.processElements(Iterables.transform(Arrays.asList(values), toWindowedValue));
        freshRunner.persist();
    }

    /**
     * Delivers a single timer for {@code window} at {@code timestamp} in {@code domain}
     * directly to a fresh runner, bypassing the timer queues, then persists the runner.
     */
    public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
        ReduceFnRunner<String, InputT, OutputT, W> freshRunner = this.createRunner();
        StateNamespace namespace = this.windowNamespace(window);
        TimerInternals.TimerData timer = TimerInternals.TimerData.of(namespace, timestamp, domain);
        freshRunner.onTimer(timer);
        freshRunner.persist();
    }

    /**
     * {@link TimerInternals} implementation for tests. It keeps pending timers in two
     * priority queues (one for event time, one shared by both processing-time domains),
     * tracks the four clocks, and fires timers into a {@link ReduceFnRunner} when a clock
     * is advanced.
     */
    private class TestTimerInternals
    implements TimerInternals {
        /** Every timer accepted by {@link #setTimer}; used to ignore duplicate requests. */
        private final Set<TimerInternals.TimerData> existingTimers = new HashSet<TimerInternals.TimerData>();
        /** Pending event-time timers. */
        private final PriorityQueue<TimerInternals.TimerData> watermarkTimers = new PriorityQueue<TimerInternals.TimerData>(11);
        /** Pending processing-time and synchronized-processing-time timers. */
        private final PriorityQueue<TimerInternals.TimerData> processingTimers = new PriorityQueue<TimerInternals.TimerData>(11);
        @Nullable
        private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
        /** {@code null} until the output watermark is advanced for the first time. */
        @Nullable
        private Instant outputWatermarkTime = null;
        private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
        /** {@code null} until synchronized processing time is advanced for the first time. */
        @Nullable
        private Instant synchronizedProcessingTime = null;

        private TestTimerInternals() {
        }

        /**
         * Returns the timestamp of the timer at the head of the queue for {@code domain},
         * or {@code null} when that queue is empty.
         */
        @Nullable
        public Instant getNextTimer(TimeDomain domain) {
            // An empty queue is a legitimate answer here, so the head is not null-checked.
            TimerInternals.TimerData head = this.queue(domain).peek();
            return head == null ? null : head.getTimestamp();
        }

        /** Selects the queue that holds timers for {@code domain}. */
        private PriorityQueue<TimerInternals.TimerData> queue(TimeDomain domain) {
            switch (domain) {
                case EVENT_TIME: {
                    return this.watermarkTimers;
                }
                case PROCESSING_TIME:
                case SYNCHRONIZED_PROCESSING_TIME: {
                    return this.processingTimers;
                }
                default: {
                    throw new IllegalArgumentException("Unexpected time domain: " + domain);
                }
            }
        }

        /** Queues {@code timer} unless an equal timer has already been accepted. */
        public void setTimer(TimerInternals.TimerData timer) {
            WindowTracing.trace("TestTimerInternals.setTimer: {}", timer);
            if (this.existingTimers.add(timer)) {
                this.queue(timer.getDomain()).add(timer);
            }
        }

        /** Forgets {@code timer} and removes it from its queue if present. */
        public void deleteTimer(TimerInternals.TimerData timer) {
            WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer);
            this.existingTimers.remove(timer);
            this.queue(timer.getDomain()).remove(timer);
        }

        public Instant currentProcessingTime() {
            return this.processingTime;
        }

        @Nullable
        public Instant currentSynchronizedProcessingTime() {
            return this.synchronizedProcessingTime;
        }

        /** Returns the input watermark; fails if it is {@code null}. */
        public Instant currentInputWatermarkTime() {
            return Preconditions.checkNotNull(this.inputWatermarkTime);
        }

        @Nullable
        public Instant currentOutputWatermarkTime() {
            return this.outputWatermarkTime;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this.getClass())
                .add("watermarkTimers", this.watermarkTimers)
                .add("processingTimers", this.processingTimers)
                .add("inputWatermarkTime", this.inputWatermarkTime)
                .add("outputWatermarkTime", this.outputWatermarkTime)
                .add("processingTime", this.processingTime)
                .toString();
        }

        /**
         * Moves the input watermark forward, fires due event-time timers, and, when
         * auto-advance is enabled on the tester, moves the output watermark to the earliest
         * watermark hold (or to the input watermark when there is no hold).
         *
         * @throws IllegalStateException if {@code newInputWatermark} is before the current
         *     input watermark
         */
        public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) throws Exception {
            Preconditions.checkNotNull(newInputWatermark);
            Preconditions.checkState(!newInputWatermark.isBefore(this.inputWatermarkTime), "Cannot move input watermark time backwards from %s to %s", this.inputWatermarkTime, newInputWatermark);
            WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", this.inputWatermarkTime, newInputWatermark);
            this.inputWatermarkTime = newInputWatermark;
            this.advanceAndFire(runner, newInputWatermark, TimeDomain.EVENT_TIME);

            Instant hold = ReduceFnTester.this.stateInternals.earliestWatermarkHold();
            if (hold == null) {
                WindowTracing.trace("TestTimerInternals.advanceInputWatermark: no holds, so output watermark = input watermark");
                hold = this.inputWatermarkTime;
            }
            if (ReduceFnTester.this.autoAdvanceOutputWatermark) {
                this.advanceOutputWatermark(hold);
            }
        }

        /**
         * Moves the output watermark forward, clipping it to the input watermark.
         *
         * @throws IllegalStateException if the (clipped) value is before the current output
         *     watermark
         */
        public void advanceOutputWatermark(Instant newOutputWatermark) {
            Preconditions.checkNotNull(newOutputWatermark);
            if (newOutputWatermark.isAfter(this.inputWatermarkTime)) {
                WindowTracing.trace("TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", newOutputWatermark, this.inputWatermarkTime);
                newOutputWatermark = this.inputWatermarkTime;
            }
            Preconditions.checkState(this.outputWatermarkTime == null || !newOutputWatermark.isBefore(this.outputWatermarkTime), "Cannot move output watermark time backwards from %s to %s", this.outputWatermarkTime, newOutputWatermark);
            WindowTracing.trace("TestTimerInternals.advanceOutputWatermark: from {} to {}", this.outputWatermarkTime, newOutputWatermark);
            this.outputWatermarkTime = newOutputWatermark;
        }

        /**
         * Moves processing time forward and fires due processing-time timers.
         *
         * @throws IllegalStateException if {@code newProcessingTime} is before the current
         *     processing time
         */
        public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime) throws Exception {
            Preconditions.checkState(!newProcessingTime.isBefore(this.processingTime), "Cannot move processing time backwards from %s to %s", this.processingTime, newProcessingTime);
            WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", this.processingTime, newProcessingTime);
            this.processingTime = newProcessingTime;
            this.advanceAndFire(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
        }

        /**
         * Moves synchronized processing time forward and fires due timers from the
         * processing-time queue.
         *
         * @throws IllegalStateException if {@code newSynchronizedProcessingTime} is before
         *     the current synchronized processing time
         */
        public void advanceSynchronizedProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newSynchronizedProcessingTime) throws Exception {
            Preconditions.checkNotNull(newSynchronizedProcessingTime);
            // The clock starts out null. Joda-Time's isBefore(null) compares against the
            // current wall-clock time, so the null case must be handled explicitly.
            Preconditions.checkState(this.synchronizedProcessingTime == null || !newSynchronizedProcessingTime.isBefore(this.synchronizedProcessingTime), "Cannot move synchronized processing time backwards from %s to %s", this.synchronizedProcessingTime, newSynchronizedProcessingTime);
            WindowTracing.trace("TestTimerInternals.advanceSynchronizedProcessingTime: from {} to {}", this.synchronizedProcessingTime, newSynchronizedProcessingTime);
            this.synchronizedProcessingTime = newSynchronizedProcessingTime;
            this.advanceAndFire(runner, newSynchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
        }

        /**
         * Repeatedly removes the head timer of the queue for {@code domain} and delivers it
         * to {@code runner}, for as long as {@code currentTime} is strictly after the head
         * timer's timestamp.
         */
        private void advanceAndFire(ReduceFnRunner<?, ?, ?, ?> runner, Instant currentTime, TimeDomain domain) throws Exception {
            PriorityQueue<TimerInternals.TimerData> pending = this.queue(domain);
            TimerInternals.TimerData head = pending.peek();
            while (head != null && currentTime.isAfter(head.getTimestamp())) {
                WindowTracing.trace("TestTimerInternals.advanceAndFire: firing {} at {}", head, currentTime);
                // Dequeue before delivering so that work done by the runner during onTimer
                // operates on a queue that no longer contains this timer.
                // NOTE(review): the fired timer stays in existingTimers, so a later
                // setTimer() for an equal timer is ignored — confirm this is intended.
                pending.remove();
                runner.onTimer(head);
                head = pending.peek();
            }
        }
    }

    /**
     * {@link Aggregator} that keeps a running {@code long} total in memory so tests can
     * read it back with {@link #getSum()}.
     */
    private static class InMemoryLongSumAggregator
    implements Aggregator<Long, Long> {
        private final String name;
        private long sum = 0L;

        public InMemoryLongSumAggregator(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public Combine.CombineFn<Long, ?, Long> getCombineFn() {
            return new Sum.SumLongFn();
        }

        /** Adds {@code value} to the running total. */
        public void addValue(Long value) {
            sum = sum + value.longValue();
        }

        /** Returns the total of all values added so far. */
        public long getSum() {
            return sum;
        }
    }

    /**
     * Assign context that simply hands back the element, timestamp and window it was
     * constructed with.
     */
    private static class TestAssignContext<W extends BoundedWindow>
    extends WindowFn.AssignContext {
        private final Object element;
        private final Instant timestamp;
        private final BoundedWindow window;

        public TestAssignContext(WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
            // NOTE(review): if WindowFn.AssignContext is a non-static inner class, this call
            // has to be spelled windowFn.super() — confirm against WindowFn.
            super(windowFn);
            this.window = window;
            this.timestamp = timestamp;
            this.element = element;
        }

        public Object element() {
            return element;
        }

        public Instant timestamp() {
            return timestamp;
        }

        public BoundedWindow window() {
            return window;
        }
    }

    /**
     * {@link WindowingInternals} for tests: records every windowed output in
     * {@code outputs}, serves side inputs from the supplied reader and exposes the tester's
     * state internals. The timer, window, pane and view-writing accessors are unsupported.
     */
    private class TestWindowingInternals
    implements WindowingInternals<InputT, KV<String, OutputT>> {
        private List<WindowedValue<KV<String, OutputT>>> outputs = new ArrayList();
        private SideInputReader sideInputReader;

        private TestWindowingInternals(SideInputReader sideInputReader) {
            this.sideInputReader = sideInputReader;
        }

        /**
         * Passes {@code output} through {@code SerializableUtils.ensureSerializableByCoder}
         * with a {@code KvCoder} of string keys and the tester's output coder, then records
         * the result.
         */
        public void outputWindowedValue(KV<String, OutputT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            Coder keyedOutputCoder = (Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)ReduceFnTester.this.outputCoder);
            KV checkedCopy = (KV)SerializableUtils.ensureSerializableByCoder(keyedOutputCoder, output, (String)"outputForWindow");
            WindowedValue recorded = WindowedValue.of((Object)checkedCopy, (Instant)timestamp, windows, (PaneInfo)pane);
            this.outputs.add(recorded);
        }

        public TimerInternals timerInternals() {
            throw new UnsupportedOperationException("Testing triggers should not use timers from WindowingInternals.");
        }

        public Collection<? extends BoundedWindow> windows() {
            throw new UnsupportedOperationException("Testing triggers should not use windows from WindowingInternals.");
        }

        public PaneInfo pane() {
            throw new UnsupportedOperationException("Testing triggers should not use pane from WindowingInternals.");
        }

        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
            throw new UnsupportedOperationException("Testing triggers should not use writePCollectionViewData from WindowingInternals.");
        }

        /** Returns the tester's state internals, viewed without their key type. */
        public StateInternals<Object> stateInternals() {
            // Goes through a raw reference so the String-keyed internals can be returned
            // under the Object-keyed signature.
            TestInMemoryStateInternals rawInternals = ReduceFnTester.this.stateInternals;
            return rawInternals;
        }

        /**
         * Reads {@code view} from the side input reader for the side-input window that
         * corresponds to {@code mainInputWindow}.
         *
         * @throws IllegalArgumentException if the reader does not contain {@code view}
         */
        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
            if (this.sideInputReader.contains(view)) {
                BoundedWindow sideInputWindow = view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
                return (T)this.sideInputReader.get(view, sideInputWindow);
            }
            throw new IllegalArgumentException("calling sideInput() with unknown view");
        }
    }

    /**
     * {@link InMemoryStateInternals} with extra read-only accessors that the tester's
     * assertions use to inspect which namespaces and tags hold state.
     */
    private static class TestInMemoryStateInternals<K>
    extends InMemoryStateInternals<K> {
        public TestInMemoryStateInternals(K key) {
            super(key);
        }

        /** Returns the tags in {@code namespace} whose state is not empty. */
        public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
            HashSet nonEmptyTags = new HashSet();
            for (Map.Entry tagAndState : this.inMemoryState.getTagsInUse(namespace).entrySet()) {
                boolean empty = this.isEmptyForTesting((State)tagAndState.getValue());
                if (!empty) {
                    nonEmptyTags.add((StateTag<K, ?>)tagAndState.getKey());
                }
            }
            return nonEmptyTags;
        }

        /** Returns the namespaces reported as in use by the underlying in-memory state. */
        public Set<StateNamespace> getNamespacesInUse() {
            return this.inMemoryState.getNamespacesInUse();
        }

        /**
         * Scans every watermark-hold state and returns the earliest hold read from them, or
         * {@code null} if no hold has been selected.
         */
        public Instant earliestWatermarkHold() {
            Instant earliest = null;
            for (State candidate : this.inMemoryState.values()) {
                if (candidate instanceof WatermarkHoldState) {
                    Instant hold = (Instant)((WatermarkHoldState)candidate).read();
                    // Take this hold when nothing has been chosen yet, or when it is
                    // strictly earlier than the current choice.
                    if (earliest == null || (hold != null && hold.isBefore((ReadableInstant)earliest))) {
                        earliest = hold;
                    }
                }
            }
            return earliest;
        }
    }
}

