/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.examples.streaming;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Properties;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaIOExamples {
    // Default connection settings for the examples. The same literal values are repeated as the
    // @Default.String values on the KafkaOptions getters declared further down in this class.

    /** Default topic name; equals the default of {@code KafkaOptions#getKafkaTopic()}. */
    private static final String KAFKA_TOPIC = "input";
    /** Default Avro topic name; equals the default of {@code KafkaOptions#getKafkaAvroTopic()}. */
    private static final String KAFKA_AVRO_TOPIC = "output";
    /** Default broker address; equals the default of {@code KafkaOptions#getBroker()}. */
    private static final String KAFKA_BROKER = "localhost:9092";
    /** Default consumer group id; equals the default of {@code KafkaOptions#getGroup()}. */
    private static final String GROUP_ID = "myGroup";
    /** Default ZooKeeper address; equals the default of {@code KafkaOptions#getZookeeper()}. */
    private static final String ZOOKEEPER = "localhost:2181";

    /**
     * Builds the pipeline shared by every example in this class.
     *
     * <p>The command-line arguments are parsed into {@link KafkaOptions}. The following settings are
     * then applied unconditionally, replacing whatever the arguments specified for them: streaming
     * mode on, {@link FlinkPipelineRunner} as runner, checkpointing interval {@code 1000}, {@code 5}
     * execution retries and an execution retry delay of {@code 3000}.
     *
     * @param args command-line arguments handed to {@link PipelineOptionsFactory#fromArgs}
     * @return a new pipeline created from the configured options
     */
    private static Pipeline initializePipeline(String[] args) {
        final KafkaOptions kafkaOptions =
            (KafkaOptions)PipelineOptionsFactory.fromArgs((String[])args).as(KafkaOptions.class);

        // Execution mode and runner.
        kafkaOptions.setStreaming(true);
        kafkaOptions.setRunner(FlinkPipelineRunner.class);

        // Checkpointing and retry behaviour.
        kafkaOptions.setCheckpointingInterval(1000L);
        kafkaOptions.setNumberOfExecutionRetries(5);
        kafkaOptions.setExecutionRetryDelay(3000L);

        final Pipeline pipeline = Pipeline.create((PipelineOptions)kafkaOptions);
        return pipeline;
    }

    /**
     * Returns the options of the given pipeline viewed as {@link KafkaOptions}.
     *
     * @param p pipeline whose options are requested
     * @return the pipeline's options as {@link KafkaOptions}
     */
    private static KafkaOptions getOptions(Pipeline p) {
        final Class<KafkaOptions> optionsView = KafkaOptions.class;
        return (KafkaOptions)p.getOptions().as(optionsView);
    }

    /**
     * Translates the Kafka-related pipeline options into client properties.
     *
     * <p>The result holds exactly three entries: {@code zookeeper.connect}, {@code bootstrap.servers}
     * and {@code group.id}, taken from the ZooKeeper, broker and group options respectively.
     *
     * @param options options supplying the connection settings
     * @return a freshly created {@link Properties} instance with the three entries set
     */
    private static Properties getKafkaProps(KafkaOptions options) {
        final Properties kafkaConfig = new Properties();

        final String zookeeperAddress = options.getZookeeper();
        kafkaConfig.setProperty("zookeeper.connect", zookeeperAddress);

        final String brokerAddress = options.getBroker();
        kafkaConfig.setProperty("bootstrap.servers", brokerAddress);

        final String consumerGroup = options.getGroup();
        kafkaConfig.setProperty("group.id", consumerGroup);

        return kafkaConfig;
    }

    /**
     * A {@link DoFn} that prints the {@code toString()} form of each element to standard output.
     *
     * <p>The function never calls an output method on the context, so although it is declared as
     * {@code DoFn<T, T>} it does not forward the elements it receives.
     *
     * @param <T> declared input and output element type
     */
    private static class PrintFn<T>
    extends DoFn<T, T> {
        private PrintFn() {
        }

        /**
         * Prints the current element on its own line.
         *
         * @param c context giving access to the element being processed
         * @throws Exception declared by the signature; a {@code null} element results in a
         *     {@link NullPointerException} because {@code toString()} is invoked on it directly
         */
        public void processElement(DoFn.ProcessContext c) throws Exception {
            Object current = c.element();
            String rendered = current.toString();
            System.out.println(rendered);
        }
    }

    /**
     * Pipeline options understood by the Kafka examples, on top of {@link FlinkPipelineOptions}.
     *
     * <p>Every option has a default, so the examples can run without any command-line arguments.
     * The broker, ZooKeeper and group values are copied into the Kafka client properties by
     * {@code getKafkaProps}; the two topic options select the topic used by the String and the Avro
     * examples respectively.
     */
    public static interface KafkaOptions
    extends FlinkPipelineOptions {
        /** Topic used by the String examples; defaults to {@code "input"}. */
        @Description("The Kafka topic to read from")
        @Default.String("input")
        String getKafkaTopic();

        void setKafkaTopic(String var1);

        /**
         * Topic used by the Avro examples; defaults to {@code "output"}.
         *
         * <p>The description is deliberately distinct from the one on {@link #getKafkaTopic()} so
         * that the two options can be told apart in generated help output.
         */
        @Description("The Kafka topic holding the Avro-encoded records")
        @Default.String("output")
        String getKafkaAvroTopic();

        void setKafkaAvroTopic(String var1);

        /** Kafka broker address; defaults to {@code "localhost:9092"}. */
        @Description("The Kafka Broker to read from")
        @Default.String("localhost:9092")
        String getBroker();

        void setBroker(String var1);

        /** ZooKeeper address; defaults to {@code "localhost:2181"}. */
        @Description("The Zookeeper server to connect to")
        @Default.String("localhost:2181")
        String getZookeeper();

        void setZookeeper(String var1);

        /** Kafka group id; defaults to {@code "myGroup"}. */
        @Description("The groupId")
        @Default.String("myGroup")
        String getGroup();

        void setGroup(String var1);
    }

    public static class KafkaAvro {

        /**
         * Simple word/count pair used as the element type of the Avro examples.
         *
         * <p>NOTE(review): the non-final fields and the public no-argument constructor are
         * presumably there for Avro's reflection-based coder — confirm before changing them.
         */
        static class MyType
        implements Serializable {
            String word;
            long count;

            /** Creates an instance with {@code word == null} and {@code count == 0}. */
            public MyType() {
            }

            /**
             * Creates an instance holding the given values.
             *
             * @param word the word to store
             * @param count the count to store
             */
            MyType(String word, long count) {
                this.word = word;
                this.count = count;
            }

            /**
             * Renders the instance as {@code MyType{word='<word>', count=<count>}}; a {@code null}
             * word is rendered as the text {@code null}.
             */
            @Override
            public String toString() {
                return "MyType{word='" + this.word + "', count=" + this.count + "}";
            }
        }

        /**
         * Schema that converts between objects of type {@code T} and byte arrays by delegating to
         * an {@link AvroCoder} for that type. It implements both the serialization and the
         * deserialization side, so the same class serves the Kafka producer and the Kafka consumer.
         *
         * @param <T> type of the objects being encoded and decoded
         */
        static class AvroSerializationDeserializationSchema<T>
        implements SerializationSchema<T>,
        DeserializationSchema<T> {
            private final Class<T> avroType;
            private final AvroCoder<T> coder;
            // Scratch buffer reused across serialize() calls. It is transient, so it is null on an
            // instance restored through Java serialization and is re-created on demand.
            private transient ByteArrayOutputStream out;

            /**
             * Creates a schema for the given class.
             *
             * @param clazz class of the objects to encode and decode
             */
            AvroSerializationDeserializationSchema(Class<T> clazz) {
                this.out = new ByteArrayOutputStream();
                this.coder = AvroCoder.of(clazz);
                this.avroType = clazz;
            }

            /**
             * Encodes one element with the coder.
             *
             * @param element the object to encode
             * @return a new array holding the encoded bytes
             * @throws RuntimeException wrapping the {@link IOException} if encoding fails
             */
            public byte[] serialize(T element) {
                ByteArrayOutputStream buffer = this.out;
                if (buffer == null) {
                    buffer = new ByteArrayOutputStream();
                    this.out = buffer;
                }
                // Discard whatever the previous call left in the shared buffer.
                buffer.reset();
                try {
                    this.coder.encode(element, (OutputStream)buffer, Coder.Context.NESTED);
                }
                catch (IOException e) {
                    throw new RuntimeException("Avro encoding failed.", e);
                }
                return buffer.toByteArray();
            }

            /**
             * Decodes one element from the given bytes with the coder.
             *
             * @param message the encoded bytes
             * @return the decoded object
             * @throws IOException if the coder fails to decode the message
             */
            public T deserialize(byte[] message) throws IOException {
                InputStream encoded = new ByteArrayInputStream(message);
                return (T)this.coder.decode(encoded, Coder.Context.NESTED);
            }

            /**
             * Always answers {@code false}: no element is treated as an end-of-stream marker.
             */
            public boolean isEndOfStream(T nextElement) {
                return false;
            }

            /**
             * Returns the Flink type information obtained from {@link TypeExtractor#getForClass}
             * for the class this schema was created with.
             */
            public TypeInformation<T> getProducedType() {
                Class<T> producedClass = this.avroType;
                return TypeExtractor.getForClass(producedClass);
            }
        }

        /**
         * Example that creates three {@link MyType} records and writes them, Avro-encoded, to the
         * topic returned by {@code KafkaOptions#getKafkaAvroTopic()}.
         */
        public static class WriteAvroToKafka {
            /**
             * Builds and runs the pipeline.
             *
             * @param args command-line arguments, parsed into {@link KafkaOptions}
             */
            public static void main(String[] args) {
                Pipeline pipeline = KafkaIOExamples.initializePipeline(args);
                KafkaOptions kafkaOptions = KafkaIOExamples.getOptions(pipeline);

                // Fixed in-memory input for the example.
                MyType[] samples = new MyType[]{new MyType("word", 1L), new MyType("another", 2L), new MyType("yet another", 3L)};
                PTransform createSamples = (PTransform)Create.of((Object[])samples);
                PCollection records = (PCollection)pipeline.apply(createSamples);

                // Flink Kafka producer that encodes every record with the Avro schema.
                String topic = kafkaOptions.getKafkaAvroTopic();
                AvroSerializationDeserializationSchema<MyType> avroSchema = new AvroSerializationDeserializationSchema<MyType>(MyType.class);
                Properties kafkaProps = KafkaIOExamples.getKafkaProps(kafkaOptions);
                FlinkKafkaProducer08 producer = new FlinkKafkaProducer08(topic, avroSchema, kafkaProps);

                // Wrap the Flink sink function so it can be used as the target of a Write transform.
                Sink flinkSink = (Sink)UnboundedFlinkSink.of((SinkFunction)producer);
                PTransform writeToKafka = (PTransform)Write.to(flinkSink);
                records.apply(writeToKafka);

                pipeline.run();
            }
        }

        /**
         * Example that reads Avro-encoded {@link MyType} records from the topic returned by
         * {@code KafkaOptions#getKafkaAvroTopic()} and prints each of them to standard output.
         */
        public static class ReadAvroFromKafka {
            /**
             * Builds and runs the pipeline.
             *
             * @param args command-line arguments, parsed into {@link KafkaOptions}
             */
            public static void main(String[] args) {
                Pipeline pipeline = KafkaIOExamples.initializePipeline(args);
                KafkaOptions kafkaOptions = KafkaIOExamples.getOptions(pipeline);

                // Flink Kafka consumer that decodes every message with the Avro schema.
                String topic = kafkaOptions.getKafkaAvroTopic();
                AvroSerializationDeserializationSchema<MyType> avroSchema = new AvroSerializationDeserializationSchema<MyType>(MyType.class);
                Properties kafkaProps = KafkaIOExamples.getKafkaProps(kafkaOptions);
                FlinkKafkaConsumer08 consumer = new FlinkKafkaConsumer08(topic, avroSchema, kafkaProps);

                // Wrap the Flink source function so it can be used as the origin of a Read transform.
                UnboundedSource kafkaSource = (UnboundedSource)UnboundedFlinkSource.of((SourceFunction)consumer);
                PTransform readFromKafka = (PTransform)Read.from(kafkaSource);
                PCollection records = (PCollection)pipeline.apply(readFromKafka);

                PTransform printEach = (PTransform)ParDo.of(new PrintFn());
                records.apply(printEach);

                pipeline.run();
            }
        }
    }

    public static class KafkaString {

        public static class WriteStringToKafka {
            public static void main(String[] args) {
                Pipeline p = KafkaIOExamples.initializePipeline(args);
                KafkaOptions options = KafkaIOExamples.getOptions(p);
                PCollection words = (PCollection)p.apply((PTransform)Create.of((Object[])new String[]{"These", "are", "some", "words"}));
                FlinkKafkaProducer08 kafkaSink = new FlinkKafkaProducer08(options.getKafkaTopic(), (SerializationSchema)new SimpleStringSchema(), KafkaIOExamples.getKafkaProps(options));
                words.apply((PTransform)Write.to((Sink)UnboundedFlinkSink.of((SinkFunction)kafkaSink)));
                p.run();
            }
        }

        public static class ReadStringFromKafka {
            public static void main(String[] args) {
                Pipeline p = KafkaIOExamples.initializePipeline(args);
                KafkaOptions options = KafkaIOExamples.getOptions(p);
                FlinkKafkaConsumer08 kafkaConsumer = new FlinkKafkaConsumer08(options.getKafkaTopic(), (DeserializationSchema)new SimpleStringSchema(), KafkaIOExamples.getKafkaProps(options));
                ((PCollection)p.apply((PTransform)Read.from((UnboundedSource)UnboundedFlinkSource.of((SourceFunction)kafkaConsumer)))).apply((PTransform)ParDo.of(new PrintFn()));
                p.run();
            }
        }
    }
}

