/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.adapter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.adapter.TestCheckpointMark;
import org.apache.beam.runners.samza.adapter.TestSourceHelpers;
import org.apache.beam.runners.samza.adapter.TestUnboundedSource;
import org.apache.beam.runners.samza.adapter.UnboundedSourceSystem;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.samza.Partition;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.MessageType;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

public class UnboundedSourceSystemTest {
    private static final long DEFAULT_TIMEOUT_MILLIS = 1000L;
    private static final long DEFAULT_WATERMARK_TIMEOUT_MILLIS = 1000L;
    private static final String NULL_STRING = null;
    private static final SystemStreamPartition DEFAULT_SSP = new SystemStreamPartition("default-system", "default-system", new Partition(0));
    private static final Coder<TestCheckpointMark> CHECKPOINT_MARK_CODER = TestUnboundedSource.createBuilder().build().getCheckpointMarkCoder();

    @Test
    public void testConsumerStartStop() throws IOException, InterruptedException {
        TestUnboundedSource<String> source = TestUnboundedSource.createBuilder().build();
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer = UnboundedSourceSystemTest.createConsumer(source);
        consumer.register(DEFAULT_SSP, UnboundedSourceSystemTest.offset(0));
        consumer.start();
        Assert.assertEquals((Object)Collections.EMPTY_LIST, UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, 1000L));
        consumer.stop();
    }

    @Test
    public void testConsumeOneMessage() throws IOException, InterruptedException {
        TestUnboundedSource source = (TestUnboundedSource)((Object)TestUnboundedSource.createBuilder().addElements("test").build());
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer = UnboundedSourceSystemTest.createConsumer(source);
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        Assert.assertEquals(Arrays.asList(TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(0), "test", BoundedWindow.TIMESTAMP_MIN_VALUE)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, 1000L));
        consumer.stop();
    }

    @Test
    public void testAdvanceTimestamp() throws IOException, InterruptedException {
        Instant timestamp = Instant.now();
        TestUnboundedSource source = (TestUnboundedSource)((Object)TestUnboundedSource.createBuilder().addElements("before").setTimestamp(timestamp).addElements((String[])new String[]{"after"}).build());
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer = UnboundedSourceSystemTest.createConsumer(source);
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        Assert.assertEquals(Arrays.asList(TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(0), "before", BoundedWindow.TIMESTAMP_MIN_VALUE), TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(1), "after", timestamp)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, 1000L));
        consumer.stop();
    }

    @Test
    public void testConsumeMultipleMessages() throws IOException, InterruptedException {
        Instant timestamp = Instant.now();
        TestUnboundedSource source = (TestUnboundedSource)((Object)TestUnboundedSource.createBuilder().setTimestamp(timestamp).addElements("test", "a", "few", "messages").build());
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer = UnboundedSourceSystemTest.createConsumer(source);
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        Assert.assertEquals(Arrays.asList(TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(0), "test", timestamp), TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(1), "a", timestamp), TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(2), "few", timestamp), TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(3), "messages", timestamp)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, 1000L));
        consumer.stop();
    }

    @Test
    public void testAdvanceWatermark() throws IOException, InterruptedException {
        Instant now = Instant.now();
        Instant nowPlusOne = now.plus(1L);
        TestUnboundedSource source = (TestUnboundedSource)((Object)TestUnboundedSource.createBuilder().setTimestamp(now).addElements("first").setTimestamp(nowPlusOne).addElements((String[])new String[]{"second"}).advanceWatermarkTo(now).build());
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer = UnboundedSourceSystemTest.createConsumer(source);
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        Assert.assertEquals(Arrays.asList(TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(0), "first", now), TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(1), "second", nowPlusOne), TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, now)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, 1000L));
        consumer.stop();
    }

    @Test
    public void testMultipleAdvanceWatermark() throws IOException, InterruptedException {
        Instant now = Instant.now();
        Instant nowPlusOne = now.plus(1L);
        Instant nowPlusTwo = now.plus(2L);
        TestUnboundedSource source = (TestUnboundedSource)((Object)TestUnboundedSource.createBuilder().setTimestamp(now).addElements("first").advanceWatermarkTo(now).noElements().setTimestamp(nowPlusOne).addElements((String[])new String[]{"second"}).setTimestamp(nowPlusTwo).addElements((String[])new String[]{"third"}).advanceWatermarkTo(nowPlusOne).build());
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer = UnboundedSourceSystemTest.createConsumer(source);
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        Assert.assertEquals(Arrays.asList(TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(0), "first", now), TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, now)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, 1000L));
        Assert.assertEquals(Arrays.asList(TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(1), "second", nowPlusOne), TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(2), "third", nowPlusTwo), TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, nowPlusOne)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, 1000L));
        consumer.stop();
    }

    @Test
    public void testReaderThrowsAtStart() throws Exception {
        IOException exception = new IOException("Expected exception");
        TestUnboundedSource source = (TestUnboundedSource)((Object)TestUnboundedSource.createBuilder().addException(exception).build());
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer = UnboundedSourceSystemTest.createConsumer(source);
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        TestSourceHelpers.expectWrappedException(exception, () -> UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark((SystemConsumer)consumer, DEFAULT_SSP, 1000L));
        consumer.stop();
    }

    @Test
    public void testReaderThrowsAtAdvance() throws Exception {
        IOException exception = new IOException("Expected exception");
        TestUnboundedSource source = (TestUnboundedSource)((Object)TestUnboundedSource.createBuilder().addElements("test", "a", "few", "good", "messages", "then", "...").addException(exception).build());
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer = UnboundedSourceSystemTest.createConsumer(source);
        consumer.register(DEFAULT_SSP, UnboundedSourceSystemTest.offset(0));
        consumer.start();
        TestSourceHelpers.expectWrappedException(exception, () -> UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark((SystemConsumer)consumer, DEFAULT_SSP, 1000L));
        consumer.stop();
    }

    @Test
    public void testTimeout() throws Exception {
        CountDownLatch advanceLatch = new CountDownLatch(1);
        Instant now = Instant.now();
        Instant nowPlusOne = now.plus(1L);
        TestUnboundedSource source = (TestUnboundedSource)((Object)TestUnboundedSource.createBuilder().setTimestamp(now).addElements("before").addLatch(advanceLatch).setTimestamp(nowPlusOne).addElements((String[])new String[]{"after"}).advanceWatermarkTo(nowPlusOne).build());
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer = UnboundedSourceSystemTest.createConsumer(source);
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        Assert.assertEquals(Collections.singletonList(TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(0), "before", now)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, 1000L));
        advanceLatch.countDown();
        Assert.assertEquals(Arrays.asList(TestSourceHelpers.createElementMessage(DEFAULT_SSP, UnboundedSourceSystemTest.offset(1), "after", nowPlusOne), TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, nowPlusOne)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, 1000L));
        consumer.stop();
    }

    @Test
    public void testRestartFromCheckpoint() throws IOException, InterruptedException {
        TestUnboundedSource.SplittableBuilder builder = TestUnboundedSource.createSplits(3);
        builder.forSplit(0).addElements("split-0");
        builder.forSplit(1).addElements("split-1");
        builder.forSplit(2).addElements("split-2");
        TestUnboundedSource<String> source = builder.build();
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer = UnboundedSourceSystemTest.createConsumer(source, 3);
        consumer.register(UnboundedSourceSystemTest.ssp(0), UnboundedSourceSystemTest.offset(10));
        consumer.register(UnboundedSourceSystemTest.ssp(1), UnboundedSourceSystemTest.offset(5));
        consumer.register(UnboundedSourceSystemTest.ssp(2), UnboundedSourceSystemTest.offset(8));
        consumer.start();
        Assert.assertEquals(Arrays.asList(TestSourceHelpers.createElementMessage(UnboundedSourceSystemTest.ssp(0), UnboundedSourceSystemTest.offset(11), "split-0", BoundedWindow.TIMESTAMP_MIN_VALUE)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, UnboundedSourceSystemTest.ssp(0), 1000L));
        Assert.assertEquals(Arrays.asList(TestSourceHelpers.createElementMessage(UnboundedSourceSystemTest.ssp(1), UnboundedSourceSystemTest.offset(6), "split-1", BoundedWindow.TIMESTAMP_MIN_VALUE)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, UnboundedSourceSystemTest.ssp(1), 1000L));
        Assert.assertEquals(Arrays.asList(TestSourceHelpers.createElementMessage(UnboundedSourceSystemTest.ssp(2), UnboundedSourceSystemTest.offset(9), "split-2", BoundedWindow.TIMESTAMP_MIN_VALUE)), UnboundedSourceSystemTest.consumeUntilTimeoutOrWatermark(consumer, UnboundedSourceSystemTest.ssp(2), 1000L));
        consumer.stop();
    }

    private static UnboundedSourceSystem.Consumer<String, TestCheckpointMark> createConsumer(TestUnboundedSource<String> source) {
        return UnboundedSourceSystemTest.createConsumer(source, 1);
    }

    private static UnboundedSourceSystem.Consumer<String, TestCheckpointMark> createConsumer(TestUnboundedSource<String> source, int splitNum) {
        SamzaPipelineOptions pipelineOptions = (SamzaPipelineOptions)PipelineOptionsFactory.as(SamzaPipelineOptions.class);
        pipelineOptions.setWatermarkInterval(0L);
        pipelineOptions.setMaxSourceParallelism(splitNum);
        return new UnboundedSourceSystem.Consumer(source, pipelineOptions, new SamzaMetricsContainer(new MetricsRegistryMap()), "test-step");
    }

    private static List<IncomingMessageEnvelope> consumeUntilTimeoutOrWatermark(SystemConsumer consumer, SystemStreamPartition ssp, long timeoutMillis) throws InterruptedException {
        long start;
        Assert.assertTrue((String)("Expected timeoutMillis (" + timeoutMillis + ") >= 0"), (timeoutMillis >= 0L ? 1 : 0) != 0);
        ArrayList<IncomingMessageEnvelope> accumulator = new ArrayList<IncomingMessageEnvelope>();
        long now = start = System.currentTimeMillis();
        while (timeoutMillis + start >= now) {
            accumulator.addAll(UnboundedSourceSystemTest.pollOnce(consumer, ssp, now - start - timeoutMillis));
            if (!accumulator.isEmpty() && MessageType.of((Object)((IncomingMessageEnvelope)accumulator.get(accumulator.size() - 1)).getMessage()) == MessageType.WATERMARK) break;
            now = System.currentTimeMillis();
        }
        return accumulator;
    }

    private static OpMessage.Type getMessageType(IncomingMessageEnvelope envelope) {
        return ((OpMessage)envelope.getMessage()).getType();
    }

    private static List<IncomingMessageEnvelope> pollOnce(SystemConsumer consumer, SystemStreamPartition ssp, long timeoutMillis) throws InterruptedException {
        Set<SystemStreamPartition> sspSet = Collections.singleton(ssp);
        Map pollResult = consumer.poll(sspSet, timeoutMillis);
        Assert.assertEquals(sspSet, pollResult.keySet());
        Assert.assertNotNull(pollResult.get(ssp));
        return (List)pollResult.get(ssp);
    }

    private static String offset(int offset) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        CHECKPOINT_MARK_CODER.encode((Object)TestCheckpointMark.of(offset), (OutputStream)baos);
        return Base64.getEncoder().encodeToString(baos.toByteArray());
    }

    private static SystemStreamPartition ssp(int partition) {
        return new SystemStreamPartition("default-system", "default-system", new Partition(partition));
    }
}

