/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete.game;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.HourlyTeamScore;
import org.apache.beam.examples.complete.game.UserScore;
import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
 * Streaming leaderboard pipeline built on top of {@link HourlyTeamScore}.
 *
 * <p>{@link #main(String[])} reads game events from the Pub/Sub topic given by
 * {@link Options#getTopic()}, parses them with {@code UserScore.ParseEventFn}, and then
 * produces two outputs:
 * <ul>
 *   <li>per-team score sums over fixed windows, written to the table
 *       {@code <tableName>_team};</li>
 *   <li>per-user score sums over the global window, written to the table
 *       {@code <tableName>_user}.</li>
 * </ul>
 */
public class LeaderBoard
extends HourlyTeamScore {
    /** Attribute name passed to {@code PubsubIO.Read.timestampLabel}. */
    private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";

    /**
     * Formatter for the {@code window_start} and {@code processing_time} columns.
     * The zone is given as a region ID rather than the three-letter abbreviation "PST",
     * which {@link TimeZone} documents as deprecated.
     */
    private static final DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
            .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/Los_Angeles")));

    /** Processing-time delay used for early firings of the team windows. */
    static final Duration FIVE_MINUTES = Duration.standardMinutes(5L);

    /** Processing-time delay used for late team firings and for the repeated user trigger. */
    static final Duration TEN_MINUTES = Duration.standardMinutes(10L);

    /**
     * Builds the column configuration for the windowed team-score table.
     *
     * <p>Columns: {@code team} (STRING, the element key), {@code total_score} (INTEGER, the
     * element value), {@code window_start} (STRING, formatted start of the element's
     * {@link IntervalWindow}), {@code processing_time} (STRING, formatted current time when the
     * row is produced) and {@code timing} (STRING, the pane timing).
     *
     * @return a new mutable map from column name to field description
     */
    protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> configureWindowedTableWrite() {
        Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
        // Each lambda is target-typed by the FieldInfo constructor parameter, so the context
        // argument is typed and no raw casts are needed.
        tableConfigure.put("team",
                new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> c.element().getKey()));
        tableConfigure.put("total_score",
                new WriteToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", c -> c.element().getValue()));
        tableConfigure.put("window_start",
                new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> {
                    // Assumes the elements were windowed into interval windows; any other
                    // window type fails this cast.
                    IntervalWindow w = (IntervalWindow) c.window();
                    return fmt.print(w.start());
                }));
        tableConfigure.put("processing_time",
                new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> fmt.print(Instant.now())));
        tableConfigure.put("timing",
                new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> c.pane().getTiming().toString()));
        return tableConfigure;
    }

    /**
     * Builds the column configuration for the global-window user-score table.
     *
     * <p>Starts from the inherited {@code configureBigQueryWrite()} map and adds a
     * {@code processing_time} (STRING) column holding the formatted current time when the row
     * is produced.
     *
     * @return the map returned by {@code configureBigQueryWrite()} with the extra column added
     */
    protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> configureGlobalWindowBigQueryWrite() {
        Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = LeaderBoard.configureBigQueryWrite();
        tableConfigure.put("processing_time",
                new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> fmt.print(Instant.now())));
        return tableConfigure;
    }

    /**
     * Configures and runs the leaderboard pipeline in streaming mode.
     *
     * @param args command-line arguments parsed into {@link Options}
     * @throws Exception propagated from pipeline setup, execution, or waiting for the result
     */
    // The element type of the parsed events is declared outside this file, so the pipeline
    // steps below keep raw Beam types; the casts mirror what each apply() returns.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void main(String[] args) throws Exception {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        // This pipeline always runs in streaming mode, regardless of the flags passed in.
        options.setStreaming(true);
        ExampleUtils exampleUtils = new ExampleUtils(options);
        Pipeline pipeline = Pipeline.create(options);

        Duration teamWindowSize = Duration.standardMinutes(options.getTeamWindowDuration());
        Duration allowedLateness = Duration.standardMinutes(options.getAllowedLateness());

        // Read from Pub/Sub, using the TIMESTAMP_ATTRIBUTE label for element timestamps,
        // then parse each message into a game event.
        PCollection rawEvents = (PCollection) pipeline.apply(
                (PTransform) PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()));
        PCollection gameEvents = (PCollection) rawEvents.apply(
                "ParseGameEvent", (PTransform) ParDo.of((DoFn) new UserScore.ParseEventFn()));

        // Team branch: fixed windows that fire at the watermark, with early firings
        // FIVE_MINUTES and late firings TEN_MINUTES (processing time) after the first element
        // in a pane. Fired panes accumulate.
        PTransform teamWindowing = (PTransform) Window.into((WindowFn) FixedWindows.of(teamWindowSize))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings((Trigger.OnceTrigger) AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES))
                        .withLateFirings((Trigger.OnceTrigger) AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
                .withAllowedLateness(allowedLateness)
                .accumulatingFiredPanes();
        PCollection teamWindowed = (PCollection) gameEvents.apply("LeaderboardTeamFixedWindows", teamWindowing);
        PCollection teamScores = (PCollection) teamWindowed.apply(
                "ExtractTeamScore", (PTransform) new UserScore.ExtractAndSumScore("team"));
        teamScores.apply("WriteTeamScoreSums",
                new WriteWindowedToBigQuery(options.getTableName() + "_team", LeaderBoard.configureWindowedTableWrite()));

        // User branch: a single global window that fires repeatedly, TEN_MINUTES (processing
        // time) after the first element in each pane. Fired panes accumulate.
        PTransform userWindowing = (PTransform) Window.into((WindowFn) new GlobalWindows())
                .triggering((Trigger) Repeatedly.forever((Trigger) AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
                .accumulatingFiredPanes()
                .withAllowedLateness(allowedLateness);
        PCollection userWindowed = (PCollection) gameEvents.apply("LeaderboardUserGlobalWindow", userWindowing);
        PCollection userScores = (PCollection) userWindowed.apply(
                "ExtractUserScore", (PTransform) new UserScore.ExtractAndSumScore("user"));
        userScores.apply("WriteUserScoreSums",
                new WriteToBigQuery(options.getTableName() + "_user", LeaderBoard.configureGlobalWindowBigQueryWrite()));

        PipelineResult result = pipeline.run();
        exampleUtils.waitToFinish(result);
    }

    /** Command-line options for the {@link LeaderBoard} pipeline. */
    interface Options
    extends HourlyTeamScore.Options,
    ExampleOptions,
    StreamingOptions {
        /** Returns the Pub/Sub topic to read from (required). */
        @Description("Pub/Sub topic to read from")
        @Validation.Required
        String getTopic();

        void setTopic(String topic);

        /** Returns the fixed window size for team analysis, in minutes (default 60). */
        @Description("Numeric value of fixed window duration for team analysis, in minutes")
        @Default.Integer(60)
        Integer getTeamWindowDuration();

        void setTeamWindowDuration(Integer teamWindowDuration);

        /** Returns the allowed data lateness, in minutes (default 120). */
        @Description("Numeric value of allowed data lateness, in minutes")
        @Default.Integer(120)
        Integer getAllowedLateness();

        void setAllowedLateness(Integer allowedLateness);

        /** Returns the prefix used for the BigQuery table names (default "leaderboard"). */
        @Override
        @Description("Prefix used for the BigQuery table names")
        @Default.String("leaderboard")
        String getTableName();

        @Override
        void setTableName(String tableName);
    }
}

