/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete.game;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.complete.game.LeaderBoard;
import org.apache.beam.examples.complete.game.UserScore;
import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GameStats
extends LeaderBoard {
    private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
    private static DateTimeFormatter fmt = DateTimeFormat.forPattern((String)"yyyy-MM-dd HH:mm:ss.SSS").withZone(DateTimeZone.forTimeZone((TimeZone)TimeZone.getTimeZone("PST")));

    protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> configureWindowedWrite() {
        HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
        tableConfigure.put("team", new WriteToBigQuery.FieldInfo("STRING", (SerializableFunction<DoFn.ProcessContext, Object>)(SerializableFunction & Serializable)c -> (String)((KV)c.element()).getKey()));
        tableConfigure.put("total_score", new WriteToBigQuery.FieldInfo("INTEGER", (SerializableFunction<DoFn.ProcessContext, Object>)(SerializableFunction & Serializable)c -> (Integer)((KV)c.element()).getValue()));
        tableConfigure.put("window_start", new WriteToBigQuery.FieldInfo("STRING", (SerializableFunction<DoFn.ProcessContext, Object>)(SerializableFunction & Serializable)c -> {
            IntervalWindow w = (IntervalWindow)c.window();
            return fmt.print((ReadableInstant)w.start());
        }));
        tableConfigure.put("processing_time", new WriteToBigQuery.FieldInfo("STRING", (SerializableFunction<DoFn.ProcessContext, Object>)(SerializableFunction & Serializable)c -> fmt.print((ReadableInstant)Instant.now())));
        return tableConfigure;
    }

    protected static Map<String, WriteToBigQuery.FieldInfo<Double>> configureSessionWindowWrite() {
        HashMap<String, WriteToBigQuery.FieldInfo<Double>> tableConfigure = new HashMap<String, WriteToBigQuery.FieldInfo<Double>>();
        tableConfigure.put("window_start", new WriteToBigQuery.FieldInfo("STRING", (SerializableFunction<DoFn.ProcessContext, Object>)(SerializableFunction & Serializable)c -> {
            IntervalWindow w = (IntervalWindow)c.window();
            return fmt.print((ReadableInstant)w.start());
        }));
        tableConfigure.put("mean_duration", new WriteToBigQuery.FieldInfo("FLOAT", (SerializableFunction<DoFn.ProcessContext, Object>)(SerializableFunction & Serializable)c -> (Double)c.element()));
        return tableConfigure;
    }

    public static void main(String[] args) throws Exception {
        Options options = (Options)PipelineOptionsFactory.fromArgs((String[])args).withValidation().as(Options.class);
        options.setStreaming(true);
        DataflowExampleUtils dataflowUtils = new DataflowExampleUtils((DataflowPipelineOptions)options);
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        PCollection rawEvents = (PCollection)((PCollection)pipeline.apply((PTransform)PubsubIO.Read.timestampLabel((String)TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))).apply((PTransform)ParDo.named((String)"ParseGameEvent").of((DoFn)new UserScore.ParseEventFn()));
        PCollection userEvents = (PCollection)rawEvents.apply("ExtractUserScore", (PTransform)MapElements.via((SerializableFunction & Serializable)gInfo -> KV.of((Object)gInfo.getUser(), (Object)gInfo.getScore())).withOutputType((TypeDescriptor)new TypeDescriptor<KV<String, Integer>>(){}));
        final PCollectionView spammersView = (PCollectionView)((PCollection)((PCollection)userEvents.apply((PTransform)Window.named((String)"FixedWindowsUser").into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)options.getFixedWindowDuration().intValue()))))).apply("CalculateSpammyUsers", (PTransform)new CalculateSpammyUsers())).apply("CreateSpammersView", (PTransform)View.asMap());
        ((PCollection)((PCollection)((PCollection)rawEvents.apply((PTransform)Window.named((String)"WindowIntoFixedWindows").into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)options.getFixedWindowDuration().intValue()))))).apply((PTransform)ParDo.named((String)"FilterOutSpammers").withSideInputs(new PCollectionView[]{spammersView}).of((DoFn)new DoFn<UserScore.GameActionInfo, UserScore.GameActionInfo>(){

            public void processElement(DoFn.ProcessContext c) {
                if (((Map)c.sideInput(spammersView)).get(((UserScore.GameActionInfo)c.element()).getUser().trim()) == null) {
                    c.output(c.element());
                }
            }
        }))).apply("ExtractTeamScore", (PTransform)new UserScore.ExtractAndSumScore("team"))).apply("WriteTeamSums", new WriteWindowedToBigQuery(String.valueOf(options.getTablePrefix()).concat("_team"), GameStats.configureWindowedWrite()));
        ((PCollection)((PCollection)((PCollection)((PCollection)((PCollection)userEvents.apply((PTransform)Window.named((String)"WindowIntoSessions").into((WindowFn)Sessions.withGapDuration((Duration)Duration.standardMinutes((long)options.getSessionGap().intValue()))).withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))).apply((PTransform)Combine.perKey((SerializableFunction & Serializable)x -> 0))).apply("UserSessionActivity", (PTransform)ParDo.of((DoFn)new UserSessionInfoFn()))).apply((PTransform)Window.named((String)"WindowToExtractSessionMean").into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)options.getUserActivityWindowDuration().intValue()))))).apply((PTransform)Mean.globally().withoutDefaults())).apply("WriteAvgSessionLength", new WriteWindowedToBigQuery(String.valueOf(options.getTablePrefix()).concat("_sessions"), GameStats.configureSessionWindowWrite()));
        PipelineResult result = pipeline.run();
        dataflowUtils.waitToFinish(result);
    }

    static interface Options
    extends LeaderBoard.Options {
        @Description(value="Numeric value of fixed window duration for user analysis, in minutes")
        @Default.Integer(value=60)
        public Integer getFixedWindowDuration();

        public void setFixedWindowDuration(Integer var1);

        @Description(value="Numeric value of gap between user sessions, in minutes")
        @Default.Integer(value=5)
        public Integer getSessionGap();

        public void setSessionGap(Integer var1);

        @Description(value="Numeric value of fixed window for finding mean of user session duration, in minutes")
        @Default.Integer(value=30)
        public Integer getUserActivityWindowDuration();

        public void setUserActivityWindowDuration(Integer var1);

        @Description(value="Prefix used for the BigQuery table names")
        @Default.String(value="game_stats")
        public String getTablePrefix();

        public void setTablePrefix(String var1);
    }

    private static class UserSessionInfoFn
    extends DoFn<KV<String, Integer>, Integer>
    implements DoFn.RequiresWindowAccess {
        private UserSessionInfoFn() {
        }

        public void processElement(DoFn.ProcessContext c) {
            IntervalWindow w = (IntervalWindow)c.window();
            int duration = new Duration((ReadableInstant)w.start(), (ReadableInstant)w.end()).toPeriod().toStandardMinutes().getMinutes();
            c.output((Object)duration);
        }
    }

    public static class CalculateSpammyUsers
    extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
        private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
        private static final double SCORE_WEIGHT = 2.5;

        public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {
            PCollection sumScores = (PCollection)userScores.apply("UserSum", (PTransform)Sum.integersPerKey());
            final PCollectionView globalMeanScore = (PCollectionView)((PCollection)sumScores.apply((PTransform)Values.create())).apply((PTransform)Mean.globally().asSingletonView());
            PCollection filtered = (PCollection)sumScores.apply((PTransform)ParDo.named((String)"ProcessAndFilter").withSideInputs(new PCollectionView[]{globalMeanScore}).of((DoFn)new DoFn<KV<String, Integer>, KV<String, Integer>>(){
                private final Aggregator<Long, Long> numSpammerUsers = this.createAggregator("SpammerUsers", (Combine.CombineFn)new Sum.SumLongFn());

                public void processElement(DoFn.ProcessContext c) {
                    Integer score = (Integer)((KV)c.element()).getValue();
                    Double gmc = (Double)c.sideInput(globalMeanScore);
                    if ((double)score.intValue() > gmc * 2.5) {
                        String string = (String)((KV)c.element()).getKey();
                        String string2 = String.valueOf(score);
                        String string3 = String.valueOf(gmc);
                        LOG.info(new StringBuilder(31 + String.valueOf(string).length() + String.valueOf(string2).length() + String.valueOf(string3).length()).append("user ").append(string).append(" spammer score ").append(string2).append(" with mean ").append(string3).toString());
                        this.numSpammerUsers.addValue((Object)1L);
                        c.output(c.element());
                    }
                }
            }));
            return filtered;
        }
    }
}

