/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.common.DataflowExampleOptions;
import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.BigQueryIO;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class TrafficRoutes {
    /** Label key handed to {@code PubsubIO.Read.timestampLabel} and to the injector pipeline in {@code main}. */
    private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
    /** Value passed to {@code maxNumRecords} in {@code main} when the pipeline is not run as unbounded. */
    private static final Integer VALID_INPUTS = 4999;
    /** Station id to route name lookup, filled once at class initialization by {@code buildStationInfo()}. */
    static Map<String, String> sdStations = TrafficRoutes.buildStationInfo();
    /** Equals the default of {@code getWindowDuration()} (minutes); not referenced elsewhere in this file. */
    static final int WINDOW_DURATION = 3;
    /** Equals the default of {@code getWindowSlideEvery()} (minutes); not referenced elsewhere in this file. */
    static final int WINDOW_SLIDE_EVERY = 1;

    /**
     * Builds and runs the traffic-routes pipeline.
     *
     * <p>Steps, in order: parse the options, read lines from the configured Pub/Sub subscription
     * (capped at {@code VALID_INPUTS} records when not unbounded), extract station speeds, apply
     * sliding windows, run {@link TrackSpeed} and write the rows to the configured BigQuery table.
     * If both an input file and a Pub/Sub topic are configured, the injector pipeline is started
     * before the main pipeline is run.
     *
     * <p>The raw {@code PCollection}/{@code PTransform} casts on the apply chain are kept as they
     * appear in the decompiled original so the rewrite stays behavior-identical.
     *
     * @param args command-line arguments, parsed into {@link TrafficRoutesOptions}
     * @throws IOException declared by the signature; presumably raised by the example utilities
     */
    public static void main(String[] args) throws IOException {
        TrafficRoutesOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(TrafficRoutesOptions.class);
        options.setBigQuerySchema(FormatStatsFn.getSchema());
        DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
        Pipeline pipeline = Pipeline.create(options);

        // Destination table for the per-route statistics.
        TableReference tableRef = new TableReference();
        tableRef.setProjectId(options.getProject());
        tableRef.setDatasetId(options.getBigQueryDataset());
        tableRef.setTableId(options.getBigQueryTable());

        // Both modes read from the Pub/Sub subscription; the bounded mode only adds a record cap.
        PCollection input;
        if (options.isUnbounded()) {
            input = (PCollection)pipeline.apply((PTransform)PubsubIO.Read
                .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
                .subscription(options.getPubsubSubscription()));
        } else {
            input = (PCollection)pipeline.apply((PTransform)PubsubIO.Read
                .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
                .subscription(options.getPubsubSubscription())
                .maxNumRecords(VALID_INPUTS.intValue()));
        }

        // line -> (route, station speed)
        PCollection stationSpeeds =
            (PCollection)input.apply((PTransform)ParDo.of((DoFn)new ExtractStationSpeedFn()));

        // Sliding windows sized and advanced by the option values, both in minutes.
        Duration windowSize = Duration.standardMinutes((long)options.getWindowDuration().intValue());
        Duration slideEvery = Duration.standardMinutes((long)options.getWindowSlideEvery().intValue());
        PCollection windowedSpeeds = (PCollection)stationSpeeds.apply(
            (PTransform)Window.into((WindowFn)SlidingWindows.of(windowSize).every(slideEvery)));

        // (route, station speed) -> table rows -> BigQuery
        PCollection rows = (PCollection)windowedSpeeds.apply((PTransform)new TrackSpeed());
        rows.apply((PTransform)BigQueryIO.Write.to(tableRef).withSchema(FormatStatsFn.getSchema()));

        boolean haveInputFile = !Strings.isNullOrEmpty(options.getInputFile());
        if (haveInputFile && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
            dataflowUtils.runInjectorPipeline(
                new ReadFileAndExtractTimestamps(options.getInputFile()),
                options.getPubsubTopic(),
                PUBSUB_TIMESTAMP_LABEL_KEY);
        }

        PipelineResult result = pipeline.run();
        dataflowUtils.waitToFinish(result);
    }

    /**
     * Parses the field at index 9 of a split record as a double.
     *
     * @param inputItems the fields of one input record
     * @return the parsed value, or {@code null} when the array is {@code null}, the field lookup
     *     yields {@code null}, or the field is not a valid number
     */
    private static Double tryParseAvgSpeed(String[] inputItems) {
        if (inputItems == null) {
            return null;
        }
        String rawSpeed = TrafficRoutes.tryParseString(inputItems, 9);
        if (rawSpeed == null) {
            return null;
        }
        try {
            return Double.parseDouble(rawSpeed);
        }
        catch (NumberFormatException notANumber) {
            // A malformed number means "no reading" rather than an error.
            return null;
        }
    }

    /**
     * Looks up the station-type field (index 4) of a split record.
     *
     * @param inputItems the fields of one input record
     * @return whatever {@code tryParseString} returns for index 4
     */
    private static String tryParseStationType(String[] inputItems) {
        final int stationTypeIndex = 4;
        return TrafficRoutes.tryParseString(inputItems, stationTypeIndex);
    }

    /**
     * Looks up the station-id field (index 1) of a split record.
     *
     * @param inputItems the fields of one input record
     * @return whatever {@code tryParseString} returns for index 1
     */
    private static String tryParseStationId(String[] inputItems) {
        final int stationIdIndex = 1;
        return TrafficRoutes.tryParseString(inputItems, stationIdIndex);
    }

    /**
     * Looks up the timestamp field (index 0) of a split record.
     *
     * @param inputItems the fields of one input record
     * @return whatever {@code tryParseString} returns for index 0
     */
    private static String tryParseTimestamp(String[] inputItems) {
        final int timestampIndex = 0;
        return TrafficRoutes.tryParseString(inputItems, timestampIndex);
    }

    /**
     * Returns the element of {@code inputItems} at {@code index}, or {@code null} when no such
     * element exists.
     *
     * <p>The upper bound is strict: an index equal to the array length is out of range. This
     * matters for records with fewer fields than expected, for example {@code ",".split(",")},
     * which produces an empty array.
     *
     * @param inputItems the fields of one input record; {@code null} is treated as having no fields
     * @param index zero-based position of the wanted field
     * @return the field at {@code index}, or {@code null} if the index is negative or not below
     *     the array length
     */
    private static String tryParseString(String[] inputItems, int index) {
        if (inputItems == null || index < 0 || index >= inputItems.length) {
            return null;
        }
        return inputItems[index];
    }

    /**
     * Creates the station-id to route-name table used to keep only known stations.
     *
     * @return a new {@link Hashtable} holding the three hard-coded station entries
     */
    private static Map<String, String> buildStationInfo() {
        // Each row is {station id, route name}.
        String[][] stationRoutes = {
            {"1108413", "SDRoute1"},
            {"1108699", "SDRoute2"},
            {"1108702", "SDRoute2"},
        };
        Hashtable<String, String> routeByStation = new Hashtable<String, String>();
        for (String[] entry : stationRoutes) {
            routeByStation.put(entry[0], entry[1]);
        }
        return routeByStation;
    }

    private static interface TrafficRoutesOptions
    extends DataflowExampleOptions,
    ExamplePubsubTopicAndSubscriptionOptions,
    ExampleBigQueryTableOptions {
        @Description(value="Input file to inject to Pub/Sub topic")
        @Default.String(value="gs://dataflow-samples/traffic_sensor/Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
        public String getInputFile();

        public void setInputFile(String var1);

        @Description(value="Numeric value of sliding window duration, in minutes")
        @Default.Integer(value=3)
        public Integer getWindowDuration();

        public void setWindowDuration(Integer var1);

        @Description(value="Numeric value of window 'slide every' setting, in minutes")
        @Default.Integer(value=1)
        public Integer getWindowSlideEvery();

        public void setWindowSlideEvery(Integer var1);

        @Description(value="Whether to run the pipeline with unbounded input")
        @Default.Boolean(value=false)
        public boolean isUnbounded();

        public void setUnbounded(boolean var1);
    }

    /**
     * Root transform that reads a text file and re-emits its lines through
     * {@link ExtractTimestamps}.
     */
    static class ReadFileAndExtractTimestamps
    extends PTransform<PBegin, PCollection<String>> {
        /** Location of the text file to read. */
        private final String inputFile;

        /**
         * @param inputFile location handed to {@code TextIO.Read.from}
         */
        public ReadFileAndExtractTimestamps(String inputFile) {
            this.inputFile = inputFile;
        }

        /**
         * Reads the file line by line and applies {@link ExtractTimestamps} to every line.
         *
         * @param begin the pipeline start the read is attached to
         * @return the lines emitted by {@link ExtractTimestamps}
         */
        public PCollection<String> apply(PBegin begin) {
            return begin
                .apply(TextIO.Read.from(inputFile))
                .apply(ParDo.of(new ExtractTimestamps()));
        }
    }

    /**
     * Composite transform turning keyed station speeds into BigQuery rows: group by key, compute
     * statistics with {@link GatherStats}, then format with {@link FormatStatsFn}.
     */
    static class TrackSpeed
    extends PTransform<PCollection<KV<String, StationSpeed>>, PCollection<TableRow>> {
        TrackSpeed() {
        }

        /**
         * @param stationSpeed station speed readings keyed by route name
         * @return one table row per element emitted by {@link GatherStats}
         */
        public PCollection<TableRow> apply(PCollection<KV<String, StationSpeed>> stationSpeed) {
            return stationSpeed
                .apply(GroupByKey.<String, StationSpeed>create())
                .apply(ParDo.of(new GatherStats()))
                .apply(ParDo.of(new FormatStatsFn()));
        }
    }

    /**
     * Formats a route's {@link RouteInfo} as a BigQuery {@link TableRow} and defines the matching
     * table schema.
     */
    static class FormatStatsFn
    extends DoFn<KV<String, RouteInfo>, TableRow> {
        FormatStatsFn() {
        }

        /**
         * Emits one row with the columns {@code avg_speed}, {@code slowdown_event}, {@code route}
         * and {@code window_timestamp} (the string form of the element's timestamp).
         */
        public void processElement(DoFn.ProcessContext c) {
            // The context parameter is a raw type, so the element has to be cast back.
            @SuppressWarnings("unchecked")
            KV<String, RouteInfo> element = (KV<String, RouteInfo>)c.element();
            RouteInfo routeInfo = element.getValue();
            TableRow row = new TableRow()
                .set("avg_speed", routeInfo.getAvgSpeed())
                .set("slowdown_event", routeInfo.getSlowdownEvent())
                .set("route", element.getKey())
                .set("window_timestamp", c.timestamp().toString());
            c.output(row);
        }

        /**
         * Builds the output table schema.
         *
         * @return a schema with the fields route (STRING), avg_speed (FLOAT), slowdown_event
         *     (BOOLEAN) and window_timestamp (TIMESTAMP), in that order
         */
        static TableSchema getSchema() {
            ArrayList<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
            fields.add(field("route", "STRING"));
            fields.add(field("avg_speed", "FLOAT"));
            fields.add(field("slowdown_event", "BOOLEAN"));
            fields.add(field("window_timestamp", "TIMESTAMP"));
            return new TableSchema().setFields(fields);
        }

        /** Creates a single schema field with the given name and type. */
        private static TableFieldSchema field(String name, String type) {
            return new TableFieldSchema().setName(name).setType(type);
        }
    }

    /**
     * Reduces the grouped speed readings of one route to a single {@link RouteInfo}.
     *
     * <p>Readings are sorted with {@link StationSpeed#compareTo} and walked in that order. For
     * each station, every non-null speed is compared with that station's previous non-null speed:
     * a strictly higher value counts as a speedup, anything else (including an equal value) as a
     * slowdown.
     */
    static class GatherStats
    extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
        GatherStats() {
        }

        /**
         * Emits {@code (route, RouteInfo)} with the mean of all non-null speeds and the slowdown
         * flag. Emits nothing when the group holds no non-null speed.
         *
         * <p>NOTE: the flag is {@code slowdowns >= 2 * speedups}, so it is also {@code true} when
         * no comparisons were made at all (both counters zero).
         *
         * @throws IOException declared by the signature; nothing in this body throws it directly
         */
        public void processElement(DoFn.ProcessContext c) throws IOException {
            // The context parameter is a raw type, so the element has to be cast back. Keeping
            // the list typed is what lets the for-each below compile.
            @SuppressWarnings("unchecked")
            KV<String, Iterable<StationSpeed>> element =
                (KV<String, Iterable<StationSpeed>>)c.element();
            String route = element.getKey();

            ArrayList<StationSpeed> infoList = Lists.newArrayList(element.getValue());
            Collections.sort(infoList);

            double speedSum = 0.0;
            int speedCount = 0;
            int speedups = 0;
            int slowdowns = 0;
            // Last non-null speed seen per station id.
            Map<String, Double> prevSpeeds = new HashMap<String, Double>();
            for (StationSpeed item : infoList) {
                Double speed = item.getAvgSpeed();
                if (speed == null) {
                    continue;
                }
                speedSum += speed.doubleValue();
                ++speedCount;
                Double lastSpeed = prevSpeeds.get(item.getStationId());
                if (lastSpeed != null) {
                    if (lastSpeed < speed) {
                        ++speedups;
                    } else {
                        ++slowdowns;
                    }
                }
                prevSpeeds.put(item.getStationId(), speed);
            }
            if (speedCount == 0) {
                return;
            }
            double speedAvg = speedSum / (double)speedCount;
            boolean slowdownEvent = slowdowns >= 2 * speedups;
            RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
            c.output(KV.of(route, routeInfo));
        }
    }

    /**
     * Turns a comma-separated sensor line into a {@code (route, StationSpeed)} pair.
     *
     * <p>A line produces output only when its station type is {@code "ML"}, its average speed
     * parses, and its station id is a key of {@code sdStations}; every other line is dropped.
     */
    static class ExtractStationSpeedFn
    extends DoFn<String, KV<String, StationSpeed>> {
        ExtractStationSpeedFn() {
        }

        /**
         * Parses the element and, if it passes the filters, emits the station's reading keyed by
         * its route name, stamped with the element's timestamp in milliseconds.
         */
        public void processElement(DoFn.ProcessContext c) {
            String line = (String)c.element();
            String[] fields = line.split(",");
            if (!"ML".equals(TrafficRoutes.tryParseStationType(fields))) {
                return;
            }
            Double avgSpeed = TrafficRoutes.tryParseAvgSpeed(fields);
            String stationId = TrafficRoutes.tryParseStationId(fields);
            if (avgSpeed == null || stationId == null || !sdStations.containsKey(stationId)) {
                return;
            }
            StationSpeed reading = new StationSpeed(stationId, avgSpeed, c.timestamp().getMillis());
            c.output(KV.of(sdStations.get(stationId), reading));
        }
    }

    /**
     * Re-emits each comma-separated line with an element timestamp taken from the line's first
     * field, parsed with the pattern {@code MM/dd/yyyy HH:mm:ss}.
     */
    static class ExtractTimestamps
    extends DoFn<String, String> {
        private static final DateTimeFormatter dateTimeFormat = DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");

        ExtractTimestamps() {
        }

        /**
         * Emits the unchanged line with its parsed timestamp. Lines without a timestamp field,
         * and lines for which parsing or emitting raises {@link IllegalArgumentException},
         * produce no output.
         *
         * @throws Exception declared by the signature
         */
        public void processElement(DoFn.ProcessContext c) throws Exception {
            String line = (String)c.element();
            String timestamp = TrafficRoutes.tryParseTimestamp(line.split(","));
            if (timestamp == null) {
                return;
            }
            try {
                Instant eventTime = new Instant(dateTimeFormat.parseMillis(timestamp));
                c.outputWithTimestamp(c.element(), eventTime);
            }
            catch (IllegalArgumentException ignored) {
                // Best effort: skip this input line instead of failing.
            }
        }
    }

    /**
     * Statistics for one route: its name, the average speed and whether a slowdown was detected.
     * All fields are declared nullable and the class uses {@code AvroCoder} as its default coder.
     */
    @DefaultCoder(AvroCoder.class)
    static class RouteInfo {
        /** Route name. */
        @Nullable
        String route;
        /** Average of the speed readings that went into this record. */
        @Nullable
        Double avgSpeed;
        /** Slowdown flag as computed by the producer of this record. */
        @Nullable
        Boolean slowdownEvent;

        /** Creates an instance with every field {@code null}. */
        public RouteInfo() {
        }

        /**
         * @param route route name
         * @param avgSpeed average speed for the route
         * @param slowdownEvent whether a slowdown was detected
         */
        public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent) {
            this.route = route;
            this.avgSpeed = avgSpeed;
            this.slowdownEvent = slowdownEvent;
        }

        /** @return the route name, possibly {@code null} */
        public String getRoute() {
            return route;
        }

        /** @return the average speed, possibly {@code null} */
        public Double getAvgSpeed() {
            return avgSpeed;
        }

        /** @return the slowdown flag, possibly {@code null} */
        public Boolean getSlowdownEvent() {
            return slowdownEvent;
        }
    }

    /**
     * One speed reading of a station: station id, average speed and a timestamp in milliseconds.
     * All fields are declared nullable and the class uses {@code AvroCoder} as its default coder.
     *
     * <p>Instances order by timestamp. The ordering is not consistent with {@code equals}, which
     * this class does not override.
     */
    @DefaultCoder(value=AvroCoder.class)
    static class StationSpeed
    implements Comparable<StationSpeed> {
        /** Identifier of the reporting station. */
        @Nullable
        String stationId;
        /** Average speed reported by the station. */
        @Nullable
        Double avgSpeed;
        /** Reading time in milliseconds. */
        @Nullable
        Long timestamp;

        /** Creates an instance with every field {@code null}. */
        public StationSpeed() {
        }

        /**
         * @param stationId identifier of the reporting station
         * @param avgSpeed average speed reported by the station
         * @param timestamp reading time in milliseconds
         */
        public StationSpeed(String stationId, Double avgSpeed, Long timestamp) {
            this.stationId = stationId;
            this.avgSpeed = avgSpeed;
            this.timestamp = timestamp;
        }

        /** @return the station id, possibly {@code null} */
        public String getStationId() {
            return this.stationId;
        }

        /** @return the average speed, possibly {@code null} */
        public Double getAvgSpeed() {
            return this.avgSpeed;
        }

        /**
         * Orders readings by timestamp, oldest first.
         *
         * <p>The timestamp field is nullable, so it must not be unboxed blindly: a reading
         * without a timestamp sorts before every reading that has one, and two readings without a
         * timestamp compare as equal.
         *
         * @param other the reading to compare with; must not be {@code null}
         * @return a negative value, zero or a positive value as this reading is older than,
         *     as old as, or newer than {@code other}
         */
        @Override
        public int compareTo(StationSpeed other) {
            Long mine = this.timestamp;
            Long theirs = other.timestamp;
            if (mine == null) {
                return theirs == null ? 0 : -1;
            }
            if (theirs == null) {
                return 1;
            }
            return Long.compare(mine.longValue(), theirs.longValue());
        }
    }
}

