/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.adapter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.runners.samza.adapter.TestCheckpointMark;
import org.apache.beam.runners.samza.adapter.TestSourceHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public class TestUnboundedSource<T>
extends UnboundedSource<T, TestCheckpointMark> {
    private final List<List<TestSourceHelpers.Event<T>>> events;

    public static <T> Builder<T> createBuilder() {
        return new Builder();
    }

    public static <T> SplittableBuilder<T> createSplits(int numSplits) {
        return new SplittableBuilder(numSplits);
    }

    private TestUnboundedSource(List<List<TestSourceHelpers.Event<T>>> events) {
        this.events = Collections.unmodifiableList(new ArrayList<List<TestSourceHelpers.Event<T>>>(events));
    }

    public List<? extends UnboundedSource<T, TestCheckpointMark>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
        return this.events.stream().map(ev -> new TestUnboundedSource(Collections.singletonList(ev))).collect(Collectors.toList());
    }

    public UnboundedSource.UnboundedReader<T> createReader(PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) throws IOException {
        assert (this.events.size() == 1);
        return new Reader(this.events.get(0), checkpointMark);
    }

    public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
        return SerializableCoder.of(TestCheckpointMark.class);
    }

    public void validate() {
    }

    private class Reader
    extends UnboundedSource.UnboundedReader<T> {
        private final List<TestSourceHelpers.Event<T>> events;
        private Instant curTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private Instant watermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private boolean started;
        private int index = -1;
        private int offset;

        private Reader(List<TestSourceHelpers.Event<T>> events, TestCheckpointMark checkpointMark) {
            this.events = events;
            this.offset = checkpointMark == null ? -1 : checkpointMark.checkpoint;
        }

        public boolean start() throws IOException {
            if (this.started) {
                throw new IllegalStateException("Start called when reader was already started");
            }
            this.started = true;
            return this.advance();
        }

        public boolean advance() throws IOException {
            if (!this.started) {
                throw new IllegalStateException("Advance called when reader was not started");
            }
            ++this.index;
            while (this.index < this.events.size()) {
                TestSourceHelpers.Event event = this.events.get(this.index);
                if (event instanceof TestSourceHelpers.ExceptionEvent) {
                    throw ((TestSourceHelpers.ExceptionEvent)event).exception;
                }
                if (event instanceof TestSourceHelpers.LatchEvent) {
                    try {
                        ((TestSourceHelpers.LatchEvent)event).latch.await();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else if (event instanceof TestSourceHelpers.WatermarkEvent) {
                    this.watermark = ((TestSourceHelpers.WatermarkEvent)event).watermark;
                } else {
                    if (event instanceof TestSourceHelpers.NoElementEvent) {
                        return false;
                    }
                    this.curTime = ((TestSourceHelpers.ElementEvent)event).timestamp;
                    ++this.offset;
                    return true;
                }
                ++this.index;
            }
            return false;
        }

        public T getCurrent() throws NoSuchElementException {
            if (!this.started) {
                throw new NoSuchElementException();
            }
            TestSourceHelpers.Event event = this.events.get(this.index);
            assert (event instanceof TestSourceHelpers.ElementEvent);
            return ((TestSourceHelpers.ElementEvent)event).element;
        }

        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return this.curTime;
        }

        public Instant getWatermark() {
            return this.watermark;
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return TestCheckpointMark.of(this.offset);
        }

        public void close() throws IOException {
        }

        public UnboundedSource<T, ?> getCurrentSource() {
            return TestUnboundedSource.this;
        }
    }

    public static class SplittableBuilder<T>
    extends TestSourceHelpers.SourceBuilder<T, TestUnboundedSource<T>> {
        private final List<Builder<T>> builders = new ArrayList<Builder<T>>();

        private SplittableBuilder(int splits) {
            while (splits != 0) {
                this.builders.add(new Builder());
                --splits;
            }
        }

        @Override
        public TestUnboundedSource<T> build() {
            ArrayList events = new ArrayList();
            this.builders.forEach(builder -> events.add(builder.getEvents()));
            return new TestUnboundedSource(events);
        }

        public Builder<T> forSplit(int split) {
            return this.builders.get(split);
        }
    }

    public static class Builder<T>
    extends TestSourceHelpers.SourceBuilder<T, TestUnboundedSource<T>> {
        @Override
        public TestUnboundedSource<T> build() {
            return new TestUnboundedSource(Collections.singletonList(this.getEvents()));
        }
    }
}

