/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core.triggers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.FinishedTriggers;
import org.apache.beam.runners.core.triggers.FinishedTriggersSet;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.ActiveWindowSet;
import org.apache.beam.sdk.util.MergingActiveWindowSet;
import org.apache.beam.sdk.util.NonMergingActiveWindowSet;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.Timers;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
import org.apache.beam.sdk.util.state.TimerCallback;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;

public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
    // In-memory state store; handed to the merging active-window set and to the context factory in the constructor.
    private final TestInMemoryStateInternals<?> stateInternals = new TestInMemoryStateInternals(null);
    // In-memory timers; written through TestTimers and advanced by advanceInputWatermark/advanceProcessingTime.
    private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
    // Builds the trigger contexts (base, on-element, on-merge) passed to the executable trigger.
    private final TriggerStateMachineContextFactory<W> contextFactory;
    // Window function used to assign windows to injected elements and to derive the window coder.
    protected final WindowFn<Object, W> windowFn;
    // Tracks active windows; non-merging or merging depending on windowFn.isNonMerging().
    private final ActiveWindowSet<W> activeWindows;
    // Maps each merged-away window to its merge result; cleared and repopulated by mergeWindows().
    private final Map<W, W> windowToMergeResult;
    // The trigger under test, wrapped for execution.
    protected final ExecutableTriggerStateMachine executableTrigger;
    // Per-window record of finished triggers; entries are created lazily by getFinishedSet().
    private final Map<W, FinishedTriggers> finishedSets;

    /**
     * Creates a tester for the given trigger state machine whose elements are integers.
     *
     * <p>The tester is constructed with an allowed lateness of {@link Duration#ZERO}.
     *
     * @param stateMachine the trigger state machine to wrap in an {@link ExecutableTriggerStateMachine}
     * @param windowFn the window function used by the tester to assign windows
     * @return a new {@link SimpleTriggerStateMachineTester} with its own state and timers
     * @throws Exception if the tester cannot be constructed
     */
    public static <W extends BoundedWindow> SimpleTriggerStateMachineTester<W> forTrigger(TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn) throws Exception {
        ExecutableTriggerStateMachine executableTriggerStateMachine = ExecutableTriggerStateMachine.create(stateMachine);
        // The accumulation mode that used to be computed here was never read, so it has been dropped.
        return new SimpleTriggerStateMachineTester<W>(executableTriggerStateMachine, windowFn, Duration.ZERO);
    }

    /**
     * Creates a tester for the given trigger state machine with a caller-chosen element type.
     *
     * <p>The tester is constructed with an allowed lateness of {@link Duration#ZERO}.
     *
     * @param stateMachine the trigger state machine to wrap in an {@link ExecutableTriggerStateMachine}
     * @param windowFn the window function used by the tester to assign windows
     * @return a new {@link TriggerStateMachineTester} with its own state and timers
     * @throws Exception if the tester cannot be constructed
     */
    public static <InputT, W extends BoundedWindow> TriggerStateMachineTester<InputT, W> forAdvancedTrigger(TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn) throws Exception {
        ExecutableTriggerStateMachine executableTriggerStateMachine = ExecutableTriggerStateMachine.create(stateMachine);
        // The accumulation mode that used to be computed here was never read, so it has been dropped.
        return new TriggerStateMachineTester<InputT, W>(executableTriggerStateMachine, windowFn, Duration.ZERO);
    }

    /**
     * Builds a tester around an executable trigger and a window function.
     *
     * @param executableTriggerStateMachine the trigger under test
     * @param windowFn the window function; decides whether a merging or non-merging active window set is used
     * @param allowedLateness accepted for signature compatibility; this constructor does not read it
     * @throws Exception declared for callers; propagated from the collaborators constructed here
     */
    protected TriggerStateMachineTester(ExecutableTriggerStateMachine executableTriggerStateMachine, WindowFn<Object, W> windowFn, Duration allowedLateness) throws Exception {
        this.windowFn = windowFn;
        this.executableTrigger = executableTriggerStateMachine;
        this.finishedSets = new HashMap<W, FinishedTriggers>();
        if (windowFn.isNonMerging()) {
            this.activeWindows = new NonMergingActiveWindowSet<W>();
        } else {
            this.activeWindows = new MergingActiveWindowSet<W>(windowFn, this.stateInternals);
        }
        this.windowToMergeResult = new HashMap<W, W>();
        this.contextFactory = new TriggerStateMachineContextFactory<W>(windowFn, this.stateInternals, this.activeWindows);
    }

    /**
     * Invokes the trigger's clear hook for the given window.
     *
     * @param window the window whose trigger state should be cleared; must not be null
     * @throws Exception if the trigger's clear invocation fails
     */
    public void clearState(W window) throws Exception {
        Timers timers = new TestTimers(windowNamespace(window));
        FinishedTriggers finished = getFinishedSet(window);
        TriggerStateMachine.TriggerContext context = contextFactory.base(window, timers, executableTrigger, finished);
        executableTrigger.invokeClear(context);
    }

    /**
     * Asserts that no state tags remain in any window-and-trigger namespace belonging to the
     * given window.
     *
     * <p>Namespaces of other kinds, or for other windows, are ignored.
     *
     * @param window the window whose trigger namespaces are inspected
     */
    public void assertCleared(W window) {
        for (StateNamespace candidate : stateInternals.getNamespacesInUse()) {
            if (candidate instanceof StateNamespaces.WindowAndTriggerNamespace) {
                StateNamespaces.WindowAndTriggerNamespace triggerNamespace = (StateNamespaces.WindowAndTriggerNamespace) candidate;
                if (triggerNamespace.getWindow().equals(window)) {
                    Set<?> remainingTags = stateInternals.getTagsInUse((StateNamespace) triggerNamespace);
                    Assert.assertTrue("Trigger has not cleared tags: " + remainingTags, remainingTags.isEmpty());
                }
            }
        }
    }

    /**
     * Reports whether the root trigger has been recorded as finished for the given window.
     *
     * @param window the window to look up
     * @return {@code false} when no finished-set exists yet for the window; otherwise whatever the
     *     finished-set reports for the root trigger
     */
    public boolean isMarkedFinished(W window) {
        // Deliberately reads the map directly: this lookup must not create a finished-set entry.
        FinishedTriggers recorded = finishedSets.get(window);
        return recorded != null && recorded.isFinished(executableTrigger);
    }

    /**
     * Builds the window state namespace for the given window using the window function's coder.
     *
     * @param window the window; a null value is rejected by {@code Preconditions.checkNotNull}
     * @return the namespace identifying the window
     */
    private StateNamespace windowNamespace(W window) {
        // The coder is fetched first, then the null check runs, matching the original argument order.
        Coder<W> coder = windowFn.windowCoder();
        W checkedWindow = Preconditions.checkNotNull(window);
        return StateNamespaces.window(coder, checkedWindow);
    }

    /**
     * Advances the input watermark of the tester's in-memory timers.
     *
     * <p>{@code TimerCallback.NO_OP} is supplied as the callback for the advance.
     *
     * @param newInputWatermark the new input watermark
     * @throws Exception if the timer internals reject the advance
     */
    public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
        TimerCallback callback = TimerCallback.NO_OP;
        timerInternals.advanceInputWatermark(callback, newInputWatermark);
    }

    /**
     * Advances the processing time of the tester's in-memory timers.
     *
     * <p>{@code TimerCallback.NO_OP} is supplied as the callback for the advance.
     *
     * @param newProcessingTime the new processing time
     * @throws Exception if the timer internals reject the advance
     */
    public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
        TimerCallback callback = TimerCallback.NO_OP;
        timerInternals.advanceProcessingTime(callback, newProcessingTime);
    }

    /**
     * Varargs convenience form of {@link #injectElements(Collection)}.
     *
     * @param values the timestamped elements to inject, in order
     * @throws Exception if injection fails
     */
    @SafeVarargs
    public final void injectElements(TimestampedValue<InputT> ... values) throws Exception {
        // Typed as Collection so the call below binds to the collection overload, not back to this one.
        Collection<TimestampedValue<InputT>> asCollection = Arrays.asList(values);
        injectElements(asCollection);
    }

    /**
     * Injects timestamped elements into the trigger under test.
     *
     * <p>Processing happens in three passes over {@code values}:
     * <ol>
     *   <li>each element is traced;</li>
     *   <li>each element is assigned windows by the window function; every assigned window is added
     *       to the active window set and an event-time timer is set at the window's maximum
     *       timestamp;</li>
     *   <li>for each element and each of its windows (mapped through {@link #mergeResult}), the
     *       trigger's on-element hook is invoked unless the trigger is already finished for that
     *       window.</li>
     * </ol>
     *
     * @param values the timestamped elements to inject
     * @throws RuntimeException wrapping any exception raised while assigning windows or setting timers
     * @throws Exception if invoking the trigger fails
     */
    public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception {
        for (TimestampedValue<InputT> value : values) {
            WindowTracing.trace("TriggerTester.injectElements: {}", value);
        }

        ArrayList<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size());
        for (TimestampedValue<InputT> input : values) {
            try {
                InputT value = input.getValue();
                Instant timestamp = input.getTimestamp();
                Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>(windowFn, value, timestamp, GlobalWindow.INSTANCE));
                for (W window : assignedWindows) {
                    activeWindows.addActiveForTesting(window);
                    // Register an event-time timer at the end of every assigned window.
                    timerInternals.setTimer(TimerInternals.TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME));
                }
                windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        for (WindowedValue<InputT> windowedValue : windowedValues) {
            for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
                // The windows were produced by this tester's windowFn above, so they are of type W.
                @SuppressWarnings("unchecked")
                W window = mergeResult((W) untypedWindow);
                TriggerStateMachine.OnElementContext context = contextFactory.createOnElementContext(window, new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(), executableTrigger, getFinishedSet(window));
                if (!context.trigger().isFinished()) {
                    executableTrigger.invokeOnElement(context);
                }
            }
        }
    }

    /**
     * Asks the trigger whether it would fire for the given window, without firing it.
     *
     * @param window the window to evaluate
     * @return the result of the trigger's should-fire invocation
     * @throws Exception if the trigger invocation fails
     */
    public boolean shouldFire(W window) throws Exception {
        Timers timers = new TestTimers(windowNamespace(window));
        FinishedTriggers finished = getFinishedSet(window);
        TriggerStateMachine.TriggerContext context = contextFactory.base(window, timers, executableTrigger, finished);
        // Prefetch before evaluating, mirroring the order used when actually firing.
        executableTrigger.getSpec().prefetchShouldFire(context.state());
        boolean ready = executableTrigger.invokeShouldFire(context);
        return ready;
    }

    /**
     * Fires the trigger for the given window if it reports that it should fire.
     *
     * <p>When firing leaves the trigger finished, the window is removed from the active window set
     * and the trigger's clear hook is invoked.
     *
     * @param window the window to evaluate and possibly fire
     * @throws Exception if any trigger invocation fails
     */
    public void fireIfShouldFire(W window) throws Exception {
        Timers timers = new TestTimers(windowNamespace(window));
        FinishedTriggers finished = getFinishedSet(window);
        TriggerStateMachine.TriggerContext context = contextFactory.base(window, timers, executableTrigger, finished);

        executableTrigger.getSpec().prefetchShouldFire(context.state());
        if (!executableTrigger.invokeShouldFire(context)) {
            return;
        }

        executableTrigger.getSpec().prefetchOnFire(context.state());
        executableTrigger.invokeOnFire(context);
        if (!context.trigger().isFinished()) {
            return;
        }

        // Finished after firing: retire the window first, then clear the trigger state.
        activeWindows.remove(window);
        executableTrigger.invokeClear(context);
    }

    /**
     * Sets the finished flag of one sub-trigger of the root trigger for the given window.
     *
     * @param subTriggerIndex index into the root trigger's sub-trigger list
     * @param window the window whose finished-set is updated (created if absent)
     * @param value the finished flag to record
     */
    public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) {
        // Look up the finished-set before resolving the sub-trigger, as the original expression did.
        FinishedTriggers finished = getFinishedSet(window);
        ExecutableTriggerStateMachine subTrigger = (ExecutableTriggerStateMachine) executableTrigger.subTriggers().get(subTriggerIndex);
        finished.setFinished(subTrigger, value);
    }

    /**
     * Runs window merging on the active window set and notifies the trigger of every merge.
     *
     * <p>The window-to-merge-result mapping is cleared first and repopulated as merges are
     * reported. For each merge, the trigger's on-merge hook receives the finished-sets of those
     * merged windows that are active, and an event-time timer is set at the maximum timestamp of
     * the merge result.
     *
     * @throws Exception if merging or the trigger's on-merge invocation fails
     */
    public final void mergeWindows() throws Exception {
        windowToMergeResult.clear();
        activeWindows.merge(new ActiveWindowSet.MergeCallback<W>() {

            @Override
            public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
                // Nothing to prefetch for the in-memory tester.
            }

            @Override
            public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
                // Record the merge result for every merged window, but only hand active ones to the trigger.
                ArrayList<W> activeToBeMerged = new ArrayList<W>();
                for (W window : toBeMerged) {
                    windowToMergeResult.put(window, mergeResult);
                    if (activeWindows.isActive(window)) {
                        activeToBeMerged.add(window);
                    }
                }

                Map<W, FinishedTriggers> mergingFinishedSets = Maps.newHashMapWithExpectedSize(activeToBeMerged.size());
                for (W oldWindow : activeToBeMerged) {
                    mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow));
                }

                executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult, new TestTimers(windowNamespace(mergeResult)), executableTrigger, getFinishedSet(mergeResult), mergingFinishedSets));
                timerInternals.setTimer(TimerInternals.TimerData.of(windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME));
            }
        });
    }

    /**
     * Resolves a window to the window it was merged into by the most recent {@link #mergeWindows()}.
     *
     * @param window the window to resolve
     * @return the recorded merge result, or {@code window} itself when none is recorded
     */
    public W mergeResult(W window) {
        W merged = windowToMergeResult.get(window);
        if (merged != null) {
            return merged;
        }
        return window;
    }

    /**
     * Returns the finished-set for the given window, creating and registering an empty one on
     * first use.
     *
     * @param window the window to look up
     * @return the existing or newly created finished-set; never null
     */
    private FinishedTriggers getFinishedSet(W window) {
        FinishedTriggers existing = finishedSets.get(window);
        if (existing != null) {
            return existing;
        }
        FinishedTriggers created = FinishedTriggersSet.fromSet(new HashSet<ExecutableTriggerStateMachine>());
        finishedSets.put(window, created);
        return created;
    }

    private class TestTimers
    implements Timers {
        private final StateNamespace namespace;

        public TestTimers(StateNamespace namespace) {
            Preconditions.checkArgument((boolean)(namespace instanceof StateNamespaces.WindowNamespace));
            this.namespace = namespace;
        }

        public void setTimer(Instant timestamp, TimeDomain timeDomain) {
            TriggerStateMachineTester.this.timerInternals.setTimer(TimerInternals.TimerData.of((StateNamespace)this.namespace, (Instant)timestamp, (TimeDomain)timeDomain));
        }

        public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
            TriggerStateMachineTester.this.timerInternals.deleteTimer(TimerInternals.TimerData.of((StateNamespace)this.namespace, (Instant)timestamp, (TimeDomain)timeDomain));
        }

        public Instant currentProcessingTime() {
            return TriggerStateMachineTester.this.timerInternals.currentProcessingTime();
        }

        @Nullable
        public Instant currentSynchronizedProcessingTime() {
            return TriggerStateMachineTester.this.timerInternals.currentSynchronizedProcessingTime();
        }

        public Instant currentEventTime() {
            return TriggerStateMachineTester.this.timerInternals.currentInputWatermarkTime();
        }
    }

    private static class TestAssignContext<W extends BoundedWindow>
    extends WindowFn.AssignContext {
        private Object element;
        private Instant timestamp;
        private BoundedWindow window;

        public TestAssignContext(WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
            super(windowFn);
            this.element = element;
            this.timestamp = timestamp;
            this.window = window;
        }

        public Object element() {
            return this.element;
        }

        public Instant timestamp() {
            return this.timestamp;
        }

        public BoundedWindow window() {
            return this.window;
        }
    }

    /**
     * Tester specialisation whose elements are integers, with each integer doubling as its own
     * timestamp in milliseconds.
     */
    public static class SimpleTriggerStateMachineTester<W extends BoundedWindow>
    extends TriggerStateMachineTester<Integer, W> {
        private SimpleTriggerStateMachineTester(ExecutableTriggerStateMachine executableTriggerStateMachine, WindowFn<Object, W> windowFn, Duration allowedLateness) throws Exception {
            super(executableTriggerStateMachine, windowFn, allowedLateness);
        }

        /**
         * Injects each integer as an element timestamped at {@code new Instant(value)}.
         *
         * @param values the integer elements to inject, in order
         * @throws Exception if injection fails
         */
        public void injectElements(int ... values) throws Exception {
            ArrayList<TimestampedValue<Integer>> stamped = Lists.newArrayListWithCapacity(values.length);
            for (int i = 0; i < values.length; i++) {
                int element = values[i];
                stamped.add(TimestampedValue.of(element, new Instant((long) element)));
            }
            this.injectElements(stamped);
        }

        /**
         * Creates a new tester from the same executable trigger and window function.
         *
         * <p>The returned tester is a separate instance constructed from scratch; it does not share
         * this tester's state, timers, or finished-sets.
         *
         * @param allowedLateness forwarded to the new tester's constructor
         * @return the new tester
         * @throws Exception if the tester cannot be constructed
         */
        public SimpleTriggerStateMachineTester<W> withAllowedLateness(Duration allowedLateness) throws Exception {
            ExecutableTriggerStateMachine trigger = this.executableTrigger;
            WindowFn<Object, W> fn = this.windowFn;
            return new SimpleTriggerStateMachineTester<W>(trigger, fn, allowedLateness);
        }
    }
}

