/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.cookbook;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BigQueryIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public class JoinExamples {
    private static final String GDELT_EVENTS_TABLE = "clouddataflow-readonly:samples.gdelt_sample";
    private static final String COUNTRY_CODES = "gdelt-bq:full.crosswalk_geocountrycodetohuman";

    static PCollection<String> joinEvents(PCollection<TableRow> eventsTable, PCollection<TableRow> countryCodes) throws Exception {
        final TupleTag eventInfoTag = new TupleTag();
        final TupleTag countryInfoTag = new TupleTag();
        PCollection eventInfo = (PCollection)eventsTable.apply((PTransform)ParDo.of((DoFn)new ExtractEventDataFn()));
        PCollection countryInfo = (PCollection)countryCodes.apply((PTransform)ParDo.of((DoFn)new ExtractCountryInfoFn()));
        PCollection kvpCollection = (PCollection)KeyedPCollectionTuple.of((TupleTag)eventInfoTag, (PCollection)eventInfo).and(countryInfoTag, countryInfo).apply((PTransform)CoGroupByKey.create());
        PCollection finalResultCollection = (PCollection)kvpCollection.apply((PTransform)ParDo.named((String)"Process").of((DoFn)new DoFn<KV<String, CoGbkResult>, KV<String, String>>(){

            public void processElement(DoFn.ProcessContext c) {
                KV e = (KV)c.element();
                String countryCode = (String)e.getKey();
                String countryName = "none";
                countryName = (String)((CoGbkResult)e.getValue()).getOnly(countryInfoTag);
                for (String eventInfo : ((CoGbkResult)((KV)c.element()).getValue()).getAll(eventInfoTag)) {
                    String string = countryName;
                    c.output((Object)KV.of((Object)countryCode, (Object)new StringBuilder(28 + String.valueOf(string).length() + String.valueOf(eventInfo).length()).append("Country name: ").append(string).append(", Event info: ").append(eventInfo).toString()));
                }
            }
        }));
        PCollection formattedResults = (PCollection)finalResultCollection.apply((PTransform)ParDo.named((String)"Format").of((DoFn)new DoFn<KV<String, String>, String>(){

            public void processElement(DoFn.ProcessContext c) {
                String string = (String)((KV)c.element()).getKey();
                String string2 = (String)((KV)c.element()).getValue();
                String outputstring = new StringBuilder(16 + String.valueOf(string).length() + String.valueOf(string2).length()).append("Country code: ").append(string).append(", ").append(string2).toString();
                c.output((Object)outputstring);
            }
        }));
        return formattedResults;
    }

    public static void main(String[] args) throws Exception {
        Options options = (Options)PipelineOptionsFactory.fromArgs((String[])args).withValidation().as(Options.class);
        Pipeline p = Pipeline.create((PipelineOptions)options);
        PCollection eventsTable = (PCollection)p.apply((PTransform)BigQueryIO.Read.from((String)GDELT_EVENTS_TABLE));
        PCollection countryCodes = (PCollection)p.apply((PTransform)BigQueryIO.Read.from((String)COUNTRY_CODES));
        PCollection<String> formattedResults = JoinExamples.joinEvents((PCollection<TableRow>)eventsTable, (PCollection<TableRow>)countryCodes);
        formattedResults.apply((PTransform)TextIO.Write.to((String)options.getOutput()));
        p.run();
    }

    private static interface Options
    extends PipelineOptions {
        @Description(value="Path of the file to write to")
        @Validation.Required
        public String getOutput();

        public void setOutput(String var1);
    }

    static class ExtractCountryInfoFn
    extends DoFn<TableRow, KV<String, String>> {
        ExtractCountryInfoFn() {
        }

        public void processElement(DoFn.ProcessContext c) {
            TableRow row = (TableRow)c.element();
            String countryCode = (String)row.get((Object)"FIPSCC");
            String countryName = (String)row.get((Object)"HumanName");
            c.output((Object)KV.of((Object)countryCode, (Object)countryName));
        }
    }

    static class ExtractEventDataFn
    extends DoFn<TableRow, KV<String, String>> {
        ExtractEventDataFn() {
        }

        public void processElement(DoFn.ProcessContext c) {
            TableRow row = (TableRow)c.element();
            String countryCode = (String)row.get((Object)"ActionGeo_CountryCode");
            String sqlDate = (String)row.get((Object)"SQLDATE");
            String actor1Name = (String)row.get((Object)"Actor1Name");
            String sourceUrl = (String)row.get((Object)"SOURCEURL");
            String eventInfo = new StringBuilder(23 + String.valueOf(sqlDate).length() + String.valueOf(actor1Name).length() + String.valueOf(sourceUrl).length()).append("Date: ").append(sqlDate).append(", Actor1: ").append(actor1Name).append(", url: ").append(sourceUrl).toString();
            c.output((Object)KV.of((Object)countryCode, (Object)eventInfo));
        }
    }
}

