/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.gearpump.translators;

import io.gearpump.streaming.dsl.api.functions.FoldFunction;
import io.gearpump.streaming.dsl.api.functions.MapFunction;
import io.gearpump.streaming.dsl.javaapi.JavaStream;
import io.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
import io.gearpump.streaming.dsl.window.api.AccumulationMode;
import io.gearpump.streaming.dsl.window.api.Discarding$;
import io.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
import io.gearpump.streaming.dsl.window.api.Trigger;
import io.gearpump.streaming.dsl.window.api.WindowFunction;
import io.gearpump.streaming.dsl.window.api.Windows;
import io.gearpump.streaming.dsl.window.impl.Window;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.apache.beam.runners.gearpump.translators.TransformTranslator;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Instant;

public class GroupByKeyTranslator<K, V>
implements TransformTranslator<GroupByKey<K, V>> {
    private static final long serialVersionUID = -8742202583992787659L;

    @Override
    public void translate(GroupByKey<K, V> transform, TranslationContext context) {
        PCollection input = (PCollection)context.getInput();
        Coder inputKeyCoder = ((KvCoder)input.getCoder()).getKeyCoder();
        JavaStream inputStream = context.getInputStream((PValue)input);
        int parallelism = context.getPipelineOptions().getParallelism();
        TimestampCombiner timestampCombiner = input.getWindowingStrategy().getTimestampCombiner();
        WindowFn windowFn = input.getWindowingStrategy().getWindowFn();
        JavaStream outputStream = inputStream.window(Windows.apply(new GearpumpWindowFn(windowFn.isNonMerging()), (Trigger)EventTimeTrigger$.MODULE$, (AccumulationMode)Discarding$.MODULE$, (String)windowFn.toString())).groupBy(new GroupByFn(inputKeyCoder), parallelism, "group_by_Key_and_Window").map(new KeyedByTimestamp(windowFn, timestampCombiner), "keyed_by_timestamp").fold(new Merge(windowFn, timestampCombiner), "merge").map(new Values(), "values");
        context.setOutputStream(context.getOutput(), outputStream);
    }

    private static class Values<K, V>
    extends MapFunction<KV<Instant, WindowedValue<KV<K, List<V>>>>, WindowedValue<KV<K, List<V>>>> {
        private Values() {
        }

        public WindowedValue<KV<K, List<V>>> map(KV<Instant, WindowedValue<KV<K, List<V>>>> kv) {
            Instant timestamp = (Instant)kv.getKey();
            WindowedValue wv = (WindowedValue)kv.getValue();
            return WindowedValue.of((Object)((KV)wv.getValue()), (Instant)timestamp, (Collection)wv.getWindows(), (PaneInfo)wv.getPane());
        }
    }

    protected static class Merge<K, V>
    extends FoldFunction<KV<Instant, WindowedValue<KV<K, V>>>, KV<Instant, WindowedValue<KV<K, List<V>>>>> {
        private final WindowFn<KV<K, V>, BoundedWindow> windowFn;
        private final TimestampCombiner timestampCombiner;

        Merge(WindowFn<KV<K, V>, BoundedWindow> windowFn, TimestampCombiner timestampCombiner) {
            this.windowFn = windowFn;
            this.timestampCombiner = timestampCombiner;
        }

        public KV<Instant, WindowedValue<KV<K, List<V>>>> init() {
            return KV.of(null, null);
        }

        public KV<Instant, WindowedValue<KV<K, List<V>>>> fold(KV<Instant, WindowedValue<KV<K, List<V>>>> accum, KV<Instant, WindowedValue<KV<K, V>>> iter) {
            if (accum.getKey() == null) {
                WindowedValue wv = (WindowedValue)iter.getValue();
                KV kv = (KV)wv.getValue();
                Object v = kv.getValue();
                ArrayList nv = Lists.newArrayList((Object[])new Object[]{v});
                return KV.of((Object)((Instant)iter.getKey()), (Object)wv.withValue((Object)KV.of((Object)kv.getKey(), (Object)nv)));
            }
            Instant t1 = (Instant)accum.getKey();
            Instant t2 = (Instant)iter.getKey();
            final WindowedValue wv1 = (WindowedValue)accum.getValue();
            final WindowedValue wv2 = (WindowedValue)iter.getValue();
            ((List)((KV)wv1.getValue()).getValue()).add(((KV)wv2.getValue()).getValue());
            final ArrayList mergedWindows = new ArrayList();
            if (!this.windowFn.isNonMerging()) {
                try {
                    WindowFn<KV<K, V>, BoundedWindow> windowFn = this.windowFn;
                    Objects.requireNonNull(windowFn);
                    this.windowFn.mergeWindows(new WindowFn.MergeContext(windowFn){

                        public Collection<BoundedWindow> windows() {
                            ArrayList<BoundedWindow> windows = new ArrayList<BoundedWindow>();
                            windows.addAll(wv1.getWindows());
                            windows.addAll(wv2.getWindows());
                            return windows;
                        }

                        public void merge(Collection<BoundedWindow> toBeMerged, BoundedWindow mergeResult) throws Exception {
                            mergedWindows.add(mergeResult);
                        }
                    });
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } else {
                mergedWindows.addAll(wv1.getWindows());
            }
            Instant timestamp = this.timestampCombiner.combine(new Instant[]{t1, t2});
            return KV.of((Object)timestamp, (Object)WindowedValue.of((Object)((KV)wv1.getValue()), (Instant)timestamp, mergedWindows, (PaneInfo)wv1.getPane()));
        }
    }

    protected static class KeyedByTimestamp<K, V>
    extends MapFunction<WindowedValue<KV<K, V>>, KV<Instant, WindowedValue<KV<K, V>>>> {
        private final WindowFn<KV<K, V>, BoundedWindow> windowFn;
        private final TimestampCombiner timestampCombiner;

        public KeyedByTimestamp(WindowFn<KV<K, V>, BoundedWindow> windowFn, TimestampCombiner timestampCombiner) {
            this.windowFn = windowFn;
            this.timestampCombiner = timestampCombiner;
        }

        public KV<Instant, WindowedValue<KV<K, V>>> map(WindowedValue<KV<K, V>> wv) {
            BoundedWindow window = (BoundedWindow)Iterables.getOnlyElement((Iterable)wv.getWindows());
            Instant timestamp = this.timestampCombiner.assign(window, this.windowFn.getOutputTime(wv.getTimestamp(), window));
            return KV.of((Object)timestamp, wv);
        }
    }

    protected static class GroupByFn<K, V>
    extends GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> {
        private static final long serialVersionUID = -807905402490735530L;
        private final Coder<K> keyCoder;

        GroupByFn(Coder<K> keyCoder) {
            this.keyCoder = keyCoder;
        }

        public ByteBuffer groupBy(WindowedValue<KV<K, V>> wv) {
            try {
                return ByteBuffer.wrap(CoderUtils.encodeToByteArray(this.keyCoder, (Object)((KV)wv.getValue()).getKey()));
            }
            catch (CoderException e) {
                throw new RuntimeException(e);
            }
        }
    }

    protected static class GearpumpWindowFn<T, W extends BoundedWindow>
    implements WindowFunction,
    Serializable {
        private final boolean isNonMerging;

        public GearpumpWindowFn(boolean isNonMerging) {
            this.isNonMerging = isNonMerging;
        }

        public <T2> Window[] apply(WindowFunction.Context<T2> context) {
            try {
                Object element = context.element();
                if (element instanceof TranslatorUtils.RawUnionValue) {
                    element = ((TranslatorUtils.RawUnionValue)element).getValue();
                }
                return this.toGearpumpWindows(((WindowedValue)element).getWindows().toArray(new BoundedWindow[0]));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public boolean isNonMerging() {
            return this.isNonMerging;
        }

        private Window[] toGearpumpWindows(BoundedWindow[] windows) {
            Window[] gwins = new Window[windows.length];
            for (int i = 0; i < windows.length; ++i) {
                gwins[i] = TranslatorUtils.boundedWindowToGearpumpWindow(windows[i]);
            }
            return gwins;
        }
    }
}

