/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.stream.api.impl;

import com.datatorrent.api.Operator;
import com.datatorrent.lib.util.KeyValPair;
import java.util.List;
import org.apache.apex.malhar.lib.function.Function;
import org.apache.apex.malhar.lib.window.Accumulation;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.apex.malhar.lib.window.accumulation.FoldFn;
import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
import org.apache.apex.malhar.lib.window.accumulation.SumLong;
import org.apache.apex.malhar.lib.window.accumulation.TopN;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.Option;
import org.apache.apex.malhar.stream.api.WindowedStream;
import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
import org.apache.apex.malhar.stream.api.impl.DagMeta;
import org.apache.hadoop.classification.InterfaceStability;
import org.joda.time.Duration;

/**
 * {@link WindowedStream} implementation built on top of {@link ApexStreamImpl}.
 *
 * <p>Every aggregation offered here follows the same two steps: the elements of this stream are
 * first mapped into {@link Tuple}s, then a windowed operator carrying the requested
 * {@link Accumulation} is appended to the mapped stream. The windowed operator is configured with
 * this stream's {@link #windowOption}, {@link #triggerOption} and {@link #allowedLateness}
 * whenever those are non-null.
 *
 * @param <T> type of the elements flowing through this stream
 */
@InterfaceStability.Evolving
public class ApexWindowedStreamImpl<T>
extends ApexStreamImpl<T>
implements WindowedStream<T> {
    /** Window option handed to every windowed operator created by this stream; skipped when null. */
    protected WindowOption windowOption;
    /** Trigger option handed to every windowed operator created by this stream; skipped when null. */
    protected TriggerOption triggerOption;
    /** Allowed lateness handed to every windowed operator created by this stream; skipped when null. */
    protected Duration allowedLateness;

    /**
     * Counts the elements of this stream by mapping each element to a tuple holding {@code 1L}
     * and accumulating those tuples with {@link SumLong}.
     *
     * <p>If an element is already a {@link Tuple.TimestampedTuple} its timestamp is reused;
     * otherwise {@link System#currentTimeMillis()} at mapping time is used.
     *
     * @param opts options forwarded to {@code addOperator} for the windowed operator
     * @return the stream produced by the appended windowed operator
     */
    @Override
    public <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option ... opts) {
        Function.MapFunction<T, Tuple<Long>> kVMap = new Function.MapFunction<T, Tuple<Long>>(){

            @Override
            public Tuple<Long> f(T input) {
                if (input instanceof Tuple.TimestampedTuple) {
                    return new Tuple.TimestampedTuple<Long>(((Tuple.TimestampedTuple<?>)input).getTimestamp(), 1L);
                }
                return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis(), 1L);
            }
        };
        return this.<STREAM>aggregate(kVMap, new SumLong(), opts);
    }

    /**
     * Counts per key: elements are converted with {@code convertToKeyValue} and the resulting
     * values are accumulated per key with {@link SumLong}.
     *
     * @param convertToKeyValue conversion from an element to a key/value tuple
     * @param opts options forwarded to {@code addOperator} for the keyed windowed operator
     * @return the stream produced by the appended keyed windowed operator
     */
    @Override
    public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option ... opts) {
        return this.<STREAM>aggregateByKey(convertToKeyValue, new SumLong(), opts);
    }

    /**
     * Per-key top-N: elements are converted with {@code convertToKeyVal} and accumulated per key
     * with a {@link TopN} accumulation configured through {@code setN(N)}.
     *
     * @param N value passed to {@link TopN}'s {@code setN}
     * @param convertToKeyVal conversion from an element to a key/value tuple
     * @param opts options forwarded to {@code addOperator} for the keyed windowed operator
     * @return the stream produced by the appended keyed windowed operator
     */
    @Override
    @SuppressWarnings("rawtypes")
    public <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, List<V>>>>> STREAM topByKey(int N, Function.ToKeyValue<T, K, V> convertToKeyVal, Option ... opts) {
        TopN top = new TopN();
        top.setN(N);
        return this.<STREAM>aggregateByKey(convertToKeyVal, top, opts);
    }

    /**
     * Top-N over the whole stream: elements are wrapped by {@link ConvertFn} and accumulated with
     * a {@link TopN} accumulation configured through {@code setN(N)}.
     *
     * @param N value passed to {@link TopN}'s {@code setN}
     * @param opts options forwarded to {@code addOperator} for the windowed operator
     * @return the stream produced by the appended windowed operator
     */
    @Override
    @SuppressWarnings("rawtypes")
    public <STREAM extends WindowedStream<Tuple.WindowedTuple<List<T>>>> STREAM top(int N, Option ... opts) {
        TopN top = new TopN();
        top.setN(N);
        return this.<STREAM>aggregate(new ConvertFn<T>(), top, opts);
    }

    /**
     * Applies an arbitrary accumulation per key.
     *
     * @param accumulation accumulation set on the keyed windowed operator
     * @param convertToKeyVal conversion from an element to a key/value tuple
     * @param opts options forwarded to {@code addOperator} for the keyed windowed operator
     * @return the stream produced by the appended keyed windowed operator
     */
    @Override
    public <K, V, O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM accumulateByKey(Accumulation<V, ACCU, O> accumulation, Function.ToKeyValue<T, K, V> convertToKeyVal, Option ... opts) {
        return this.<STREAM>aggregateByKey(convertToKeyVal, accumulation, opts);
    }

    /**
     * Applies an arbitrary accumulation to the elements of this stream, wrapped by
     * {@link ConvertFn}.
     *
     * @param accumulation accumulation set on the windowed operator
     * @param opts options forwarded to {@code addOperator} for the windowed operator
     * @return the stream produced by the appended windowed operator
     */
    @Override
    public <O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM accumulate(Accumulation<T, ACCU, O> accumulation, Option ... opts) {
        return this.<STREAM>aggregate(new ConvertFn<T>(), accumulation, opts);
    }

    /**
     * Reduces the elements of this stream, wrapped by {@link ConvertFn}, with {@code reduce}.
     *
     * @param reduce reduce function set as the accumulation of the windowed operator
     * @param opts options forwarded to {@code addOperator} for the windowed operator
     * @return the stream produced by the appended windowed operator
     */
    @Override
    public <STREAM extends WindowedStream<Tuple.WindowedTuple<T>>> STREAM reduce(ReduceFn<T> reduce, Option ... opts) {
        return this.<STREAM>aggregate(new ConvertFn<T>(), reduce, opts);
    }

    /**
     * Reduces values per key with {@code reduce}.
     *
     * @param reduce reduce function set as the accumulation of the keyed windowed operator
     * @param convertToKeyVal conversion from an element to a key/value tuple
     * @param opts options forwarded to {@code addOperator} for the keyed windowed operator
     * @return the stream produced by the appended keyed windowed operator
     */
    @Override
    public <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, V>>>> STREAM reduceByKey(ReduceFn<V> reduce, Function.ToKeyValue<T, K, V> convertToKeyVal, Option ... opts) {
        return this.<STREAM>aggregateByKey(convertToKeyVal, reduce, opts);
    }

    /**
     * Folds the elements of this stream, wrapped by {@link ConvertFn}, with {@code fold}.
     *
     * @param fold fold function set as the accumulation of the windowed operator
     * @param opts options forwarded to {@code addOperator} for the windowed operator
     * @return the stream produced by the appended windowed operator
     */
    @Override
    public <O, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM fold(FoldFn<T, O> fold, Option ... opts) {
        return this.<STREAM>aggregate(new ConvertFn<T>(), fold, opts);
    }

    /**
     * Folds values per key with {@code fold}.
     *
     * @param fold fold function set as the accumulation of the keyed windowed operator
     * @param convertToKeyVal conversion from an element to a key/value tuple
     * @param opts options forwarded to {@code addOperator} for the keyed windowed operator
     * @return the stream produced by the appended keyed windowed operator
     */
    @Override
    public <K, V, O, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM foldByKey(FoldFn<V, O> fold, Function.ToKeyValue<T, K, V> convertToKeyVal, Option ... opts) {
        return this.<STREAM>aggregateByKey(convertToKeyVal, fold, opts);
    }

    /**
     * Not implemented by this class.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <O, K, STREAM extends WindowedStream<KeyValPair<K, Iterable<O>>>> STREAM groupByKey(Function.ToKeyValue<T, K, O> convertToKeyVal, Option ... opts) {
        throw new UnsupportedOperationException("groupByKey is not supported by ApexWindowedStreamImpl");
    }

    /**
     * Not implemented by this class.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <STREAM extends WindowedStream<Iterable<T>>> STREAM group() {
        throw new UnsupportedOperationException("group is not supported by ApexWindowedStreamImpl");
    }

    /**
     * Replaces the trigger option used for windowed operators created afterwards by this stream.
     *
     * @param option new trigger option; may be null, in which case no trigger option is set on
     *        created operators
     * @return this stream
     */
    @Override
    @SuppressWarnings("unchecked")
    public <STREAM extends WindowedStream<T>> STREAM resetTrigger(TriggerOption option) {
        this.triggerOption = option;
        return (STREAM)this;
    }

    /**
     * Replaces the allowed lateness used for windowed operators created afterwards by this stream.
     *
     * @param allowedLateness new allowed lateness; may be null, in which case no allowed lateness
     *        is set on created operators
     * @return this stream
     */
    @Override
    @SuppressWarnings("unchecked")
    public <STREAM extends WindowedStream<T>> STREAM resetAllowedLateness(Duration allowedLateness) {
        this.allowedLateness = allowedLateness;
        return (STREAM)this;
    }

    /**
     * Builds a new windowed stream over the given graph and brick that inherits this stream's
     * window option, trigger option and allowed lateness.
     *
     * @param graph DAG metadata assigned to the new stream
     * @param newBrick brick assigned as the new stream's last brick
     * @param <O> element type of the new stream
     * @return the newly created stream
     */
    @Override
    protected <O> ApexStream<O> newStream(DagMeta graph, ApexStreamImpl.Brick<O> newBrick) {
        // The new stream carries elements of type O (the brick's type), not this stream's T.
        ApexWindowedStreamImpl<O> newstream = new ApexWindowedStreamImpl<O>();
        newstream.graph = graph;
        newstream.lastBrick = newBrick;
        newstream.windowOption = this.windowOption;
        newstream.triggerOption = this.triggerOption;
        newstream.allowedLateness = this.allowedLateness;
        return newstream;
    }

    /**
     * Shared pipeline of the non-keyed aggregations: maps this stream with {@code toTuple}, then
     * appends a {@link WindowedOperatorImpl} carrying {@code accumulation} to the mapped stream.
     *
     * <p>The stream and operator plumbing goes through raw types, so the final cast to the
     * caller's stream type is unchecked.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <S extends WindowedStream<?>> S aggregate(Function.MapFunction toTuple, Accumulation accumulation, Option[] opts) {
        WindowedStream tupleStream = (WindowedStream)this.map(toTuple, new Option[0]);
        WindowedOperatorImpl operator = this.createWindowedOperator(accumulation);
        return (S)tupleStream.addOperator(operator, operator.input, operator.output, opts);
    }

    /**
     * Shared pipeline of the keyed aggregations: maps this stream with {@code toKeyVal}, then
     * appends a {@link KeyedWindowedOperatorImpl} carrying {@code accumulation} to the mapped
     * stream.
     *
     * <p>The stream and operator plumbing goes through raw types, so the final cast to the
     * caller's stream type is unchecked.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <S extends WindowedStream<?>> S aggregateByKey(Function.MapFunction toKeyVal, Accumulation accumulation, Option[] opts) {
        WindowedStream kvStream = (WindowedStream)this.map(toKeyVal, new Option[0]);
        KeyedWindowedOperatorImpl operator = this.createKeyedWindowedOperator(accumulation);
        return (S)kvStream.addOperator(operator, operator.input, operator.output, opts);
    }

    /**
     * Creates a {@link WindowedOperatorImpl} backed by in-memory storages, applies this stream's
     * non-null window option, trigger option and allowed lateness, and sets the accumulation.
     *
     * @param accumulationFn accumulation to set on the operator
     * @return the configured operator
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<? super IN, ACCU, OUT> accumulationFn) {
        WindowedOperatorImpl windowedOperator = new WindowedOperatorImpl();
        windowedOperator.setDataStorage(new InMemoryWindowedStorage());
        windowedOperator.setRetractionStorage(new InMemoryWindowedStorage());
        windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage());
        if (this.windowOption != null) {
            windowedOperator.setWindowOption(this.windowOption);
        }
        if (this.triggerOption != null) {
            windowedOperator.setTriggerOption(this.triggerOption);
        }
        if (this.allowedLateness != null) {
            windowedOperator.setAllowedLateness(this.allowedLateness);
        }
        windowedOperator.setAccumulation(accumulationFn);
        return windowedOperator;
    }

    /**
     * Creates a {@link KeyedWindowedOperatorImpl} whose data and retraction storages are in-memory
     * keyed storages and whose window-state storage is a plain in-memory storage, applies this
     * stream's non-null window option, trigger option and allowed lateness, and sets the
     * accumulation.
     *
     * @param accumulationFn accumulation to set on the operator
     * @return the configured operator
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<? super V, ACCU, OUT> accumulationFn) {
        KeyedWindowedOperatorImpl keyedWindowedOperator = new KeyedWindowedOperatorImpl();
        keyedWindowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage());
        keyedWindowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage());
        keyedWindowedOperator.setWindowStateStorage(new InMemoryWindowedStorage());
        if (this.windowOption != null) {
            keyedWindowedOperator.setWindowOption(this.windowOption);
        }
        if (this.triggerOption != null) {
            keyedWindowedOperator.setTriggerOption(this.triggerOption);
        }
        if (this.allowedLateness != null) {
            keyedWindowedOperator.setAllowedLateness(this.allowedLateness);
        }
        keyedWindowedOperator.setAccumulation(accumulationFn);
        return keyedWindowedOperator;
    }

    /**
     * Map function that turns a stream element into a {@link Tuple}: an element that already is a
     * {@link Tuple.TimestampedTuple} is returned as-is, anything else is wrapped in a new
     * {@link Tuple.TimestampedTuple} stamped with {@link System#currentTimeMillis()}.
     *
     * @param <T> type of the incoming elements
     */
    private static class ConvertFn<T>
    implements Function.MapFunction<T, Tuple<T>> {
        private ConvertFn() {
        }

        @Override
        @SuppressWarnings("unchecked")
        public Tuple<T> f(T input) {
            if (input instanceof Tuple.TimestampedTuple) {
                // Already timestamped: pass the very same object through.
                return (Tuple.TimestampedTuple<T>)input;
            }
            return new Tuple.TimestampedTuple<T>(System.currentTimeMillis(), input);
        }
    }
}

