/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.examples.streaming;

import java.util.Properties;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.examples.streaming.WindowedWordCount;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.joda.time.Duration;

public class KafkaWindowedWordCountExample {
    static final String KAFKA_TOPIC = "test";
    static final String KAFKA_BROKER = "localhost:9092";
    static final String GROUP_ID = "myGroup";
    static final String ZOOKEEPER = "localhost:2181";

    public static void main(String[] args) {
        PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
        KafkaStreamingWordCountOptions options = (KafkaStreamingWordCountOptions)PipelineOptionsFactory.fromArgs((String[])args).as(KafkaStreamingWordCountOptions.class);
        options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
        options.setStreaming(true);
        options.setCheckpointingInterval(1000L);
        options.setNumberOfExecutionRetries(5);
        options.setExecutionRetryDelay(3000L);
        options.setRunner(FlinkRunner.class);
        System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + options.getBroker() + " " + options.getGroup());
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        Properties p = new Properties();
        p.setProperty("zookeeper.connect", options.getZookeeper());
        p.setProperty("bootstrap.servers", options.getBroker());
        p.setProperty("group.id", options.getGroup());
        FlinkKafkaConsumer08 kafkaConsumer = new FlinkKafkaConsumer08(options.getKafkaTopic(), (DeserializationSchema)new SimpleStringSchema(), p);
        PCollection words = (PCollection)((PCollection)((PCollection)pipeline.apply("StreamingWordCount", (PTransform)Read.from((UnboundedSource)UnboundedFlinkSource.of((SourceFunction)kafkaConsumer)))).apply((PTransform)ParDo.of((DoFn)new ExtractWordsFn()))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardSeconds((long)options.getWindowSize()))).triggering((Trigger)AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO).discardingFiredPanes());
        PCollection wordCounts = (PCollection)words.apply((PTransform)Count.perElement());
        ((PCollection)wordCounts.apply((PTransform)ParDo.of((DoFn)new FormatAsStringFn()))).apply((PTransform)TextIO.Write.to((String)"./outputKafka.txt"));
        pipeline.run();
    }

    public static interface KafkaStreamingWordCountOptions
    extends WindowedWordCount.StreamingWordCountOptions {
        @Description(value="The Kafka topic to read from")
        @Default.String(value="test")
        public String getKafkaTopic();

        public void setKafkaTopic(String var1);

        @Description(value="The Kafka Broker to read from")
        @Default.String(value="localhost:9092")
        public String getBroker();

        public void setBroker(String var1);

        @Description(value="The Zookeeper server to connect to")
        @Default.String(value="localhost:2181")
        public String getZookeeper();

        public void setZookeeper(String var1);

        @Description(value="The groupId")
        @Default.String(value="myGroup")
        public String getGroup();

        public void setGroup(String var1);
    }

    public static class FormatAsStringFn
    extends DoFn<KV<String, Long>, String> {
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            String row = (String)((KV)c.element()).getKey() + " - " + ((KV)c.element()).getValue() + " @ " + c.timestamp().toString();
            System.out.println(row);
            c.output((Object)row);
        }
    }

    public static class ExtractWordsFn
    extends DoFn<String, String> {
        private final Aggregator<Long, Long> emptyLines = this.createAggregator("emptyLines", (Combine.CombineFn)new Sum.SumLongFn());

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            String[] words;
            if (((String)c.element()).trim().isEmpty()) {
                this.emptyLines.addValue((Object)1L);
            }
            for (String word : words = ((String)c.element()).split("[^a-zA-Z']+")) {
                if (word.isEmpty()) continue;
                c.output((Object)word);
            }
        }
    }
}

