/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.stream.FunctionOperator;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.Option;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
import org.junit.Assert;
import org.junit.Test;

public class FunctionOperatorTest {
    // Sizing constants for the tests. The code in this file uses literal values rather
    // than these names (presumably inlined at compile time) — TODO confirm and restore
    // the symbolic references.

    // Same value as the upper bound used in NumberGenerator.emitTuples().
    private static final int NumTuples = 10;
    // Same value as the upper bound checked in NumberListGenerator.emitTuples().
    private static final int NumFlatMapTuples = 100;
    // Same value as the modulus used by the filter functions (in % 2 == 0).
    private static final int divider = 2;
    // Same value as the list-size bound checked in NumberListGenerator.emitTuples().
    private static final int listSize = 10;

    // Shared result state: ResultCollector increments TupleCount and adds each tuple to
    // sum; the tests read TupleCount in their exit conditions and sum in their assertions.
    // Both are static, so values persist from one test method to the next until
    // ResultCollector.setup() zeroes them.
    // NOTE(review): these look like they are written and read from different threads —
    // confirm whether they need to be volatile.
    private static int TupleCount;
    private static int sum;

    /**
     * Wires {@code NumberGenerator -> MapFunctionOperator(Square) -> ResultCollector}
     * directly on a DAG, runs it on a local cluster and verifies the sum of the squares.
     *
     * @throws Exception if building or running the local application fails
     */
    @Test
    public void testMapOperator() throws Exception {
        // The counters are static and shared by every test in this class. ResultCollector
        // zeroes them in setup(), but a value left behind by an earlier test could satisfy
        // the exit condition below before this pipeline has processed anything.
        TupleCount = 0;
        sum = 0;

        LocalMode lma = LocalMode.newInstance();
        DAG dag = lma.getDAG();
        NumberGenerator numGen = (NumberGenerator)dag.addOperator("numGen", (Operator)new NumberGenerator());
        FunctionOperator.MapFunctionOperator mapper = (FunctionOperator.MapFunctionOperator)dag.addOperator("mapper", (Operator)new FunctionOperator.MapFunctionOperator((Function.MapFunction)new Square()));
        ResultCollector collector = (ResultCollector)dag.addOperator("collector", (Operator)new ResultCollector());
        dag.addStream("raw numbers", numGen.output, (Operator.InputPort)mapper.input);
        dag.addStream("mapped results", (Operator.OutputPort)mapper.output, collector.input);
        LocalMode.Controller lc = lma.getController();
        lc.setHeartbeatMonitoringEnabled(false);
        // Stop as soon as all ten generated numbers have reached the collector.
        ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return TupleCount == 10;
            }
        });
        lc.run(5000L);
        // 0^2 + 1^2 + ... + 9^2 = 285. JUnit's contract is (expected, actual).
        Assert.assertEquals(285L, (long)sum);
    }

    /**
     * Builds the generator -> map(Square) -> collector pipeline through the
     * {@code StreamFactory}/{@code ApexStream} API, runs it embedded and verifies the sum
     * of the squares.
     *
     * @throws Exception if building or running the embedded application fails
     */
    @Test
    public void testMapOperatorStream() throws Exception {
        // The counters are static and shared by every test in this class. ResultCollector
        // zeroes them in setup(), but a value left behind by an earlier test could satisfy
        // the exit condition below before this pipeline has processed anything.
        TupleCount = 0;
        sum = 0;

        NumberGenerator numGen = new NumberGenerator();
        ResultCollector collector = new ResultCollector();
        ApexStream nums = StreamFactory.fromInput((InputOperator)numGen, numGen.output, (Option[])new Option[0]).map((Function.MapFunction)new Square(), new Option[0]);
        // The collector is a sink, so no output port is supplied (null).
        nums.addOperator((Operator)collector, collector.input, null, new Option[0]).runEmbedded(false, 10000L, new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return TupleCount == 10;
            }
        });
        // 0^2 + 1^2 + ... + 9^2 = 285. JUnit's contract is (expected, actual).
        Assert.assertEquals(285L, (long)sum);
    }

    /**
     * Wires {@code NumberListGenerator -> FlatMapFunctionOperator(FmFunction) -> ResultCollector}
     * directly on a DAG, runs it on a local cluster and verifies the sum of the emitted
     * squares.
     *
     * @throws Exception if building or running the local application fails
     */
    @Test
    public void testFlatMapOperator() throws Exception {
        // The counters are static and shared by every test in this class. ResultCollector
        // zeroes them in setup(), but a value left behind by an earlier test could satisfy
        // the exit condition below before this pipeline has processed anything.
        TupleCount = 0;
        sum = 0;

        LocalMode lma = LocalMode.newInstance();
        DAG dag = lma.getDAG();
        NumberListGenerator numGen = (NumberListGenerator)dag.addOperator("numGen", (Operator)new NumberListGenerator());
        FunctionOperator.FlatMapFunctionOperator fm = (FunctionOperator.FlatMapFunctionOperator)dag.addOperator("flatmap", (Operator)new FunctionOperator.FlatMapFunctionOperator((Function.FlatMapFunction)new FmFunction()));
        ResultCollector collector = (ResultCollector)dag.addOperator("collector", (Operator)new ResultCollector());
        dag.addStream("raw numbers", numGen.output, (Operator.InputPort)fm.input);
        dag.addStream("flatmap results", (Operator.OutputPort)fm.output, collector.input);
        LocalMode.Controller lc = lma.getController();
        lc.setHeartbeatMonitoringEnabled(false);
        // FmFunction keeps multiples of 13 or 17: 0, 13, ..., 91 and 17, ..., 85 are the
        // thirteen such values among the generated numbers.
        ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return TupleCount == 13;
            }
        });
        lc.run(5000L);
        // Squares of those thirteen values add up to 39555. JUnit's contract is
        // (expected, actual).
        Assert.assertEquals(39555L, (long)sum);
    }

    /**
     * Builds the list generator -> flatMap(FmFunction) -> collector pipeline through the
     * {@code StreamFactory}/{@code ApexStream} API, runs it embedded and verifies the sum
     * of the emitted squares.
     *
     * @throws Exception if building or running the embedded application fails
     */
    @Test
    public void testFlatMapOperatorStream() throws Exception {
        // The counters are static and shared by every test in this class. ResultCollector
        // zeroes them in setup(), but a value left behind by an earlier test could satisfy
        // the exit condition below before this pipeline has processed anything.
        TupleCount = 0;
        sum = 0;

        NumberListGenerator numGen = new NumberListGenerator();
        ResultCollector collector = new ResultCollector();
        ApexStream numLists = StreamFactory.fromInput((InputOperator)numGen, numGen.output, (Option[])new Option[0]).flatMap((Function.FlatMapFunction)new FmFunction(), new Option[0]);
        // The collector is a sink, so no output port is supplied (null).
        // Thirteen generated values are multiples of 13 or 17 (0, 13, ..., 91, 17, ..., 85).
        numLists.addOperator((Operator)collector, collector.input, null, new Option[0]).runEmbedded(false, 10000L, new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return TupleCount == 13;
            }
        });
        // Squares of those thirteen values add up to 39555. JUnit's contract is
        // (expected, actual).
        Assert.assertEquals(39555L, (long)sum);
    }

    /**
     * Wires {@code NumberGenerator -> FilterFunctionOperator(even numbers) -> ResultCollector}
     * directly on a DAG, runs it on a local cluster and verifies the sum of the tuples
     * that pass the filter.
     *
     * @throws Exception if building or running the local application fails
     */
    @Test
    public void testFilterOperator() throws Exception {
        // The counters are static and shared by every test in this class. ResultCollector
        // zeroes them in setup(), but a value left behind by an earlier test could satisfy
        // the exit condition below before this pipeline has processed anything.
        TupleCount = 0;
        sum = 0;

        LocalMode lma = LocalMode.newInstance();
        DAG dag = lma.getDAG();
        // Filter that lets only even numbers through.
        FunctionOperator.FilterFunctionOperator filter0 = new FunctionOperator.FilterFunctionOperator((Function.FilterFunction)new Function.FilterFunction<Integer>(){

            public boolean f(Integer in) {
                return in % 2 == 0;
            }
        });
        NumberGenerator numGen = (NumberGenerator)dag.addOperator("numGen", (Operator)new NumberGenerator());
        FunctionOperator.FilterFunctionOperator filter = (FunctionOperator.FilterFunctionOperator)dag.addOperator("filter", (Operator)filter0);
        ResultCollector collector = (ResultCollector)dag.addOperator("collector", (Operator)new ResultCollector());
        dag.addStream("raw numbers", numGen.output, (Operator.InputPort)filter.input);
        dag.addStream("filtered results", (Operator.OutputPort)filter.output, collector.input);
        LocalMode.Controller lc = lma.getController();
        lc.setHeartbeatMonitoringEnabled(false);
        // Five of the ten generated numbers (0, 2, 4, 6, 8) are even.
        ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return TupleCount == 5;
            }
        });
        lc.run(5000L);
        // 0 + 2 + 4 + 6 + 8 = 20. JUnit's contract is (expected, actual).
        Assert.assertEquals(20L, (long)sum);
    }

    /**
     * Builds the generator -> filter(even numbers) -> collector pipeline through the
     * {@code StreamFactory}/{@code ApexStream} API, runs it embedded and verifies the sum
     * of the tuples that pass the filter.
     *
     * @throws Exception if building or running the embedded application fails
     */
    @Test
    public void testFilterOperatorStream() throws Exception {
        // The counters are static and shared by every test in this class. ResultCollector
        // zeroes them in setup(), but a value left behind by an earlier test could satisfy
        // the exit condition below before this pipeline has processed anything.
        TupleCount = 0;
        sum = 0;

        NumberGenerator numGen = new NumberGenerator();
        ResultCollector collector = new ResultCollector();
        // Filter that lets only even numbers through.
        ApexStream nums = StreamFactory.fromInput((InputOperator)numGen, numGen.output, (Option[])new Option[0]).filter((Function.FilterFunction)new Function.FilterFunction<Integer>(){

            public boolean f(Integer in) {
                return in % 2 == 0;
            }
        }, new Option[0]);
        // The collector is a sink, so no output port is supplied (null).
        // Five of the ten generated numbers (0, 2, 4, 6, 8) are even.
        nums.addOperator((Operator)collector, collector.input, null, new Option[0]).runEmbedded(false, 10000L, new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return TupleCount == 5;
            }
        });
        // 0 + 2 + 4 + 6 + 8 = 20. JUnit's contract is (expected, actual).
        Assert.assertEquals(20L, (long)sum);
    }

    /**
     * Map function that multiplies its input by itself.
     */
    public static class Square
    implements Function.MapFunction<Integer, Integer> {
        /**
         * @param input the number to square
         * @return {@code input * input}
         */
        public Integer f(Integer input) {
            int value = input;
            int squared = value * value;
            return squared;
        }
    }

    /**
     * Flat-map function that selects the multiples of 13 or 17 from a list and yields
     * their squares, preserving the input order.
     */
    public static class FmFunction
    implements Function.FlatMapFunction<List<Integer>, Integer> {
        /**
         * @param input the numbers to inspect
         * @return the squares of every element divisible by 13 or by 17
         */
        public Iterable<Integer> f(List<Integer> input) {
            List<Integer> squares = new ArrayList<Integer>();
            for (Integer boxed : input) {
                int value = boxed;
                boolean selected = value % 13 == 0 || value % 17 == 0;
                if (selected) {
                    squares.add(value * value);
                }
            }
            return squares;
        }
    }

    /**
     * Sink operator that tallies the tuples it receives into the enclosing test class's
     * static {@code TupleCount} and {@code sum} fields.
     */
    public static class ResultCollector
    extends BaseOperator {
        /** Input port: bumps the tuple counter, then adds the tuple's value to the total. */
        public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>(){

            public void process(Integer tuple) {
                TupleCount += 1;
                sum = sum + tuple.intValue();
            }
        };

        /**
         * Zeroes both shared counters.
         *
         * @param context the operator context (unused)
         */
        public void setup(Context.OperatorContext context) {
            TupleCount = 0;
            sum = 0;
        }
    }

    /**
     * Input operator that emits the integers 0 through 9, one per {@code emitTuples()}
     * call, and then stays silent.
     */
    public static class NumberGenerator
    extends BaseOperator
    implements InputOperator {
        /** Next number to emit. */
        private int num;
        public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();

        /**
         * Restarts the sequence at 0.
         *
         * @param context the operator context (unused)
         */
        public void setup(Context.OperatorContext context) {
            this.num = 0;
        }

        /** Emits the next number while fewer than ten have been produced. */
        public void emitTuples() {
            if (this.num < 10) {
                // Pass the value as an Integer (autoboxed); the port is typed
                // DefaultOutputPort<Integer>, so an Object argument does not type-check.
                this.output.emit(this.num);
                ++this.num;
            }
        }
    }

    /**
     * Input operator that produces the integers 0 through 99, grouped into lists of ten.
     * One number is generated per {@code emitTuples()} call; a list is emitted each time
     * it fills up, and nothing more is produced once all numbers have been sent.
     */
    public static class NumberListGenerator
    extends BaseOperator
    implements InputOperator {
        /** How many integers are generated in total (0 inclusive, this value exclusive). */
        private static final int TOTAL_NUMBERS = 100;
        /** How many integers go into each emitted list. */
        private static final int BATCH_SIZE = 10;

        /** Next number to generate. */
        private int numMem;
        /** Batch currently being filled; replaced by a new list after every emit. */
        private List<Integer> nums;
        public final transient DefaultOutputPort<List<Integer>> output = new DefaultOutputPort<List<Integer>>();

        /**
         * Restarts the sequence at 0 with an empty batch.
         *
         * @param context the operator context (unused)
         */
        public void setup(Context.OperatorContext context) {
            this.numMem = 0;
            this.nums = new ArrayList<Integer>(BATCH_SIZE);
        }

        /**
         * Appends the next number to the current batch and emits the batch once it holds
         * {@code BATCH_SIZE} elements or the final number has been added.
         */
        public void emitTuples() {
            // Everything has been sent: do nothing, so the batch cannot grow without bound.
            if (this.numMem >= TOTAL_NUMBERS) {
                return;
            }
            this.nums.add(this.numMem);
            ++this.numMem;
            boolean batchFull = this.nums.size() >= BATCH_SIZE;
            boolean lastNumber = this.numMem >= TOTAL_NUMBERS;
            if (batchFull || lastNumber) {
                this.output.emit(this.nums);
                // Start a fresh list instead of clearing the emitted one, so the instance
                // handed to the port is never modified afterwards.
                this.nums = new ArrayList<Integer>(BATCH_SIZE);
            }
        }
    }
}

