/*
 * Decompiled with CFR 0.152.
 */
package pw.aru.libs.eventpipes.internal;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import pw.aru.libs.eventpipes.api.EventConsumer;
import pw.aru.libs.eventpipes.api.EventExecutor;
import pw.aru.libs.eventpipes.api.EventPipe;
import pw.aru.libs.eventpipes.api.EventPublisher;
import pw.aru.libs.eventpipes.api.EventSubscriber;
import pw.aru.libs.eventpipes.api.EventSubscription;
import pw.aru.libs.eventpipes.internal.Wrapper;

public class DefaultEventPipe<T>
implements EventPipe<T> {
    private final EventExecutor executor;
    private final Set<EventConsumer<T>> consumers;

    public DefaultEventPipe(EventExecutor executor) {
        this.executor = executor;
        this.consumers = Collections.newSetFromMap(new ConcurrentHashMap());
    }

    @Override
    public CompletableFuture<Void> publish(T event) {
        return CompletableFuture.allOf((CompletableFuture[])this.consumers.stream().map(consumer -> this.onExecute(event, (EventConsumer<T>)consumer)).toArray(CompletableFuture[]::new));
    }

    @Override
    public EventSubscription<T> subscribe(EventConsumer<T> consumer) {
        return new Subscription(consumer);
    }

    @Override
    public CompletableFuture<T> first(Predicate<T> predicate) {
        return new FirstConsumer(predicate).first;
    }

    @Override
    public void close() {
        this.consumers.clear();
        this.onEmpty();
    }

    private void unsubscribe(EventConsumer<T> consumer) {
        this.consumers.remove(consumer);
        if (this.consumers.isEmpty()) {
            this.onEmpty();
        }
    }

    protected CompletableFuture<?> onExecute(T event, EventConsumer<T> consumer) {
        return this.executor.execute(new EventRunnable(event, consumer));
    }

    protected void onEmpty() {
    }

    @Override
    public EventSubscriber<T> subscriber() {
        return Wrapper.wrapSubscriber(this);
    }

    @Override
    public EventPublisher<T> publisher() {
        return Wrapper.wrapPublisher(this);
    }

    class EventRunnable
    implements Runnable {
        private final T event;
        private final EventConsumer<T> consumer;

        EventRunnable(T event, EventConsumer<T> consumer) {
            this.event = event;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            this.consumer.onEvent(this.event);
        }
    }

    class FirstConsumer
    implements EventConsumer<T> {
        private final Predicate<T> predicate;
        private final CompletableFuture<T> first = new CompletableFuture();

        FirstConsumer(Predicate<T> predicate) {
            this.predicate = Objects.requireNonNull(predicate);
            DefaultEventPipe.this.consumers.add(this);
        }

        @Override
        public void onEvent(T event) {
            if (this.predicate.test(event)) {
                DefaultEventPipe.this.unsubscribe(this);
                this.first.complete(event);
            }
        }
    }

    class Subscription
    implements EventSubscription<T>,
    EventConsumer<T> {
        private final EventConsumer<T> consumer;

        Subscription(EventConsumer<T> consumer) {
            this.consumer = Objects.requireNonNull(consumer);
            DefaultEventPipe.this.consumers.add(this);
        }

        @Override
        public void onEvent(T event) {
            this.consumer.onEvent(event);
        }

        @Override
        public void close() {
            DefaultEventPipe.this.unsubscribe(this);
        }

        @Override
        public EventPipe<T> pipe() {
            return DefaultEventPipe.this;
        }
    }
}

