/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.examples.streaming;

import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.examples.streaming.WindowedWordCount;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.TriggerBuilder;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

public class JoinExamples {
    static PCollection<String> joinEvents(PCollection<String> streamA, PCollection<String> streamB) throws Exception {
        final TupleTag firstInfoTag = new TupleTag();
        final TupleTag secondInfoTag = new TupleTag();
        PCollection firstInfo = (PCollection)streamA.apply((PTransform)ParDo.of((DoFn)new ExtractEventDataFn()));
        PCollection secondInfo = (PCollection)streamB.apply((PTransform)ParDo.of((DoFn)new ExtractEventDataFn()));
        PCollection kvpCollection = (PCollection)KeyedPCollectionTuple.of((TupleTag)firstInfoTag, (PCollection)firstInfo).and(secondInfoTag, secondInfo).apply((PTransform)CoGroupByKey.create());
        PCollection finalResultCollection = (PCollection)kvpCollection.apply((PTransform)ParDo.named((String)"Process").of((DoFn)new DoFn<KV<String, CoGbkResult>, KV<String, String>>(){
            private static final long serialVersionUID = 0L;

            public void processElement(DoFn.ProcessContext c) {
                KV e = (KV)c.element();
                String key = (String)e.getKey();
                String defaultA = "NO_VALUE";
                String lineA = (String)((CoGbkResult)e.getValue()).getOnly(firstInfoTag, (Object)defaultA);
                for (String lineB : ((CoGbkResult)((KV)c.element()).getValue()).getAll(secondInfoTag)) {
                    c.output((Object)KV.of((Object)key, (Object)new StringBuilder(21 + String.valueOf(lineA).length() + String.valueOf(lineB).length()).append("Value A: ").append(lineA).append(" - Value B: ").append(lineB).toString()));
                }
            }
        }));
        return (PCollection)finalResultCollection.apply((PTransform)ParDo.named((String)"Format").of((DoFn)new DoFn<KV<String, String>, String>(){
            private static final long serialVersionUID = 0L;

            public void processElement(DoFn.ProcessContext c) {
                String string = (String)((KV)c.element()).getKey();
                String string2 = (String)((KV)c.element()).getValue();
                String result = new StringBuilder(4 + String.valueOf(string).length() + String.valueOf(string2).length()).append(string).append(" -> ").append(string2).toString();
                System.out.println(result);
                c.output((Object)result);
            }
        }));
    }

    public static void main(String[] args) throws Exception {
        Options options = (Options)PipelineOptionsFactory.fromArgs((String[])args).withValidation().as(Options.class);
        options.setStreaming(true);
        options.setCheckpointingInterval(1000L);
        options.setNumberOfExecutionRetries(5);
        options.setExecutionRetryDelay(3000L);
        options.setRunner(FlinkPipelineRunner.class);
        Read.Unbounded readSourceA = Read.from((UnboundedSource)new UnboundedSocketSource("localhost", 9999, '\n', 3L)).named("FirstStream");
        Read.Unbounded readSourceB = Read.from((UnboundedSource)new UnboundedSocketSource("localhost", 9998, '\n', 3L)).named("SecondStream");
        FixedWindows windowFn = FixedWindows.of((Duration)Duration.standardSeconds((long)options.getWindowSize()));
        Pipeline p = Pipeline.create((PipelineOptions)options);
        PCollection streamA = (PCollection)((PCollection)p.apply((PTransform)readSourceA)).apply((PTransform)Window.into((WindowFn)windowFn).triggering((TriggerBuilder)AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO).discardingFiredPanes());
        PCollection streamB = (PCollection)((PCollection)p.apply((PTransform)readSourceB)).apply((PTransform)Window.into((WindowFn)windowFn).triggering((TriggerBuilder)AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO).discardingFiredPanes());
        PCollection<String> formattedResults = JoinExamples.joinEvents((PCollection<String>)streamA, (PCollection<String>)streamB);
        formattedResults.apply((PTransform)TextIO.Write.to((String)"./outputJoin.txt"));
        p.run();
    }

    private static interface Options
    extends WindowedWordCount.StreamingWordCountOptions {
    }

    static class ExtractEventDataFn
    extends DoFn<String, KV<String, String>> {
        private static final long serialVersionUID = 0L;

        ExtractEventDataFn() {
        }

        public void processElement(DoFn.ProcessContext c) {
            String line = ((String)c.element()).toLowerCase();
            String key = line.split("\\s")[0];
            c.output((Object)KV.of((Object)key, (Object)line));
        }
    }
}

