/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.stream.sample;

import com.datatorrent.lib.util.KeyValPair;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.Option;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
import org.apache.apex.malhar.stream.sample.MyStream;
import org.apache.apex.malhar.stream.sample.TupleCollector;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;

public class MyStreamTest {
    static Map<String, Long> expected = new HashMap<String, Long>();
    static String testId = null;
    static Callable<Boolean> exitCondition = null;

    @Test
    public void testMethodChainWordcount() throws Exception {
        testId = "testMethodChainWordcount";
        TupleCollector collector = new TupleCollector();
        collector.id = testId;
        ((MyStream)new MyStream((ApexStreamImpl)StreamFactory.fromFolder((String)"./src/test/resources/data")).flatMap((Function.FlatMapFunction)new Function.FlatMapFunction<String, String>(){

            public Iterable<String> f(String input) {
                return Arrays.asList(input.split(" "));
            }
        }, new Option[0])).myFilterAndMap(new Function.MapFunction<String, String>(){

            public String f(String input) {
                return input.replace("word", "newword");
            }
        }, new Function.FilterFunction<String>(){

            public boolean f(String input) {
                return input.startsWith("word");
            }
        }).window((WindowOption)new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.millis((long)1000L))).countByKey((Function.ToKeyValue)new Function.ToKeyValue<String, String, Long>(){

            public Tuple<KeyValPair<String, Long>> f(String input) {
                return new Tuple.PlainTuple((Object)new KeyValPair((Object)input, (Object)1L));
            }
        }, new Option[0]).addOperator(collector, collector.inputPort, null, new Option[0]).runEmbedded(false, 30000L, exitCondition);
        HashMap<Object, Object> dataMap = new HashMap<Object, Object>();
        for (Tuple.TimestampedTuple entry : TupleCollector.results.get(testId)) {
            dataMap.put(((KeyValPair)entry.getValue()).getKey(), ((KeyValPair)entry.getValue()).getValue());
        }
        Assert.assertTrue((dataMap.size() > 1 ? 1 : 0) != 0);
        Assert.assertEquals(expected, dataMap);
    }

    @Test
    public void testNonMethodChainWordcount() throws Exception {
        testId = "testNonMethodChainWordcount";
        TupleCollector collector = new TupleCollector();
        collector.id = testId;
        MyStream mystream = (MyStream)new MyStream((ApexStreamImpl)StreamFactory.fromFolder((String)"./src/test/resources/data")).flatMap((Function.FlatMapFunction)new Function.FlatMapFunction<String, String>(){

            public Iterable<String> f(String input) {
                return Arrays.asList(input.split(" "));
            }
        }, new Option[0]);
        mystream.myFilterAndMap(new Function.MapFunction<String, String>(){

            public String f(String input) {
                return input.replace("word", "newword");
            }
        }, new Function.FilterFunction<String>(){

            public boolean f(String input) {
                return input.startsWith("word");
            }
        }).window((WindowOption)new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.millis((long)1000L))).countByKey((Function.ToKeyValue)new Function.ToKeyValue<String, String, Long>(){

            public Tuple<KeyValPair<String, Long>> f(String input) {
                return new Tuple.PlainTuple((Object)new KeyValPair((Object)input, (Object)1L));
            }
        }, new Option[0]).addOperator(collector, collector.inputPort, collector.outputPort, new Option[0]).runEmbedded(false, 30000L, exitCondition);
        HashMap<Object, Object> dataMap = new HashMap<Object, Object>();
        for (Tuple.TimestampedTuple entry : TupleCollector.results.get(testId)) {
            dataMap.put(((KeyValPair)entry.getValue()).getKey(), ((KeyValPair)entry.getValue()).getValue());
        }
        Assert.assertTrue((dataMap.size() > 1 ? 1 : 0) != 0);
        Assert.assertEquals(expected, dataMap);
    }

    static {
        expected.put("newword1", 4L);
        expected.put("newword2", 8L);
        expected.put("newword3", 4L);
        expected.put("newword4", 4L);
        expected.put("newword5", 4L);
        expected.put("newword7", 4L);
        expected.put("newword9", 6L);
        exitCondition = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                if (!TupleCollector.results.containsKey(testId) || TupleCollector.results.get(testId).isEmpty()) {
                    return false;
                }
                HashMap<Object, Object> dataMap = new HashMap<Object, Object>();
                List<?> data = TupleCollector.results.get(testId);
                for (Tuple.TimestampedTuple entry : data) {
                    dataMap.put(((KeyValPair)entry.getValue()).getKey(), ((KeyValPair)entry.getValue()).getValue());
                }
                return dataMap != null && dataMap.size() >= 1 && expected.equals(dataMap);
            }
        };
    }
}

