/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.adapter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.adapter.BoundedSourceSystem;
import org.apache.beam.runners.samza.adapter.TestBoundedSource;
import org.apache.beam.runners.samza.adapter.TestSourceHelpers;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.apache.samza.Partition;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link BoundedSourceSystem.Consumer}, exercised through the Samza
 * {@link SystemConsumer} polling interface.
 *
 * <p>Every test follows the same shape: build a {@link TestBoundedSource}, create a consumer for
 * it, register one or more partitions, start the consumer, poll until end-of-stream (or a
 * timeout) and compare the polled envelopes against the expected sequence. The consumer is always
 * stopped in a {@code finally} block so that a failing assertion does not leave it running.
 */
public class BoundedSourceSystemTest {
    /** Partition 0 of the "default-system" stream; used by all single-partition tests. */
    private static final SystemStreamPartition DEFAULT_SSP = new SystemStreamPartition("default-system", "default-system", new Partition(0));

    /** Upper bound, in milliseconds, on how long a single consume call may keep polling. */
    private static final long DEFAULT_TIMEOUT_MILLIS = 1000L;

    /** Typed {@code null} passed as the starting offset when registering partitions in {@link #testSplit()}. */
    private static final String NULL_STRING = null;

    /** An empty source yields only the max-timestamp watermark followed by end-of-stream. */
    @Test
    public void testConsumerStartStop() throws IOException, InterruptedException {
        TestBoundedSource<String> source = TestBoundedSource.<String>createBuilder().build();
        BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);
        consumer.register(DEFAULT_SSP, "0");
        consumer.start();
        try {
            Assert.assertEquals(
                Arrays.asList(
                    TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                    TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** A single element is delivered at offset "0", then the watermark and end-of-stream. */
    @Test
    public void testConsumeOneMessage() throws IOException, InterruptedException {
        TestBoundedSource<String> source = TestBoundedSource.<String>createBuilder().addElements("test").build();
        BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);
        consumer.register(DEFAULT_SSP, "0");
        consumer.start();
        try {
            Assert.assertEquals(
                Arrays.asList(
                    TestSourceHelpers.createElementMessage(DEFAULT_SSP, "0", "test", BoundedWindow.TIMESTAMP_MIN_VALUE),
                    TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                    TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** Elements added after {@code setTimestamp} are expected to carry the new timestamp. */
    @Test
    public void testAdvanceTimestamp() throws InterruptedException {
        Instant timestamp = Instant.now();
        TestBoundedSource<String> source =
            TestBoundedSource.<String>createBuilder()
                .addElements("before")
                .setTimestamp(timestamp)
                .addElements("after")
                .build();
        BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);
        consumer.register(DEFAULT_SSP, "0");
        consumer.start();
        try {
            Assert.assertEquals(
                Arrays.asList(
                    TestSourceHelpers.createElementMessage(DEFAULT_SSP, "0", "before", BoundedWindow.TIMESTAMP_MIN_VALUE),
                    TestSourceHelpers.createElementMessage(DEFAULT_SSP, "1", "after", timestamp),
                    TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                    TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** Several elements are delivered in order with consecutive offsets "0".."3". */
    @Test
    public void testConsumeMultipleMessages() throws IOException, InterruptedException {
        TestBoundedSource<String> source =
            TestBoundedSource.<String>createBuilder().addElements("test", "a", "few", "messages").build();
        BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);
        consumer.register(DEFAULT_SSP, "0");
        consumer.start();
        try {
            Assert.assertEquals(
                Arrays.asList(
                    TestSourceHelpers.createElementMessage(DEFAULT_SSP, "0", "test", BoundedWindow.TIMESTAMP_MIN_VALUE),
                    TestSourceHelpers.createElementMessage(DEFAULT_SSP, "1", "a", BoundedWindow.TIMESTAMP_MIN_VALUE),
                    TestSourceHelpers.createElementMessage(DEFAULT_SSP, "2", "few", BoundedWindow.TIMESTAMP_MIN_VALUE),
                    TestSourceHelpers.createElementMessage(DEFAULT_SSP, "3", "messages", BoundedWindow.TIMESTAMP_MIN_VALUE),
                    TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                    TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** An exception configured as the source's first event must surface (wrapped) while consuming. */
    @Test
    public void testReaderThrowsAtStart() throws Exception {
        IOException exception = new IOException("Expected exception");
        TestBoundedSource<String> source = TestBoundedSource.<String>createBuilder().addException(exception).build();
        BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);
        consumer.register(DEFAULT_SSP, "0");
        consumer.start();
        try {
            TestSourceHelpers.expectWrappedException(
                exception, () -> consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** An exception configured after several good elements must still surface (wrapped) while consuming. */
    @Test
    public void testReaderThrowsAtAdvance() throws Exception {
        IOException exception = new IOException("Expected exception");
        TestBoundedSource<String> source =
            TestBoundedSource.<String>createBuilder()
                .addElements("test", "a", "few", "good", "messages", "then", "...")
                .addException(exception)
                .build();
        BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);
        consumer.register(DEFAULT_SSP, "0");
        consumer.start();
        try {
            TestSourceHelpers.expectWrappedException(
                exception, () -> consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /**
     * The source is built with a latch between its two elements. Until the latch is counted down
     * only "before" is expected within the timeout; afterwards the rest of the stream is expected.
     */
    @Test
    public void testTimeout() throws Exception {
        CountDownLatch advanceLatch = new CountDownLatch(1);
        TestBoundedSource<String> source =
            TestBoundedSource.<String>createBuilder()
                .addElements("before")
                .addLatch(advanceLatch)
                .addElements("after")
                .build();
        BoundedSourceSystem.Consumer<String> consumer = createConsumer(source);
        consumer.register(DEFAULT_SSP, "0");
        consumer.start();
        try {
            Assert.assertEquals(
                Collections.singletonList(
                    TestSourceHelpers.createElementMessage(DEFAULT_SSP, "0", "before", BoundedWindow.TIMESTAMP_MIN_VALUE)),
                consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
            advanceLatch.countDown();
            Assert.assertEquals(
                Arrays.asList(
                    TestSourceHelpers.createElementMessage(DEFAULT_SSP, "1", "after", BoundedWindow.TIMESTAMP_MIN_VALUE),
                    TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                    TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            // Release the latch even when the first assertion failed, so nothing stays parked on
            // it while the consumer is being stopped. countDown() is a no-op once the count is 0.
            advanceLatch.countDown();
            consumer.stop();
        }
    }

    /**
     * With three splits and a max source parallelism of three, each partition is expected to
     * deliver its own split's element, and the three first-element offsets together must be
     * exactly {"0", "1", "2"}.
     */
    @Test
    public void testSplit() throws IOException, InterruptedException {
        final int splitCount = 3;
        TestBoundedSource.SplittableBuilder<String> builder = TestBoundedSource.createSplits(splitCount);
        for (int split = 0; split < splitCount; split++) {
            builder.forSplit(split).addElements("split-" + split);
        }
        TestBoundedSource<String> source = builder.build();
        BoundedSourceSystem.Consumer<String> consumer = createConsumer(source, splitCount);
        for (int split = 0; split < splitCount; split++) {
            consumer.register(ssp(split), NULL_STRING);
        }
        consumer.start();
        try {
            Set<String> offsets = new HashSet<>();
            for (int split = 0; split < splitCount; split++) {
                SystemStreamPartition partition = ssp(split);
                List<IncomingMessageEnvelope> envelopes =
                    consumeUntilTimeoutOrEos(consumer, partition, DEFAULT_TIMEOUT_MILLIS);
                // Fail with a readable message instead of an IndexOutOfBoundsException below.
                Assert.assertFalse("No messages received for split " + split, envelopes.isEmpty());
                // The element's offset is not fixed per partition, so the expected element is built
                // from the received offset; the offsets are validated as a set after the loop.
                String firstOffset = envelopes.get(0).getOffset();
                Assert.assertEquals(
                    Arrays.asList(
                        TestSourceHelpers.createElementMessage(partition, firstOffset, "split-" + split, BoundedWindow.TIMESTAMP_MIN_VALUE),
                        TestSourceHelpers.createWatermarkMessage(partition, BoundedWindow.TIMESTAMP_MAX_VALUE),
                        TestSourceHelpers.createEndOfStreamMessage(partition)),
                    envelopes);
                offsets.add(firstOffset);
            }
            Assert.assertEquals(Sets.newHashSet("0", "1", "2"), offsets);
        } finally {
            consumer.stop();
        }
    }

    /**
     * Polls {@code ssp} repeatedly and accumulates the envelopes until either the last accumulated
     * envelope is an end-of-stream marker or {@code timeoutMillis} has elapsed.
     *
     * @param consumer the consumer to poll
     * @param ssp the single partition to poll
     * @param timeoutMillis overall time budget in milliseconds; must be {@code >= 0}
     * @return every envelope received, in arrival order (possibly empty)
     * @throws InterruptedException if a poll is interrupted
     */
    private static List<IncomingMessageEnvelope> consumeUntilTimeoutOrEos(SystemConsumer consumer, SystemStreamPartition ssp, long timeoutMillis) throws InterruptedException {
        Assert.assertTrue("Expected timeoutMillis (" + timeoutMillis + ") >= 0", timeoutMillis >= 0L);
        List<IncomingMessageEnvelope> accumulator = new ArrayList<>();
        long start = System.currentTimeMillis();
        long now = start;
        while (timeoutMillis + start >= now) {
            // Hand poll() the time still remaining in the budget. The loop condition guarantees
            // this is >= 0; the previous expression (now - start - timeoutMillis) was its negation
            // and therefore never positive.
            long remainingMillis = timeoutMillis - (now - start);
            accumulator.addAll(pollOnce(consumer, ssp, remainingMillis));
            if (!accumulator.isEmpty() && accumulator.get(accumulator.size() - 1).isEndOfStream()) {
                break;
            }
            now = System.currentTimeMillis();
        }
        return accumulator;
    }

    /**
     * Performs one {@code poll} for {@code ssp} and returns its envelopes, asserting that the
     * result contains an entry for exactly that partition and that the entry is non-null.
     *
     * @param consumer the consumer to poll
     * @param ssp the single partition to poll
     * @param timeoutMillis timeout forwarded unchanged to {@link SystemConsumer#poll}
     * @return the envelopes returned for {@code ssp}
     * @throws InterruptedException if the poll is interrupted
     */
    private static List<IncomingMessageEnvelope> pollOnce(SystemConsumer consumer, SystemStreamPartition ssp, long timeoutMillis) throws InterruptedException {
        Set<SystemStreamPartition> sspSet = Collections.singleton(ssp);
        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollResult = consumer.poll(sspSet, timeoutMillis);
        Assert.assertEquals(sspSet, pollResult.keySet());
        List<IncomingMessageEnvelope> envelopes = pollResult.get(ssp);
        Assert.assertNotNull(envelopes);
        return envelopes;
    }

    /** Creates a consumer for {@code source} with a max source parallelism of 1. */
    private static BoundedSourceSystem.Consumer<String> createConsumer(BoundedSource<String> source) {
        return createConsumer(source, 1);
    }

    /**
     * Creates a consumer for {@code source} using fresh pipeline options, a fresh metrics
     * container and the step name "test-step".
     *
     * @param source the source to read from
     * @param splitNum value set as the pipeline's max source parallelism
     */
    private static BoundedSourceSystem.Consumer<String> createConsumer(BoundedSource<String> source, int splitNum) {
        SamzaPipelineOptions pipelineOptions = PipelineOptionsFactory.as(SamzaPipelineOptions.class);
        pipelineOptions.setMaxSourceParallelism(splitNum);
        return new BoundedSourceSystem.Consumer<>(source, pipelineOptions, new SamzaMetricsContainer(new MetricsRegistryMap()), "test-step");
    }

    /** Returns the "default-system" stream partition with the given partition id. */
    private static SystemStreamPartition ssp(int partition) {
        return new SystemStreamPartition("default-system", "default-system", new Partition(partition));
    }
}

