/*
 * Decompiled with CFR 0.152.
 */
package co.com.sofka.infraestructure.asyn;

import co.com.sofka.business.asyn.ListenerEvent;
import co.com.sofka.domain.generic.DomainEvent;
import co.com.sofka.domain.generic.Identity;
import co.com.sofka.infraestructure.bus.ErrorEvent;
import co.com.sofka.infraestructure.bus.EventBus;
import co.com.sofka.infraestructure.repository.EventStoreRepository;
import co.com.sofka.infraestructure.store.StoredEvent;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.Flow;

/**
 * {@link Flow.Subscriber} of {@link DomainEvent}s that fans each received event out to up to
 * three optional collaborators: an {@link EventBus}, an {@link EventStoreRepository} and a
 * {@link ListenerEvent}.
 *
 * <p>Any collaborator may be {@code null}; a {@code null} collaborator is simply skipped.
 * Events are pulled one at a time: a single item is requested on subscription and one more
 * after each processed event.
 *
 * @param <T> identity type used by the event store repository
 */
public class SubscriberEvent<T extends Identity>
implements Flow.Subscriber<DomainEvent> {
    /** Code passed to every {@link ErrorEvent} published from {@link #onError(Throwable)}. */
    private static final int ERROR_CODE = 504;
    /** Maximum number of characters of the cause's stack trace included in an error event. */
    private static final int MAX_CAUSE_LENGTH = 250;

    private final EventStoreRepository<T> repository;
    private final EventBus eventBus;
    private final ListenerEvent listenerEvent;
    private Flow.Subscription subscription;

    /**
     * Creates a subscriber with all three collaborators.
     *
     * @param repository    store that receives every event, or {@code null} to skip storing
     * @param eventBus      bus that receives every event and error, or {@code null} to skip publishing
     * @param listenerEvent listener that is forwarded every signal, or {@code null} for none
     */
    public SubscriberEvent(EventStoreRepository<T> repository, EventBus eventBus, ListenerEvent listenerEvent) {
        this.repository = repository;
        this.eventBus = eventBus;
        this.listenerEvent = listenerEvent;
    }

    /**
     * Creates a subscriber without a listener.
     *
     * @param repository store that receives every event, or {@code null} to skip storing
     * @param eventBus   bus that receives every event and error, or {@code null} to skip publishing
     */
    public SubscriberEvent(EventStoreRepository<T> repository, EventBus eventBus) {
        this(repository, eventBus, null);
    }

    /**
     * Creates a subscriber that only stores events.
     *
     * @param repository store that receives every event, or {@code null} to skip storing
     */
    public SubscriberEvent(EventStoreRepository<T> repository) {
        this(repository, null, null);
    }

    /** Creates a subscriber with no collaborators; it only keeps requesting items. */
    public SubscriberEvent() {
        this(null, null, null);
    }

    /**
     * Remembers the subscription, requests the first item and then forwards the subscription
     * to the listener, if there is one.
     *
     * @param subscription the subscription handed over by the publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1L);
        if (this.listenerEvent != null) {
            this.listenerEvent.onSubscribe(subscription);
        }
    }

    /**
     * Publishes the event on the bus, stores it wrapped as a {@link StoredEvent} under its
     * aggregate root id, forwards it to the listener (after registering this instance as the
     * listener's subscriber), and finally requests the next item.
     *
     * @param event the received domain event
     */
    @Override
    public final void onNext(DomainEvent event) {
        if (this.eventBus != null) {
            this.eventBus.publish(event);
        }
        if (this.repository != null) {
            StoredEvent storedEvent = StoredEvent.wrapEvent(event);
            this.repository.saveEvent(event.aggregateRootId(), storedEvent);
        }
        if (this.listenerEvent != null) {
            // Raw cast kept as in the original: the parameter type of setSubscriber is not
            // visible from this file, so the cast is left untouched to stay source-compatible.
            this.listenerEvent.setSubscriber((Flow.Subscriber)this);
            this.listenerEvent.onNext(event);
        }
        // Guarded so that a signal arriving before onSubscribe does not end in an NPE.
        if (this.subscription != null) {
            this.subscription.request(1L);
        }
    }

    /**
     * Publishes an {@link ErrorEvent} on the bus (built from {@link #ERROR_CODE}, the truncated
     * stack trace of the throwable's cause and the throwable's message), forwards the throwable
     * to the listener and cancels the subscription.
     *
     * @param throwable the error to report
     * @throws NullPointerException if {@code throwable} is {@code null}
     */
    @Override
    public void onError(Throwable throwable) {
        String cause = describeCause(throwable);
        if (this.eventBus != null) {
            this.eventBus.publishError(new ErrorEvent(ERROR_CODE, cause, throwable.getMessage()));
        }
        if (this.listenerEvent != null) {
            this.listenerEvent.onError(throwable);
        }
        // Guarded so that error reporting itself cannot fail when no subscription was ever set.
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    /** Forwards the completion signal to the listener, if there is one. */
    @Override
    public void onComplete() {
        if (this.listenerEvent != null) {
            this.listenerEvent.onComplete();
        }
    }

    /**
     * Renders the stack trace of the throwable's cause, limited to {@link #MAX_CAUSE_LENGTH}
     * characters.
     *
     * @param throwable the reported error
     * @return the (possibly truncated) stack trace text, or an empty string when there is no cause
     */
    private static String describeCause(Throwable throwable) {
        Throwable cause = throwable.getCause();
        if (cause == null) {
            return "";
        }
        String trace = Arrays.toString(cause.getStackTrace());
        // Truncate only when needed: substring(0, 250) on a shorter string would throw
        // StringIndexOutOfBoundsException and abort the whole error handler.
        return trace.length() > MAX_CAUSE_LENGTH ? trace.substring(0, MAX_CAUSE_LENGTH) : trace;
    }
}

