package org.apache.beam.runners.apex.examples;

import java.io.File;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.runners.apex.TestApexRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/apex/examples/WordCountTest.class */
public class WordCountTest {

    /* loaded from: input_file:org/apache/beam/runners/apex/examples/WordCountTest$CollectResultsFn.class */
    static class CollectResultsFn extends DoFn<KV<String, Long>, String> {
        static final ConcurrentHashMap<String, Long> RESULTS = new ConcurrentHashMap<>();

        CollectResultsFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, Long>, String>.ProcessContext processContext) {
            RESULTS.put((String) ((KV) processContext.element()).getKey(), (Long) ((KV) processContext.element()).getValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/apex/examples/WordCountTest$ExtractWordsFn.class */
    public static class ExtractWordsFn extends DoFn<String, String> {
        private static final long serialVersionUID = 1;
        private final Counter emptyLines = Metrics.counter("main", "emptyLines");

        ExtractWordsFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>.ProcessContext processContext) {
            if (((String) processContext.element()).trim().isEmpty()) {
                this.emptyLines.inc(serialVersionUID);
            }
            for (String str : ((String) processContext.element()).split("[^a-zA-Z']+")) {
                if (!str.isEmpty()) {
                    processContext.output(str);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/apex/examples/WordCountTest$FormatAsStringFn.class */
    public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
        private static final long serialVersionUID = 1;

        FormatAsStringFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, Long>, String>.ProcessContext processContext) {
            processContext.output(((String) ((KV) processContext.element()).getKey()) + " - " + ((KV) processContext.element()).getValue() + " @ " + processContext.timestamp().toString());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/apex/examples/WordCountTest$WordCountOptions.class */
    public interface WordCountOptions extends ApexPipelineOptions {
        @Description("Path of the file to read from")
        @Validation.Required
        String getInputFile();

        void setInputFile(String str);

        @Description("Path of the file to write to")
        @Validation.Required
        String getOutput();

        void setOutput(String str);
    }

    static void runWordCount(WordCountOptions wordCountOptions) {
        Pipeline create = Pipeline.create(wordCountOptions);
        create.apply("ReadLines", TextIO.read().from(wordCountOptions.getInputFile())).apply(ParDo.of(new ExtractWordsFn())).apply(Count.perElement()).apply(ParDo.of(new FormatAsStringFn())).apply("WriteCounts", TextIO.write().to(wordCountOptions.getOutput()));
        create.run().waitUntilFinish();
    }

    public static void main(String[] strArr) {
        runWordCount(PipelineOptionsFactory.fromArgs(strArr).withValidation().as(WordCountOptions.class));
    }

    @Test
    public void testWordCountExample() throws Exception {
        PipelineOptionsFactory.register(WordCountOptions.class);
        WordCountOptions as = TestPipeline.testingPipelineOptions().as(WordCountOptions.class);
        as.setRunner(TestApexRunner.class);
        as.setApplicationName("StreamingWordCount");
        as.setInputFile(new File(WordCountTest.class.getResource("/words.txt").getFile()).getAbsolutePath());
        as.setOutput("target/wordcountresult.txt");
        File file = new File("target/wordcountresult.txt-00000-of-00002");
        File file2 = new File("target/wordcountresult.txt-00001-of-00002");
        Assert.assertTrue(!file.exists() || file.delete());
        Assert.assertTrue(!file2.exists() || file2.delete());
        runWordCount(as);
        Assert.assertTrue("result files exist", file.exists() && file2.exists());
        HashSet hashSet = new HashSet();
        hashSet.addAll(Files.readAllLines(file.toPath()));
        hashSet.addAll(Files.readAllLines(file2.toPath()));
        Assert.assertEquals("expected output", Sets.newHashSet(new String[]{"foo - 5 @ 294247-01-09T04:00:54.775Z", "bar - 5 @ 294247-01-09T04:00:54.775Z"}), hashSet);
    }

    @Test
    public void testWindowedWordCount() throws Exception {
        ApexPipelineOptions as = PipelineOptionsFactory.fromArgs(new String[]{"--runner=" + ApexRunner.class.getName()}).withValidation().as(ApexPipelineOptions.class);
        as.setApplicationName("StreamingWordCount");
        Pipeline create = Pipeline.create(as);
        create.apply(Read.from(new UnboundedTextSource())).apply(ParDo.of(new ExtractWordsFn())).apply(Window.into(FixedWindows.of(Duration.standardSeconds(10L)))).apply(Count.perElement()).apply(ParDo.of(new CollectResultsFn()));
        ApexRunnerResult run = create.run();
        Assert.assertNotNull(run.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)"));
        long currentTimeMillis = System.currentTimeMillis() + 30000;
        while (System.currentTimeMillis() < currentTimeMillis && (!CollectResultsFn.RESULTS.containsKey("foo") || !CollectResultsFn.RESULTS.containsKey("bar"))) {
            run.waitUntilFinish(Duration.millis(1000L));
        }
        run.cancel();
        Assert.assertTrue(CollectResultsFn.RESULTS.containsKey("foo") && CollectResultsFn.RESULTS.containsKey("bar"));
        CollectResultsFn.RESULTS.clear();
    }
}
