/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.examples.streaming;

import java.io.IOException;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.examples.WordCount;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TriggerBuilder;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WindowedWordCount {
    private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
    static final long WINDOW_SIZE = 10L;
    static final long SLIDE_SIZE = 5L;

    public static void main(String[] args) throws IOException {
        StreamingWordCountOptions options = (StreamingWordCountOptions)PipelineOptionsFactory.fromArgs((String[])args).withValidation().as(StreamingWordCountOptions.class);
        options.setStreaming(true);
        options.setWindowSize(10L);
        options.setSlide(5L);
        options.setCheckpointingInterval(1000L);
        options.setNumberOfExecutionRetries(5);
        options.setExecutionRetryDelay(3000L);
        options.setRunner(FlinkPipelineRunner.class);
        String string = String.valueOf(options.getWindowSize());
        String string2 = String.valueOf(options.getSlide());
        LOG.info(new StringBuilder(64 + String.valueOf(string).length() + String.valueOf(string2).length()).append("Windpwed WordCount with Sliding Windows of ").append(string).append(" sec. and a slide of ").append(string2).toString());
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        PCollection words = (PCollection)((PCollection)((PCollection)pipeline.apply((PTransform)Read.from((UnboundedSource)new UnboundedSocketSource("localhost", 9999, '\n', 3L)).named("StreamingWordCount"))).apply((PTransform)ParDo.of((DoFn)new ExtractWordsFn()))).apply((PTransform)Window.into((WindowFn)SlidingWindows.of((Duration)Duration.standardSeconds((long)options.getWindowSize())).every(Duration.standardSeconds((long)options.getSlide()))).triggering((TriggerBuilder)AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO).discardingFiredPanes());
        PCollection wordCounts = (PCollection)words.apply((PTransform)Count.perElement());
        ((PCollection)wordCounts.apply((PTransform)ParDo.of((DoFn)new FormatAsStringFn()))).apply((PTransform)TextIO.Write.to((String)"./outputWordCount.txt"));
        pipeline.run();
    }

    public static interface StreamingWordCountOptions
    extends WordCount.Options {
        @Description(value="Sliding window duration, in seconds")
        @Default.Long(value=10L)
        public Long getWindowSize();

        public void setWindowSize(Long var1);

        @Description(value="Window slide, in seconds")
        @Default.Long(value=5L)
        public Long getSlide();

        public void setSlide(Long var1);
    }

    static class ExtractWordsFn
    extends DoFn<String, String> {
        private final Aggregator<Long, Long> emptyLines = this.createAggregator("emptyLines", (Combine.CombineFn)new Sum.SumLongFn());

        ExtractWordsFn() {
        }

        public void processElement(DoFn.ProcessContext c) {
            String[] words;
            if (((String)c.element()).trim().isEmpty()) {
                this.emptyLines.addValue((Object)1L);
            }
            for (String word : words = ((String)c.element()).split("[^a-zA-Z']+")) {
                if (word.isEmpty()) continue;
                c.output((Object)word);
            }
        }
    }

    static class FormatAsStringFn
    extends DoFn<KV<String, Long>, String> {
        FormatAsStringFn() {
        }

        public void processElement(DoFn.ProcessContext c) {
            String string = (String)((KV)c.element()).getKey();
            String string2 = String.valueOf(((KV)c.element()).getValue());
            String string3 = String.valueOf(c.timestamp().toString());
            String row = new StringBuilder(6 + String.valueOf(string).length() + String.valueOf(string2).length() + String.valueOf(string3).length()).append(string).append(" - ").append(string2).append(" @ ").append(string3).toString();
            c.output((Object)row);
        }
    }
}

