/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.stream.sample;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
import org.apache.apex.malhar.stream.sample.MyStream;
import org.apache.apex.malhar.stream.sample.TupleCollector;
import org.junit.Assert;
import org.junit.Test;

public class MyStreamTest {
    static Map<Object, Integer> expected = new HashMap<Object, Integer>();
    static String testId = null;
    static Callable<Boolean> exitCondition = null;

    @Test
    public void testMethodChainWordcount() throws Exception {
        testId = "testMethodChainWordcount";
        TupleCollector collector = new TupleCollector();
        collector.id = testId;
        ((MyStream)new MyStream(StreamFactory.fromFolder((String)"./src/test/resources/data")).flatMap((Function.FlatMapFunction)new Function.FlatMapFunction<String, String>(){

            public Iterable<String> f(String input) {
                return Arrays.asList(input.split(" "));
            }
        })).myFilterAndMap(new Function.MapFunction<String, String>(){

            public String f(String input) {
                return input.replace("word", "newword");
            }
        }, new Function.FilterFunction<String>(){

            public Boolean f(String input) {
                return input.startsWith("word");
            }
        }).countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000L, exitCondition);
        List<?> data = TupleCollector.results.get(testId);
        Assert.assertTrue((data.size() > 1 ? 1 : 0) != 0);
        Assert.assertEquals(expected, data.get(data.size() - 1));
    }

    @Test
    public void testNonMethodChainWordcount() throws Exception {
        testId = "testNonMethodChainWordcount";
        TupleCollector collector = new TupleCollector();
        collector.id = testId;
        MyStream mystream = (MyStream)new MyStream(StreamFactory.fromFolder((String)"./src/test/resources/data")).flatMap((Function.FlatMapFunction)new Function.FlatMapFunction<String, String>(){

            public Iterable<String> f(String input) {
                return Arrays.asList(input.split(" "));
            }
        });
        mystream.myFilterAndMap(new Function.MapFunction<String, String>(){

            public String f(String input) {
                return input.replace("word", "newword");
            }
        }, new Function.FilterFunction<String>(){

            public Boolean f(String input) {
                return input.startsWith("word");
            }
        }).countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000L, exitCondition);
        List<?> data = TupleCollector.results.get(testId);
        Assert.assertTrue((data.size() > 1 ? 1 : 0) != 0);
        Assert.assertEquals(expected, data.get(data.size() - 1));
    }

    static {
        expected.put("newword1", 4);
        expected.put("newword2", 8);
        expected.put("newword3", 4);
        expected.put("newword4", 4);
        expected.put("newword5", 4);
        expected.put("newword7", 4);
        expected.put("newword9", 6);
        exitCondition = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                List<?> data = TupleCollector.results.get(testId);
                return data != null && data.size() >= 1 && expected.equals(data.get(data.size() - 1));
            }
        };
    }
}

