/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.jet.processors;

import com.hazelcast.cluster.Address;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.jet.Utils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;

/**
 * Jet processor that reads a list of Beam {@link BoundedSource} shards one after another and
 * emits every element as a {@link WindowedValue} in the global window, stamped with the
 * timestamp reported by the reader.
 *
 * <p>The processor is its own {@link Traverser}: {@link #complete()} hands {@code this} to
 * {@code emitFromTraverser}, which pulls items through {@link #next()} until it returns
 * {@code null}.
 *
 * <p>When an output coder is supplied, each item is passed through
 * {@code Utils.encode(item, outputCoder)} before being emitted; otherwise the
 * {@link WindowedValue} itself is emitted.
 *
 * @param <T> element type produced by the underlying source
 */
public class BoundedSourceP<T> extends AbstractProcessor implements Traverser {
    /** Shards assigned to this processor instance, consumed sequentially. */
    private final Traverser<BoundedSource<T>> shardsTraverser;
    private final PipelineOptions options;
    /** Optional coder; when {@code null} items are emitted un-encoded. */
    private final Coder outputCoder;
    /** Identifier supplied by the caller; stored but not read anywhere in this class. */
    private final String ownerId;
    /** Reader of the shard currently being drained, or {@code null} when no shard is open. */
    private BoundedSource.BoundedReader<T> currentReader;

    /**
     * @param shards shards this instance is responsible for reading
     * @param options pipeline options passed to {@code BoundedSource.createReader}
     * @param outputCoder coder used to encode emitted items, or {@code null} to emit them as-is
     * @param ownerId identifier of the owning transform/vertex (kept for reference only)
     */
    BoundedSourceP(List<BoundedSource<T>> shards, PipelineOptions options, Coder outputCoder, String ownerId) {
        this.shardsTraverser = Traversers.traverseIterable(shards);
        this.options = options;
        this.outputCoder = outputCoder;
        this.ownerId = ownerId;
    }

    /**
     * Opens the first shard that has at least one element, so that {@link #next()} can start
     * with a positioned reader.
     *
     * @param context processor context (unused)
     * @throws Exception if creating, starting or closing a reader fails
     */
    protected void init(@Nonnull Processor.Context context) throws Exception {
        this.nextShard();
    }

    /**
     * Returns the next item to emit, or {@code null} once every shard has been exhausted.
     *
     * <p>The current element is captured first and the reader is advanced afterwards; when the
     * reader has no more elements the next non-empty shard is opened before returning.
     *
     * @return the windowed value, its encoded form when an output coder is set, or {@code null}
     *     when there is nothing left to read
     */
    public Object next() {
        if (this.currentReader == null) {
            return null;
        }
        try {
            T item = this.currentReader.getCurrent();
            WindowedValue<T> res =
                WindowedValue.timestampedValueInGlobalWindow(item, this.currentReader.getCurrentTimestamp());
            if (!this.currentReader.advance()) {
                this.nextShard();
            }
            if (this.outputCoder == null) {
                return res;
            }
            // Return the encoded form as-is. The previous cast of the encoder's result to
            // WindowedValue was invalid: the encoded representation is not a WindowedValue
            // (in Beam's Jet runner Utils.encode produces the serialized bytes).
            return Utils.encode(res, this.outputCoder);
        } catch (IOException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * Closes the current reader (if any) and opens readers for subsequent shards until one of
     * them reports an element from {@code start()} or no shards remain. On return
     * {@link #currentReader} is either positioned on an element or {@code null}.
     *
     * @throws IOException if closing, creating or starting a reader fails
     */
    private void nextShard() throws IOException {
        while (true) {
            this.closeCurrentReader();
            BoundedSource<T> shard = this.shardsTraverser.next();
            if (shard == null) {
                return;
            }
            this.currentReader = shard.createReader(this.options);
            if (this.currentReader.start()) {
                return;
            }
            // Empty shard: loop around to close its reader and try the next one.
        }
    }

    /**
     * Closes the reader held in {@link #currentReader}, if any. The field is cleared before
     * {@code close()} is invoked so that a failing close is never attempted a second time.
     *
     * @throws IOException if the reader fails to close
     */
    private void closeCurrentReader() throws IOException {
        BoundedSource.BoundedReader<T> reader = this.currentReader;
        if (reader == null) {
            return;
        }
        this.currentReader = null;
        reader.close();
    }

    /**
     * Emits items pulled from this processor's own traverser implementation.
     *
     * @return the result of {@code emitFromTraverser(this)}
     */
    public boolean complete() {
        return this.emitFromTraverser(this);
    }

    /** @return always {@code false}; reading from a source reader may block */
    public boolean isCooperative() {
        return false;
    }

    /**
     * Releases the reader that is currently open, if any. Safe to call more than once.
     *
     * @throws Exception if the reader fails to close
     */
    public void close() throws Exception {
        this.closeCurrentReader();
    }

    /**
     * Creates the meta-supplier that splits {@code boundedSource} into shards and distributes
     * them over members and local processors.
     *
     * @param boundedSource source to read
     * @param options serializable pipeline options
     * @param outputCoder coder used to encode emitted items, or {@code null}
     * @param ownerId identifier forwarded to every created processor
     * @param <T> element type of the source
     * @return a meta-supplier producing {@link BoundedSourceP} instances
     */
    public static <T> ProcessorMetaSupplier supplier(BoundedSource<T> boundedSource, SerializablePipelineOptions options, Coder outputCoder, String ownerId) {
        return new BoundedSourceMetaProcessorSupplier(boundedSource, options, outputCoder, ownerId);
    }

    /**
     * Per-member supplier: divides the member's shards among the local processor instances.
     */
    private static class BoundedSourceProcessorSupplier<T> implements ProcessorSupplier {
        private final List<BoundedSource<T>> shards;
        private final SerializablePipelineOptions options;
        private final Coder outputCoder;
        private final String ownerId;
        /** Context received in {@link #init}; stored but not consulted by {@link #get(int)}. */
        private transient ProcessorSupplier.Context context;

        private BoundedSourceProcessorSupplier(List<BoundedSource<T>> shards, SerializablePipelineOptions options, Coder outputCoder, String ownerId) {
            this.shards = shards;
            this.options = options;
            this.outputCoder = outputCoder;
            this.ownerId = ownerId;
        }

        public void init(@Nonnull ProcessorSupplier.Context context) {
            this.context = context;
        }

        /**
         * Creates {@code count} processors; processor {@code i} receives
         * {@code Utils.roundRobinSubList(shards, i, count)}.
         *
         * @param count number of processors to create
         * @return the created processors
         */
        @Nonnull
        public Collection<? extends Processor> get(int count) {
            List<Processor> res = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                List<BoundedSource<T>> localShards = Utils.roundRobinSubList(this.shards, i, count);
                res.add(new BoundedSourceP<T>(localShards, this.options.get(), this.outputCoder, this.ownerId));
            }
            return res;
        }
    }

    /**
     * Job-level supplier: splits the source once in {@link #init} and assigns each member its
     * share of the resulting shards.
     */
    private static class BoundedSourceMetaProcessorSupplier<T> implements ProcessorMetaSupplier {
        private final BoundedSource<T> boundedSource;
        private final SerializablePipelineOptions options;
        private final Coder outputCoder;
        private final String ownerId;
        /** Result of splitting the source; populated by {@link #init}. */
        private transient List<? extends BoundedSource<T>> shards;

        private BoundedSourceMetaProcessorSupplier(BoundedSource<T> boundedSource, SerializablePipelineOptions options, Coder outputCoder, String ownerId) {
            this.boundedSource = boundedSource;
            this.options = options;
            this.outputCoder = outputCoder;
            this.ownerId = ownerId;
        }

        /**
         * Splits the source into shards whose desired size is the estimated source size divided
         * by the total parallelism, but never less than one byte.
         *
         * @param context supplies the total parallelism of the vertex
         * @throws Exception if size estimation or splitting fails
         */
        public void init(@Nonnull ProcessorMetaSupplier.Context context) throws Exception {
            PipelineOptions pipelineOptions = this.options.get();
            long estimatedSizeBytes = this.boundedSource.getEstimatedSizeBytes(pipelineOptions);
            // Clamp to 1 so an unknown/zero estimate never yields a non-positive bundle size.
            long desiredSizeBytes = Math.max(1L, estimatedSizeBytes / (long) context.totalParallelism());
            this.shards = this.boundedSource.split(desiredSizeBytes, pipelineOptions);
        }

        /**
         * Returns a function mapping a member address to the supplier for that member. The
         * member at position {@code k} in {@code addresses} receives
         * {@code Utils.roundRobinSubList(shards, k, addresses.size())}.
         *
         * @param addresses addresses of all members taking part in the job
         * @return function creating the per-member processor supplier
         */
        @Nonnull
        public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> addresses) {
            // Raw construction is intentional: the shard list is typed with a wildcard.
            return address -> new BoundedSourceProcessorSupplier(
                Utils.roundRobinSubList(this.shards, addresses.indexOf(address), addresses.size()),
                this.options,
                this.outputCoder,
                this.ownerId);
        }
    }
}

