/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.jet.processors;

import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.AppendableTraverser;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFn;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.jet.Utils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
 * Jet processor that groups encoded {@code WindowedValue<KV<K, V>>} items by key and window.
 *
 * <p>Every distinct (encoded) key gets its own {@link KeyManager}, which owns in-memory state,
 * in-memory timers and a {@link ReduceFnRunner} configured with {@link SystemReduceFn#buffering}.
 * Whatever the runner outputs is encoded with {@code outputCoder} and appended to a shared
 * {@link AppendableTraverser}, which the flat-mapper then drains to the outbox.
 *
 * <p>All four processor callbacks ({@link #tryProcess()}, {@link #tryProcess(int, Object)},
 * {@link #tryProcessWatermark(Watermark)}, {@link #complete()}) are funnelled through the same
 * flat-mapper; the two marker objects distinguish the callbacks that carry no item of their own.
 *
 * @param <K> type of the grouping key
 * @param <V> type of the grouped values
 */
public class WindowGroupP<K, V> extends AbstractProcessor {
    /** Minimum gap, in milliseconds, between two processing-time advances driven by {@link #tryProcess()}. */
    private static final int PROCESSING_TIME_MIN_INCREMENT = 100;
    /** Sentinel item passed to the flat-mapper by {@link #complete()}. */
    private static final Object COMPLETE_MARKER = new Object();
    /** Sentinel item passed to the flat-mapper by {@link #tryProcess()}. */
    private static final Object TRY_PROCESS_MARKER = new Object();

    private final SerializablePipelineOptions pipelineOptions;
    private final Coder<V> inputValueValueCoder;
    // Raw on purpose: the coder is handed in untyped by the public factory method.
    private final Coder outputCoder;
    private final WindowingStrategy<V, BoundedWindow> windowingStrategy;
    /** One manager per distinct key, indexed by the key's encoded bytes. */
    private final Map<Utils.ByteArrayKey, KeyManager> keyManagers = new HashMap<>();
    /** Collects everything that has to be emitted as a result of handling a single item. */
    private final AppendableTraverser<Object> appendableTraverser = new AppendableTraverser<>(128);
    private final AbstractProcessor.FlatMapper<Object, Object> flatMapper;
    // Stored but not read anywhere in this class.
    private final String ownerId;
    /** Last watermark applied to the key managers; newly created managers start from it. */
    private Instant latestWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
    /** Wall-clock millis at which processing time was last advanced from {@link #tryProcess()}. */
    private long lastProcessingTime = System.currentTimeMillis();

    /**
     * Creates the processor and wires up the flat-mapper that dispatches on the kind of item.
     *
     * @param pipelineOptions options handed to every {@link ReduceFnRunner}
     * @param inputCoder coder used to decode incoming {@code byte[]} items; its value coder must be a {@link KvCoder}
     * @param outputCoder coder used to encode the grouped output
     * @param windowingStrategy windowing strategy (trigger, lateness) applied to the grouping
     * @param ownerId identifier of the owning transform; only stored
     */
    private WindowGroupP(SerializablePipelineOptions pipelineOptions, WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder, Coder outputCoder, WindowingStrategy<V, BoundedWindow> windowingStrategy, String ownerId) {
        this.pipelineOptions = pipelineOptions;
        KvCoder<K, V> inputValueCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
        this.inputValueValueCoder = inputValueCoder.getValueCoder();
        this.outputCoder = outputCoder;
        this.windowingStrategy = windowingStrategy;
        this.ownerId = ownerId;
        this.flatMapper = this.flatMapper(item -> {
            if (item == COMPLETE_MARKER) {
                // End of input: push the watermark to the maximum timestamp.
                this.advanceWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
            } else if (item == TRY_PROCESS_MARKER) {
                // Idle callback: advance processing time, but not more often than the minimum increment.
                Instant now = Instant.now();
                if (now.getMillis() - this.lastProcessingTime > PROCESSING_TIME_MIN_INCREMENT) {
                    this.lastProcessingTime = now.getMillis();
                    this.advanceProcessingTime(now);
                }
            } else if (item instanceof Watermark) {
                // Apply the watermark first so that triggered output precedes the forwarded watermark.
                this.advanceWatermark(((Watermark) item).timestamp());
                this.appendableTraverser.append(item);
            } else {
                // Regular element: decode, split key from value and route to the key's manager.
                WindowedValue<KV<K, V>> decoded = Utils.decodeWindowedValue((byte[]) item, (Coder) inputCoder);
                KV<K, V> kv = decoded.getValue();
                K key = kv.getKey();
                V value = kv.getValue();
                Utils.ByteArrayKey keyBytes = new Utils.ByteArrayKey(Utils.encode(key, inputValueCoder.getKeyCoder()));
                WindowedValue<V> valueOnly = WindowedValue.of(value, decoded.getTimestamp(), decoded.getWindows(), decoded.getPane());
                this.keyManagers.computeIfAbsent(keyBytes, ignored -> new KeyManager(key)).processElement(valueOnly);
            }
            return this.appendableTraverser;
        });
    }

    /**
     * Returns a serializable supplier that creates a new {@code WindowGroupP} on every call.
     *
     * @param pipelineOptions options handed to every {@link ReduceFnRunner}
     * @param inputCoder coder of the incoming encoded elements
     * @param outputCoder coder used for the emitted grouped elements
     * @param windowingStrategy windowing strategy applied to the grouping
     * @param ownerId identifier of the owning transform
     * @return supplier of processor instances
     */
    public static <K, V> SupplierEx<Processor> supplier(SerializablePipelineOptions pipelineOptions, WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder, Coder outputCoder, WindowingStrategy windowingStrategy, String ownerId) {
        return (SupplierEx<Processor> & Serializable) () -> new WindowGroupP<>(pipelineOptions, inputCoder, outputCoder, windowingStrategy, ownerId);
    }

    /** Feeds the try-process marker through the flat-mapper; returns the flat-mapper's result. */
    public boolean tryProcess() {
        return this.flatMapper.tryProcess(TRY_PROCESS_MARKER);
    }

    /** Feeds an incoming item through the flat-mapper; the ordinal is not used. */
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        return this.flatMapper.tryProcess(item);
    }

    /** Feeds the watermark through the flat-mapper, which applies it and then forwards it. */
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return this.flatMapper.tryProcess((Object) watermark);
    }

    /** Feeds the completion marker through the flat-mapper; returns the flat-mapper's result. */
    public boolean complete() {
        return this.flatMapper.tryProcess(COMPLETE_MARKER);
    }

    /** Records the new watermark and applies it, together with the current time, to every key manager. */
    private void advanceWatermark(long millis) {
        this.latestWatermark = new Instant(millis);
        Instant now = Instant.now();
        for (KeyManager manager : this.keyManagers.values()) {
            manager.advanceWatermark(this.latestWatermark, now);
        }
    }

    /** Applies the given processing time to every key manager. */
    private void advanceProcessingTime(Instant now) {
        for (KeyManager manager : this.keyManagers.values()) {
            manager.advanceProcessingTime(now);
        }
    }

    /** Per-key bundle of state, timers and the {@link ReduceFnRunner} doing the actual grouping. */
    private class KeyManager {
        private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
        private final InMemoryStateInternalsImpl stateInternals;
        private final ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner;

        /**
         * Builds the runner for {@code key} and immediately brings the new manager up to the
         * processor's latest watermark.
         */
        KeyManager(K key) {
            this.stateInternals = new InMemoryStateInternalsImpl(key);
            this.reduceFnRunner = new ReduceFnRunner<>(
                key,
                windowingStrategy,
                ExecutableTriggerStateMachine.create(
                    TriggerStateMachines.stateMachineForTrigger(
                        TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
                this.stateInternals,
                this.timerInternals,
                new OutputWindowedValue<KV<K, Iterable<V>>>() {

                    /** Encodes the grouped output and queues it for emission. */
                    public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                        WindowedValue<KV<K, Iterable<V>>> grouped = WindowedValue.of(output, timestamp, windows, pane);
                        byte[] encoded = Utils.encode(grouped, outputCoder);
                        appendableTraverser.append(encoded);
                    }

                    /** Tagged outputs are not supported by grouping. */
                    public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tag, AdditionalOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                        throw new UnsupportedOperationException("Grouping should not use side outputs");
                    }
                },
                NullSideInputReader.empty(),
                SystemReduceFn.buffering(inputValueValueCoder),
                pipelineOptions.get());
            this.advanceWatermark(latestWatermark, Instant.now());
        }

        /**
         * Advances processing time and the input watermark, fires the event-time timers that
         * became eligible, then moves the output watermark to the earliest watermark hold (or
         * to the input watermark if nothing is held) and persists the runner.
         */
        void advanceWatermark(Instant watermark, Instant now) {
            try {
                this.timerInternals.advanceProcessingTime(now);
                this.advanceInputWatermark(watermark);
                Instant outputWatermark = this.stateInternals.earliestWatermarkHold();
                if (outputWatermark == null) {
                    WindowTracing.trace("TestInMemoryTimerInternals.advanceInputWatermark: no holds, so output watermark = input watermark");
                    outputWatermark = this.timerInternals.currentInputWatermarkTime();
                }
                this.advanceOutputWatermark(outputWatermark);
                this.reduceFnRunner.persist();
            } catch (Exception e) {
                throw ExceptionUtil.rethrow(e);
            }
        }

        /**
         * Advances the timer internals' processing time and persists the runner.
         *
         * <p>NOTE(review): only event-time timers are drained in this class; no processing-time
         * timer is ever removed from {@code timerInternals} here. Confirm whether that is intended.
         */
        void advanceProcessingTime(Instant now) {
            try {
                this.timerInternals.advanceProcessingTime(now);
                this.reduceFnRunner.persist();
            } catch (Exception e) {
                throw ExceptionUtil.rethrow(e);
            }
        }

        /**
         * Moves the input watermark forward and repeatedly hands all eligible event-time timers
         * to the runner until none remain (firing timers may make further timers eligible).
         */
        private void advanceInputWatermark(Instant watermark) throws Exception {
            this.timerInternals.advanceInputWatermark(watermark);
            Collection<TimerInternals.TimerData> due = this.drainEventTimers();
            while (!due.isEmpty()) {
                this.reduceFnRunner.onTimers(due);
                due = this.drainEventTimers();
            }
        }

        /** Removes and returns every event-time timer that is currently eligible to fire. */
        private Collection<TimerInternals.TimerData> drainEventTimers() {
            Collection<TimerInternals.TimerData> due = new ArrayList<>();
            TimerInternals.TimerData timer;
            while ((timer = this.timerInternals.removeNextEventTimer()) != null) {
                due.add(timer);
            }
            return due;
        }

        /** Sets the output watermark; a {@code null} watermark is rejected. */
        private void advanceOutputWatermark(Instant watermark) {
            Objects.requireNonNull(watermark);
            this.timerInternals.advanceOutputWatermark(watermark);
        }

        /**
         * Hands the element to the runner, restricted to the windows that have not expired yet.
         * Elements whose windows have all expired are dropped silently.
         *
         * @param windowedValue value (without key) together with its timestamp, windows and pane
         */
        public void processElement(WindowedValue<V> windowedValue) {
            Collection<? extends BoundedWindow> allWindows = windowedValue.getWindows();
            Collection<? extends BoundedWindow> liveWindows = this.dropLateWindows(allWindows);
            if (liveWindows.isEmpty()) {
                return;
            }
            // dropLateWindows returns its argument unchanged when nothing expired; only rebuild
            // the value when some windows were actually filtered out, so that the runner never
            // sees (and re-creates state for) an expired window.
            WindowedValue<V> toProcess = liveWindows == allWindows
                ? windowedValue
                : WindowedValue.of(windowedValue.getValue(), windowedValue.getTimestamp(), liveWindows, windowedValue.getPane());
            try {
                this.reduceFnRunner.processElements(Collections.singletonList(toProcess));
                this.reduceFnRunner.persist();
            } catch (Exception e) {
                throw ExceptionUtil.rethrow(e);
            }
        }

        /**
         * Returns the given windows without the expired ones. When no window is expired the
         * very same collection instance is returned, avoiding a copy on the common path.
         */
        private Collection<? extends BoundedWindow> dropLateWindows(Collection<? extends BoundedWindow> windows) {
            boolean anyExpired = false;
            for (BoundedWindow window : windows) {
                if (this.isExpiredWindow(window)) {
                    anyExpired = true;
                    break;
                }
            }
            if (!anyExpired) {
                return windows;
            }
            return windows.stream().filter(window -> !this.isExpiredWindow(window)).collect(Collectors.toList());
        }

        /** A window is expired once its garbage-collection time lies before the input watermark. */
        private boolean isExpiredWindow(BoundedWindow window) {
            Instant inputWatermark = this.timerInternals.currentInputWatermarkTime();
            return LateDataUtils.garbageCollectionTime(window, windowingStrategy).isBefore(inputWatermark);
        }
    }

    /** In-memory state that can additionally report the earliest watermark hold it contains. */
    private static class InMemoryStateInternalsImpl extends InMemoryStateInternals<Object> {
        InMemoryStateInternalsImpl(@Nullable Object key) {
            super(key);
        }

        /**
         * Scans all {@link WatermarkHoldState} cells and returns the earliest hold read from
         * them, or {@code null} when no hold is found.
         */
        Instant earliestWatermarkHold() {
            Instant minimum = null;
            for (State storage : this.inMemoryState.values()) {
                if (!(storage instanceof WatermarkHoldState)) {
                    continue;
                }
                Instant hold = ((WatermarkHoldState) storage).read();
                if (minimum == null || (hold != null && hold.isBefore(minimum))) {
                    minimum = hold;
                }
            }
            return minimum;
        }
    }
}

