/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.cookbook;

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.examples.cookbook.TriggerExample;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class TriggerExampleTest {
    private static final String[] INPUT = new String[]{"01/01/2010 00:00:00,1108302,94,E,ML,36,100,29,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,,,0", "01/01/2010 00:00:00,1100333,5,N,FR,9,0,39,,,9,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,"};
    private static final List<TimestampedValue<String>> TIME_STAMPED_INPUT = Arrays.asList(TimestampedValue.of((Object)"01/01/2010 00:00:00,1108302,5,W,ML,36,100,30,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,87.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,,,0", (Instant)new Instant(60000L)), TimestampedValue.of((Object)"01/01/2010 00:00:00,1108302,110,E,ML,36,100,40,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,,,0", (Instant)new Instant(1L)), TimestampedValue.of((Object)"01/01/2010 00:00:00,1108302,110,E,ML,36,100,50,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0,,,,,0,,,,,0", (Instant)new Instant(1L)));
    private static final TableRow OUT_ROW_1 = new TableRow().set("trigger_type", (Object)"default").set("freeway", (Object)"5").set("total_flow", (Object)30).set("number_of_records", (Object)1).set("isFirst", (Object)true).set("isLast", (Object)true).set("timing", (Object)"ON_TIME").set("window", (Object)"[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)");
    private static final TableRow OUT_ROW_2 = new TableRow().set("trigger_type", (Object)"default").set("freeway", (Object)"110").set("total_flow", (Object)90).set("number_of_records", (Object)2).set("isFirst", (Object)true).set("isLast", (Object)true).set("timing", (Object)"ON_TIME").set("window", (Object)"[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)");

    @Test
    public void testExtractTotalFlow() {
        DoFnTester extractFlowInfow = DoFnTester.of((DoFn)new TriggerExample.ExtractFlowInfo());
        List results = extractFlowInfow.processBatch((Object[])INPUT);
        Assert.assertEquals((long)results.size(), (long)1L);
        Assert.assertEquals((Object)((KV)results.get(0)).getKey(), (Object)"94");
        Assert.assertEquals((Object)((KV)results.get(0)).getValue(), (Object)new Integer(29));
        List output = extractFlowInfow.processBatch((Object[])new String[]{""});
        Assert.assertEquals((long)output.size(), (long)0L);
    }

    @Test
    @Category(value={RunnableOnService.class})
    public void testTotalFlow() {
        TestPipeline pipeline = TestPipeline.create();
        PCollection flow = (PCollection)((PCollection)pipeline.apply((PTransform)Create.timestamped(TIME_STAMPED_INPUT))).apply((PTransform)ParDo.of((DoFn)new TriggerExample.ExtractFlowInfo()));
        PCollection totalFlow = (PCollection)((PCollection)flow.apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)1L))))).apply((PTransform)new TriggerExample.TotalFlow("default"));
        PCollection results = (PCollection)totalFlow.apply((PTransform)ParDo.of((DoFn)new FormatResults()));
        PAssert.that((PCollection)results).containsInAnyOrder((Object[])new TableRow[]{OUT_ROW_1, OUT_ROW_2});
        pipeline.run();
    }

    static class FormatResults
    extends DoFn<TableRow, TableRow> {
        FormatResults() {
        }

        public void processElement(DoFn.ProcessContext c) throws Exception {
            TableRow element = (TableRow)c.element();
            TableRow row = new TableRow().set("trigger_type", element.get((Object)"trigger_type")).set("freeway", element.get((Object)"freeway")).set("total_flow", element.get((Object)"total_flow")).set("number_of_records", element.get((Object)"number_of_records")).set("isFirst", element.get((Object)"isFirst")).set("isLast", element.get((Object)"isLast")).set("timing", element.get((Object)"timing")).set("window", element.get((Object)"window"));
            c.output((Object)row);
        }
    }
}

