/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.jet.processors;

import com.hazelcast.cluster.Address;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.jet.Utils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;

/**
 * Jet processor that reads from the shards of a Beam {@link UnboundedSource}.
 *
 * <p>Each processor instance takes its round-robin share of all shards, opens one
 * {@link UnboundedSource.UnboundedReader} per shard and emits every record as an encoded,
 * globally-windowed, timestamped value, interleaved with Jet {@link Watermark} items derived
 * from the readers' watermarks.
 *
 * @param <T> element type produced by the source
 * @param <CmT> checkpoint mark type of the source
 */
public class UnboundedSourceP<T, CmT extends UnboundedSource.CheckpointMark>
extends AbstractProcessor {
    /** Readers owned by this processor; {@code null} until {@link #init} has allocated them. */
    private UnboundedSource.UnboundedReader<T>[] readers;
    private final List<? extends UnboundedSource<T, CmT>> allShards;
    private final PipelineOptions options;
    // Raw on purpose: the public factory method accepts a raw Coder and it is only handed to Utils.encode.
    private final Coder outputCoder;
    // Stored but not read anywhere in this class.
    private final String ownerId;
    private Traverser<Object> traverser;

    private UnboundedSourceP(List<? extends UnboundedSource<T, CmT>> allShards, PipelineOptions options, Coder outputCoder, String ownerId) {
        this.allShards = allShards;
        this.options = options;
        this.outputCoder = outputCoder;
        this.ownerId = ownerId;
    }

    /**
     * Selects this processor's shards, creates and starts a reader for each of them and builds
     * the traverser that {@link #complete()} drains.
     *
     * <p>The reader array is published to the field before it is filled so that {@link #close()}
     * can release whatever was already created if reader creation or start-up fails part-way.
     *
     * @param context Jet processor context supplying the global processor index and total parallelism
     * @throws IOException if starting a reader fails
     */
    @Override
    @SuppressWarnings("unchecked")
    protected void init(@Nonnull Processor.Context context) throws IOException {
        List<? extends UnboundedSource<T, CmT>> myShards = Utils.roundRobinSubList(
                this.allShards, context.globalProcessorIndex(), context.totalParallelism());
        int shardCount = myShards.size();

        this.readers = (UnboundedSource.UnboundedReader<T>[]) new UnboundedSource.UnboundedReader[shardCount];
        for (int i = 0; i < shardCount; i++) {
            this.readers[i] = UnboundedSourceP.createReader(this.options, myShards.get(i));
        }

        // Per the UnboundedReader contract start() returns true when a record is already
        // available as the current one; remember that so the record is emitted rather than
        // being skipped by the first advance() call.
        boolean[] firstRecordPending = new boolean[shardCount];
        for (int i = 0; i < shardCount; i++) {
            firstRecordPending[i] = this.readers[i].start();
        }

        Function<UnboundedSource.UnboundedReader<T>, byte[]> mapFn = reader ->
                Utils.encode(
                        WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()),
                        this.outputCoder);

        if (shardCount == 0) {
            this.traverser = Traversers.empty();
        } else if (shardCount == 1) {
            this.traverser = new SingleReaderTraverser<T>(this.readers[0], mapFn, firstRecordPending[0]);
        } else {
            this.traverser = new CoalescingTraverser<T>(this.readers, mapFn, firstRecordPending);
        }
    }

    /**
     * Emits whatever the traverser currently has to offer.
     *
     * @return {@code true} only when this processor owns no readers at all; otherwise
     *         {@code false}, so the processor keeps being invoked
     */
    @Override
    public boolean complete() {
        this.emitFromTraverser(this.traverser);
        return this.readers.length == 0;
    }

    /** @return always {@code false}; this processor declares itself non-cooperative. */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /**
     * Closes every reader that was created and clears the reader array.
     *
     * <p>Tolerates a missing or partially filled reader array (init not run, or failed
     * part-way). All readers are attempted even if some fail to close; the first failure is
     * rethrown afterwards with any later failures attached as suppressed exceptions.
     */
    @Override
    public void close() {
        if (this.readers == null) {
            return;
        }
        Exception failure = null;
        for (UnboundedSource.UnboundedReader<T> reader : this.readers) {
            if (reader == null) {
                continue;
            }
            try {
                reader.close();
            } catch (IOException | RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        Arrays.fill(this.readers, null);
        if (failure != null) {
            throw ExceptionUtil.rethrow(failure);
        }
    }

    /** Returns an array of the given size with every slot set to {@link Long#MIN_VALUE}. */
    private static long[] initWatermarks(int size) {
        long[] watermarks = new long[size];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        return watermarks;
    }

    /** Creates a reader for the shard without a checkpoint mark, rethrowing I/O failures unchecked. */
    private static <T> UnboundedSource.UnboundedReader<T> createReader(PipelineOptions options, UnboundedSource<T, ?> shard) {
        try {
            return shard.createReader(options, null);
        } catch (IOException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /** Returns the smallest value of a non-empty array. */
    private static long getMin(long[] instants) {
        long min = instants[0];
        for (int i = 1; i < instants.length; i++) {
            if (instants[i] < min) {
                min = instants[i];
            }
        }
        return min;
    }

    /**
     * Creates the meta-supplier that splits the source and supplies {@code UnboundedSourceP}
     * processors.
     *
     * @param unboundedSource source to split and read
     * @param options serializable pipeline options handed to the source and its readers
     * @param outputCoder coder used to encode the emitted windowed values
     * @param ownerId identifier passed through to each created processor
     * @return a meta-supplier producing processors for the given source
     */
    public static <T, CmT extends UnboundedSource.CheckpointMark> ProcessorMetaSupplier supplier(UnboundedSource<T, CmT> unboundedSource, SerializablePipelineOptions options, Coder outputCoder, String ownerId) {
        return new UnboundedSourceProcessorMetaSupplier<T, CmT>(unboundedSource, options, outputCoder, ownerId);
    }

    /**
     * Traverser over several readers. Polls the readers round-robin for the next record and
     * emits a {@link Watermark} whenever the minimum of the per-reader watermarks has advanced
     * past the last one sent.
     */
    private static class CoalescingTraverser<InputT>
    implements Traverser<Object> {
        private final UnboundedSource.UnboundedReader<InputT>[] readers;
        private final Function<UnboundedSource.UnboundedReader<InputT>, byte[]> mapFn;
        /** Marks readers whose start() already positioned them on a record not yet emitted. */
        private final boolean[] firstRecordPending;
        private int currentReaderIndex;
        private long minWatermark = Long.MIN_VALUE;
        private long lastSentWatermark = Long.MIN_VALUE;
        private final long[] watermarks;

        CoalescingTraverser(UnboundedSource.UnboundedReader<InputT>[] readers, Function<UnboundedSource.UnboundedReader<InputT>, byte[]> mapFn, boolean[] firstRecordPending) {
            this.readers = readers;
            this.watermarks = UnboundedSourceP.initWatermarks(readers.length);
            this.mapFn = mapFn;
            this.firstRecordPending = firstRecordPending;
        }

        /**
         * @return a pending {@link Watermark}, otherwise the encoded record of the next reader
         *         that has one, or {@code null} when no reader has a record right now
         */
        @Override
        public Object next() {
            if (this.minWatermark > this.lastSentWatermark) {
                this.lastSentWatermark = this.minWatermark;
                return new Watermark(this.lastSentWatermark);
            }
            try {
                // Give every reader at most one chance per call, continuing after the reader used last time.
                for (int i = 0; i < this.readers.length; i++) {
                    this.currentReaderIndex++;
                    if (this.currentReaderIndex >= this.readers.length) {
                        this.currentReaderIndex = 0;
                    }
                    UnboundedSource.UnboundedReader<InputT> currentReader = this.readers[this.currentReaderIndex];
                    if (!this.hasRecord(this.currentReaderIndex, currentReader)) {
                        continue;
                    }
                    long currentWatermark = currentReader.getWatermark().getMillis();
                    long origWatermark = this.watermarks[this.currentReaderIndex];
                    if (currentWatermark > origWatermark) {
                        this.watermarks[this.currentReaderIndex] = currentWatermark;
                        // The minimum can only have moved if this reader was the one holding it.
                        if (origWatermark == this.minWatermark) {
                            this.minWatermark = UnboundedSourceP.getMin(this.watermarks);
                        }
                    }
                    return this.mapFn.apply(currentReader);
                }
                return null;
            } catch (IOException e) {
                throw ExceptionUtil.rethrow(e);
            }
        }

        /** Consumes the pending first record of the reader if there is one, otherwise advances it. */
        private boolean hasRecord(int index, UnboundedSource.UnboundedReader<InputT> reader) throws IOException {
            if (this.firstRecordPending[index]) {
                this.firstRecordPending[index] = false;
                return true;
            }
            return reader.advance();
        }
    }

    /**
     * Traverser over exactly one reader. Emits a {@link Watermark} whenever the reader's
     * watermark has increased, otherwise the reader's next record if it has one.
     */
    private static class SingleReaderTraverser<InputT>
    implements Traverser<Object> {
        private final UnboundedSource.UnboundedReader<InputT> reader;
        private final Function<UnboundedSource.UnboundedReader<InputT>, byte[]> mapFn;
        /** True while the record made current by start() has not been emitted yet. */
        private boolean firstRecordPending;
        private long lastWatermark = Long.MIN_VALUE;

        SingleReaderTraverser(UnboundedSource.UnboundedReader<InputT> reader, Function<UnboundedSource.UnboundedReader<InputT>, byte[]> mapFn, boolean firstRecordPending) {
            this.reader = reader;
            this.mapFn = mapFn;
            this.firstRecordPending = firstRecordPending;
        }

        /**
         * @return the record left current by start() (once), a new {@link Watermark}, the
         *         encoded next record, or {@code null} when the reader has nothing right now
         */
        @Override
        public Object next() {
            if (this.firstRecordPending) {
                // Emit the record read by start() before anything else so it is not lost.
                this.firstRecordPending = false;
                return this.mapFn.apply(this.reader);
            }
            long wm = this.reader.getWatermark().getMillis();
            if (wm > this.lastWatermark) {
                this.lastWatermark = wm;
                return new Watermark(wm);
            }
            try {
                return this.reader.advance() ? this.mapFn.apply(this.reader) : null;
            } catch (IOException e) {
                throw ExceptionUtil.rethrow(e);
            }
        }
    }

    /**
     * Meta-supplier that splits the source into shards during initialization and hands the full
     * shard list to every processor it supplies; each processor then picks its own share.
     */
    private static class UnboundedSourceProcessorMetaSupplier<T, CmT extends UnboundedSource.CheckpointMark>
    implements ProcessorMetaSupplier {
        private final UnboundedSource<T, CmT> unboundedSource;
        private final SerializablePipelineOptions options;
        private final Coder outputCoder;
        private final String ownerId;
        private List<? extends UnboundedSource<T, CmT>> shards;

        private UnboundedSourceProcessorMetaSupplier(UnboundedSource<T, CmT> unboundedSource, SerializablePipelineOptions options, Coder outputCoder, String ownerId) {
            this.unboundedSource = unboundedSource;
            this.options = options;
            this.outputCoder = outputCoder;
            this.ownerId = ownerId;
        }

        /** Splits the source, requesting as many shards as the job's total parallelism. */
        @Override
        public void init(@Nonnull ProcessorMetaSupplier.Context context) throws Exception {
            this.shards = this.unboundedSource.split(context.totalParallelism(), this.options.get());
        }

        /** Returns the same kind of processor supplier for every member address. */
        @Nonnull
        @Override
        public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> addresses) {
            return address -> ProcessorSupplier.of((SupplierEx<Processor> & Serializable) () ->
                    new UnboundedSourceP<T, CmT>(this.shards, this.options.get(), this.outputCoder, this.ownerId));
        }
    }
}

