/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.gearpump.translators.io;

import io.gearpump.DefaultMessage;
import io.gearpump.Message;
import io.gearpump.streaming.source.Watermark;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.runners.gearpump.translators.io.GearpumpSource;
import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;

public class GearpumpSourceTest {
    private static final List<TimestampedValue<String>> TEST_VALUES = Lists.newArrayList((Object[])new TimestampedValue[]{TimestampedValue.of((Object)"a", (org.joda.time.Instant)BoundedWindow.TIMESTAMP_MIN_VALUE), TimestampedValue.of((Object)"b", (org.joda.time.Instant)new org.joda.time.Instant(0L)), TimestampedValue.of((Object)"c", (org.joda.time.Instant)new org.joda.time.Instant(53L)), TimestampedValue.of((Object)"d", (org.joda.time.Instant)BoundedWindow.TIMESTAMP_MAX_VALUE)});

    @Test
    public void testGearpumpSource() {
        GearpumpPipelineOptions options = (GearpumpPipelineOptions)PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
        ValuesSource valuesSource = new ValuesSource(TEST_VALUES, (Coder)TimestampedValue.TimestampedValueCoder.of((Coder)StringUtf8Coder.of()));
        SourceForTest sourceForTest = new SourceForTest((PipelineOptions)options, valuesSource);
        sourceForTest.open(null, Instant.EPOCH);
        for (int i = 0; i < TEST_VALUES.size(); ++i) {
            TimestampedValue<String> value = TEST_VALUES.get(i);
            if (i < TEST_VALUES.size() - 1) {
                Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time((org.joda.time.Instant)value.getTimestamp());
                Assert.assertEquals((Object)expectedWaterMark, (Object)sourceForTest.getWatermark());
            } else {
                Assert.assertEquals((Object)Watermark.MAX(), (Object)sourceForTest.getWatermark());
            }
            DefaultMessage expectedMsg = new DefaultMessage((Object)WindowedValue.timestampedValueInGlobalWindow(value, (org.joda.time.Instant)value.getTimestamp()), value.getTimestamp().getMillis());
            Message message = sourceForTest.read();
            Assert.assertEquals((Object)expectedMsg, (Object)message);
        }
        Assert.assertNull((Object)sourceForTest.read());
        Assert.assertEquals((Object)Watermark.MAX(), (Object)sourceForTest.getWatermark());
    }

    private static class SourceForTest<T>
    extends GearpumpSource<T> {
        private ValuesSource<T> valuesSource;

        SourceForTest(PipelineOptions options, ValuesSource<T> valuesSource) {
            super(options);
            this.valuesSource = valuesSource;
        }

        protected Source.Reader<T> createReader(PipelineOptions options) throws IOException {
            return this.valuesSource.createReader(options, null);
        }
    }
}

