/*
 * Decompiled with CFR 0.152.
 */
package com.littlesaints.protean.functions.streams;

import com.littlesaints.protean.functions.streams.StreamSource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedTransferQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ForkJoin<T, Q>
implements Consumer<T>,
AutoCloseable {
    private static final Logger log = LogManager.getLogger(ForkJoin.class);
    private final String name = String.valueOf(System.currentTimeMillis());
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final Supplier<Q> exchangeProvider;
    private final Function<Q, Supplier<T>> exchangeReaderProvider;
    private final Function<Q, Consumer<T>> exchangeWriterProvider;
    private final Map<Predicate<T>, Consumer<T>> predicates = new LinkedHashMap<Predicate<T>, Consumer<T>>(1);
    private final Collection<Future<?>> forks = new ArrayList(2);
    private final Collection<Stream<T>> streams = new ArrayList<Stream<T>>(2);
    private final Consumer<T> acceptAction;

    private ForkJoin(Supplier<Q> exchangeProvider, Function<Q, Supplier<T>> exchangeReaderProvider, Function<Q, Consumer<T>> exchangeWriterProvider) {
        this.exchangeProvider = exchangeProvider;
        this.exchangeReaderProvider = exchangeReaderProvider;
        this.exchangeWriterProvider = exchangeWriterProvider;
        this.acceptAction = t -> ((Stream)this.predicates.entrySet().stream().sequential()).filter(e -> ((Predicate)e.getKey()).test(t)).map(Map.Entry::getValue).forEach(c -> c.accept(t));
    }

    public static <T> ForkJoin<T, BlockingQueue<T>> newInstance() {
        return ForkJoin.of(LinkedTransferQueue::new);
    }

    public static <T> ForkJoin<T, BlockingQueue<T>> of(Supplier<BlockingQueue<T>> exchangeProvider) {
        Function<BlockingQueue, Supplier> reader = q -> () -> {
            while (true) {
                try {
                    return q.take();
                }
                catch (InterruptedException e) {
                    log.warn("Error during thread sleep.", (Throwable)e);
                    continue;
                }
                break;
            }
        };
        Function<BlockingQueue, Consumer> writer = q -> t -> {
            while (true) {
                try {
                    q.put(t);
                }
                catch (InterruptedException e) {
                    log.warn("Error during thread sleep.", (Throwable)e);
                    continue;
                }
                break;
            }
        };
        return ForkJoin.of(exchangeProvider, reader, writer);
    }

    private static <T, Q> ForkJoin<T, Q> of(Supplier<Q> exchangeProvider, Function<Q, Supplier<T>> exchangeReaderProvider, Function<Q, Consumer<T>> exchangeWriterProvider) {
        return new ForkJoin<T, Q>(exchangeProvider, exchangeReaderProvider, exchangeWriterProvider);
    }

    public ForkJoin<T, Q> fork(Predicate<T> predicate, Consumer<Stream<T>> streamProcessor) {
        Q messageExchange = this.exchangeProvider.get();
        Object stream = StreamSource.builder().provider(this.exchangeReaderProvider.apply(messageExchange)).build().get();
        this.streams.add((Stream<T>)stream);
        this.predicates.put(predicate, this.exchangeWriterProvider.apply(messageExchange));
        this.forks.add(this.executor.submit(() -> ForkJoin.lambda$fork$7(streamProcessor, (Stream)stream)));
        return this;
    }

    public ForkJoin<T, Q> fork(Consumer<Stream<T>> streamProcessor) {
        this.fork(t -> true, streamProcessor);
        return this;
    }

    @Override
    public void accept(T t) {
        this.acceptAction.accept(t);
    }

    @Override
    public void close() {
        this.streams.forEach(BaseStream::close);
        this.forks.forEach(f -> f.cancel(true));
    }

    public String getName() {
        return this.name;
    }

    private static /* synthetic */ void lambda$fork$7(Consumer streamProcessor, Stream stream) {
        streamProcessor.accept(stream);
    }
}

