/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class UnboundedReadFromBoundedSourceTest {
    /** JUnit rule; the unsplittable-source tests create their on-disk input files through it. */
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    /** JUnit rule; the "before start" tests use it to declare the exception they expect. */
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /**
     * Round-trips a checkpoint built with {@code null} residual elements and a {@code null}
     * residual source through {@code CheckpointCoder} and checks that both parts are still
     * {@code null} after decoding.
     */
    @Test
    public void testCheckpointCoderNulls() throws Exception {
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder checkpointCoder =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder((Coder) StringUtf8Coder.of());
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint original =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint(null, null);

        // Encode, then decode, with the very same coder instance.
        byte[] encoded = CoderUtils.encodeToByteArray((Coder) checkpointCoder, (Object) original);
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint roundTripped =
                (UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint)
                        CoderUtils.decodeFromByteArray((Coder) checkpointCoder, encoded);

        Assert.assertNull(roundTripped.getResidualElements());
        Assert.assertNull(roundTripped.getResidualSource());
    }

    /**
     * Runs a pipeline that reads a 100-element counting source through the bounded-to-unbounded
     * adapter (capped at 100 records) and asserts on the total count, the distinct count, the
     * minimum and the maximum of the output.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testBoundedToUnboundedSourceAdapter() throws Exception {
        final long total = 100L;
        BoundedSource bounded = CountingSource.upTo(total);
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter adapter =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(bounded);
        TestPipeline pipeline = TestPipeline.create();

        PTransform cappedRead = (PTransform) Read.from((UnboundedSource) adapter).withMaxNumRecords(total);
        PCollection output = (PCollection) pipeline.apply(cappedRead);

        // Every element is read exactly once...
        PCollection elementCount = (PCollection) output.apply("Count", (PTransform) Count.globally());
        PAssert.thatSingleton(elementCount).isEqualTo((Object) total);

        // ...with no duplicates...
        PCollection distinct = (PCollection) output.apply((PTransform) RemoveDuplicates.create());
        PCollection distinctCount = (PCollection) distinct.apply("UniqueCount", (PTransform) Count.globally());
        PAssert.thatSingleton(distinctCount).isEqualTo((Object) total);

        // ...and the values span 0 .. total - 1.
        PCollection smallest = (PCollection) output.apply("Min", (PTransform) Min.globally());
        PAssert.thatSingleton(smallest).isEqualTo((Object) 0L);
        PCollection largest = (PCollection) output.apply("Max", (PTransform) Max.globally());
        PAssert.thatSingleton(largest).isEqualTo((Object) (total - 1L));

        pipeline.run();
    }

    /**
     * Exercises the checkpointing helper with a counting source of 100 elements; the expected
     * output is the values 0 through 99.
     */
    @Test
    public void testCountingSourceToUnboundedCheckpoint() throws Exception {
        final long total = 100L;
        BoundedSource source = CountingSource.upTo(total);

        List<Long> expectedValues = Lists.newArrayList();
        long next = 0L;
        while (next < total) {
            expectedValues.add(next);
            next++;
        }

        this.testBoundedToUnboundedSourceAdapterCheckpoint(source, expectedValues);
    }

    /**
     * Exercises the checkpointing helper with an {@code UnsplittableSource} over a 100-byte
     * temporary file; the expected output is every byte written to that file.
     */
    @Test
    public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception {
        String baseName = "test-input";
        File inputFile = this.tmpFolder.newFile(baseName + ".gz");
        byte[] payload = UnboundedReadFromBoundedSourceTest.generateInput(100);
        UnboundedReadFromBoundedSourceTest.writeFile(inputFile, payload);
        UnsplittableSource unsplittable = new UnsplittableSource(inputFile.getPath(), 1L);

        List<Byte> expectedBytes = Lists.newArrayList();
        for (int idx = 0; idx < payload.length; idx++) {
            expectedBytes.add(payload[idx]);
        }

        this.testBoundedToUnboundedSourceAdapterCheckpoint(unsplittable, expectedBytes);
    }

    /**
     * Reads {@code boundedSource} to exhaustion through a {@code BoundedToUnboundedSourceAdapter}
     * reader, taking and finalizing a checkpoint after every ninth element, then verifies the
     * result.
     *
     * <p>Verification: the checkpoint taken after the last element has no residual elements
     * (null or empty), the number of elements read equals {@code expectedElements.size()}, and the
     * set of elements read equals the set of expected elements.
     *
     * <p>The reader is closed in a {@code finally} block so that resources held by file-backed
     * sources are released even when an assertion fails.
     *
     * @param boundedSource the bounded source to wrap and read
     * @param expectedElements the elements the source is expected to produce (order is not checked)
     * @throws Exception if reading, checkpointing or closing the reader fails
     */
    private <T> void testBoundedToUnboundedSourceAdapterCheckpoint(BoundedSource<T> boundedSource, List<T> expectedElements) throws Exception {
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter unboundedSource = new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(boundedSource);
        PipelineOptions options = PipelineOptionsFactory.create();
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Reader reader = unboundedSource.createReader(options, null);
        try {
            List<Object> actual = Lists.newArrayList();
            for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
                actual.add(reader.getCurrent());
                // Checkpoint every 9 elements; reading continues on the same reader.
                if (actual.size() % 9 == 0) {
                    UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint checkpoint = reader.getCheckpointMark();
                    checkpoint.finalizeCheckpoint();
                }
            }
            UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint checkpointDone = reader.getCheckpointMark();
            Assert.assertTrue(checkpointDone.getResidualElements() == null || checkpointDone.getResidualElements().isEmpty());
            Assert.assertEquals((long)expectedElements.size(), (long)actual.size());
            Assert.assertEquals((Object)Sets.newHashSet(expectedElements), (Object)Sets.newHashSet((Iterable)actual));
        } finally {
            reader.close();
        }
    }

    /**
     * Exercises the checkpoint-and-restart helper with a counting source of 100 elements; the
     * expected output is the values 0 through 99.
     */
    @Test
    public void testCountingSourceToUnboundedCheckpointRestart() throws Exception {
        final long total = 100L;
        BoundedSource source = CountingSource.upTo(total);

        List<Long> expectedValues = Lists.newArrayList();
        long next = 0L;
        while (next < total) {
            expectedValues.add(next);
            next++;
        }

        this.testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expectedValues);
    }

    /**
     * Exercises the checkpoint-and-restart helper with an {@code UnsplittableSource} over a
     * 1000-byte temporary file; the expected output is every byte written to that file.
     */
    @Test
    public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception {
        String baseName = "test-input";
        File inputFile = this.tmpFolder.newFile(baseName + ".gz");
        byte[] payload = UnboundedReadFromBoundedSourceTest.generateInput(1000);
        UnboundedReadFromBoundedSourceTest.writeFile(inputFile, payload);
        UnsplittableSource unsplittable = new UnsplittableSource(inputFile.getPath(), 1L);

        List<Byte> expectedBytes = Lists.newArrayList();
        for (int idx = 0; idx < payload.length; idx++) {
            expectedBytes.add(payload[idx]);
        }

        this.testBoundedToUnboundedSourceAdapterCheckpointRestart(unsplittable, expectedBytes);
    }

    /**
     * Reads {@code boundedSource} to exhaustion through {@code BoundedToUnboundedSourceAdapter}
     * readers, simulating a restart after every ninth element, then verifies the result.
     *
     * <p>At each restart the current checkpoint is taken and round-tripped through the source's
     * checkpoint coder. The reader is then closed, the original checkpoint finalized, and a new
     * reader created from the decoded checkpoint and started.
     *
     * <p>Verification: the checkpoint taken after the last element has no residual elements
     * (null or empty), the number of elements read equals {@code expectedElements.size()}, and the
     * set of elements read equals the set of expected elements.
     *
     * <p>Whichever reader is current when the method exits is closed in a {@code finally} block,
     * so the last reader is no longer left open.
     *
     * @param boundedSource the bounded source to wrap and read
     * @param expectedElements the elements the source is expected to produce (order is not checked)
     * @throws Exception if reading, checkpointing, coding or closing a reader fails
     */
    private <T> void testBoundedToUnboundedSourceAdapterCheckpointRestart(BoundedSource<T> boundedSource, List<T> expectedElements) throws Exception {
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter unboundedSource = new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(boundedSource);
        PipelineOptions options = PipelineOptionsFactory.create();
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Reader reader = unboundedSource.createReader(options, null);
        try {
            List<Object> actual = Lists.newArrayList();
            boolean hasNext = reader.start();
            while (hasNext) {
                actual.add(reader.getCurrent());
                // Checkpoint and restart every 9 elements.
                if (actual.size() % 9 == 0) {
                    UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint checkpoint = reader.getCheckpointMark();
                    Coder checkpointCoder = unboundedSource.getCheckpointMarkCoder();
                    UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint decodedCheckpoint = (UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint)CoderUtils.decodeFromByteArray((Coder)checkpointCoder, (byte[])CoderUtils.encodeToByteArray((Coder)checkpointCoder, (Object)checkpoint));
                    reader.close();
                    checkpoint.finalizeCheckpoint();
                    // The replacement reader resumes from the decoded checkpoint; start() (not
                    // advance()) positions it on its first element.
                    reader = unboundedSource.createReader(options, decodedCheckpoint);
                    hasNext = reader.start();
                    continue;
                }
                hasNext = reader.advance();
            }
            UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint checkpointDone = reader.getCheckpointMark();
            Assert.assertTrue(checkpointDone.getResidualElements() == null || checkpointDone.getResidualElements().isEmpty());
            Assert.assertEquals((long)expectedElements.size(), (long)actual.size());
            Assert.assertEquals((Object)Sets.newHashSet(expectedElements), (Object)Sets.newHashSet((Iterable)actual));
        } finally {
            reader.close();
        }
    }

    /**
     * Expects {@link NoSuchElementException} when {@code getCurrent()} is called on a freshly
     * created reader (no checkpoint) that has not been started.
     */
    @Test
    public void testReadBeforeStart() throws Exception {
        this.thrown.expect(NoSuchElementException.class);

        BoundedSource bounded = CountingSource.upTo(100L);
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter adapter =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(bounded);
        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();

        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Reader unstarted =
                adapter.createReader(pipelineOptions, null);
        unstarted.getCurrent();
    }

    /**
     * Expects {@link NoSuchElementException} when {@code getCurrent()} is called on a reader that
     * was created from a checkpoint (one residual element plus a residual source) but has not been
     * started.
     */
    @Test
    public void testReadFromCheckpointBeforeStart() throws Exception {
        this.thrown.expect(NoSuchElementException.class);

        BoundedSource bounded = CountingSource.upTo(100L);
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter adapter =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(bounded);
        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();

        // A checkpoint holding a single residual element (value 1, timestamp 1).
        TimestampedValue residual = TimestampedValue.of((Object) 1L, new Instant(1L));
        ImmutableList residuals = ImmutableList.of((Object) residual);
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint resumePoint =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint((List) residuals, bounded);

        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Reader unstarted =
                adapter.createReader(pipelineOptions, resumePoint);
        unstarted.getCurrent();
    }

    /**
     * Produces {@code size} pseudo-random bytes from a fixed seed, so every call with the same
     * size returns the same content.
     *
     * @param size number of bytes to generate
     * @return a new array of {@code size} bytes
     */
    private static byte[] generateInput(int size) {
        byte[] bytes = new byte[size];
        new Random(285930L).nextBytes(bytes);
        return bytes;
    }

    /**
     * Writes {@code input} to {@code file}, replacing any existing content; the stream is closed
     * before returning.
     *
     * @param file destination file
     * @param input bytes to write
     * @throws IOException if the file cannot be opened or written
     */
    private static void writeFile(File file, byte[] input) throws IOException {
        try (OutputStream out = new FileOutputStream(file)) {
            out.write(input);
        }
    }

    /**
     * A {@link FileBasedSource} of {@link Byte}s used by the unsplittable-source tests. Its reader
     * emits one record per byte read from the file's channel and reports that it does not allow
     * dynamic splitting.
     */
    private static class UnsplittableSource
    extends FileBasedSource<Byte> {
        /** Creates a source over a file or file pattern. */
        public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) {
            super(fileOrPatternSpec, minBundleSize);
        }

        /** Creates a source over the given offset range of a single file. */
        public UnsplittableSource(String fileName, long minBundleSize, long startOffset, long endOffset) {
            super(fileName, minBundleSize, startOffset, endOffset);
        }

        /** Returns a new source for {@code [start, end)} of {@code fileName}, keeping this source's min bundle size. */
        protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
            return new UnsplittableSource(fileName, this.getMinBundleSize(), start, end);
        }

        /** Returns a reader that emits the file one byte at a time. */
        protected FileBasedSource.FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) {
            return new UnsplittableReader(this);
        }

        /** Always {@code false}. */
        public boolean producesSortedKeys(PipelineOptions options) throws Exception {
            return false;
        }

        /** Elements are encoded with a {@link SerializableCoder} for {@link Byte}. */
        public Coder<Byte> getDefaultOutputCoder() {
            return SerializableCoder.of(Byte.class);
        }

        /** Reader that returns each byte of the channel as one record. */
        private static class UnsplittableReader
        extends FileBasedSource.FileBasedReader<Byte> {
            /** One-byte scratch buffer, cleared and refilled for every record. */
            private final ByteBuffer buff = ByteBuffer.allocate(1);
            /** Most recently read byte; {@code null} until the first successful read. */
            private Byte current;
            /** Offset of the most recently read byte; starts one before the source's start offset. */
            private long offset;
            /** Channel supplied through {@link #startReading}. */
            private ReadableByteChannel channel;

            public UnsplittableReader(UnsplittableSource source) {
                super(source);
                this.offset = source.getStartOffset() - 1L;
            }

            /**
             * Returns the most recently read byte.
             *
             * @throws NoSuchElementException if no record has been read yet
             */
            public Byte getCurrent() throws NoSuchElementException {
                if (this.current == null) {
                    // Honor the declared contract instead of handing a null element to the caller.
                    throw new NoSuchElementException();
                }
                return this.current;
            }

            /** Always {@code false}: this reader refuses dynamic splitting. */
            public boolean allowsDynamicSplitting() {
                return false;
            }

            /** Always {@code true}: every record is reported as being at a split point. */
            protected boolean isAtSplitPoint() {
                return true;
            }

            /** Remembers the channel that subsequent {@link #readNextRecord} calls read from. */
            protected void startReading(ReadableByteChannel channel) throws IOException {
                this.channel = channel;
            }

            /**
             * Reads the next byte from the channel.
             *
             * @return {@code true} if exactly one byte was read and became the current record,
             *     {@code false} otherwise (the current record and offset are left untouched)
             */
            protected boolean readNextRecord() throws IOException {
                this.buff.clear();
                if (this.channel.read(this.buff) != 1) {
                    return false;
                }
                this.current = this.buff.get(0);
                ++this.offset;
                return true;
            }

            /** Returns the offset of the most recently read byte. */
            protected long getCurrentOffset() {
                return this.offset;
            }
        }
    }
}

