/*
 * Decompiled with CFR 0.152.
 */
package co.com.sofka.infraestructure.asyn;

import co.com.sofka.business.asyn.ListenerEvent;
import co.com.sofka.domain.generic.DomainEvent;
import co.com.sofka.infraestructure.bus.ErrorEvent;
import co.com.sofka.infraestructure.bus.EventBus;
import co.com.sofka.infraestructure.repository.EventStoreRepository;
import co.com.sofka.infraestructure.store.StoredEvent;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.logging.Logger;

/**
 * {@link Flow.Subscriber} of {@link DomainEvent}s that hands every received event to
 * the collaborators it was constructed with: an {@link EventBus}, an
 * {@link EventStoreRepository} and a {@link ListenerEvent}.
 *
 * <p>Each collaborator is optional; passing {@code null} for one of them simply skips
 * the corresponding step. Events are requested from the subscription one at a time.
 */
public class SubscriberEvent implements Flow.Subscriber<DomainEvent> {
    private static final Logger logger = Logger.getLogger(SubscriberEvent.class.getName());

    /** Maximum number of characters of the cause's stack trace that is logged/published. */
    private static final int MAX_CAUSE_LENGTH = 250;

    /** Code carried by the {@link ErrorEvent} published from {@link #onError(Throwable)}. */
    private static final int ERROR_CODE = 504;

    private final EventStoreRepository repository;
    private final EventBus eventBus;
    private final ListenerEvent listenerEvent;
    private Flow.Subscription subscription;

    /**
     * Creates a subscriber with all three collaborators.
     *
     * @param repository    store used to save each event, or {@code null} to skip saving
     * @param eventBus      bus used to publish events and errors, or {@code null} to skip publishing
     * @param listenerEvent listener notified of every signal, or {@code null} for none
     */
    public SubscriberEvent(EventStoreRepository repository, EventBus eventBus, ListenerEvent listenerEvent) {
        this.repository = repository;
        this.eventBus = eventBus;
        this.listenerEvent = listenerEvent;
    }

    /**
     * Creates a subscriber without a listener.
     *
     * @param repository store used to save each event, or {@code null} to skip saving
     * @param eventBus   bus used to publish events and errors, or {@code null} to skip publishing
     */
    public SubscriberEvent(EventStoreRepository repository, EventBus eventBus) {
        this(repository, eventBus, null);
    }

    /**
     * Creates a subscriber that only saves events.
     *
     * @param repository store used to save each event, or {@code null} to skip saving
     */
    public SubscriberEvent(EventStoreRepository repository) {
        this(repository, null, null);
    }

    /** Creates a subscriber with no collaborators; it only logs and keeps requesting events. */
    public SubscriberEvent() {
        this(null, null, null);
    }

    /**
     * Stores the subscription, requests the first event and then forwards the
     * subscription to the listener, if one was configured.
     *
     * @param subscription the subscription handed over by the publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1L);
        if (this.listenerEvent != null) {
            this.listenerEvent.onSubscribe(subscription);
        }
    }

    /**
     * Processes one event: publishes it on the bus, saves it in the repository (only
     * when the event has an aggregate root id), notifies the listener, and finally
     * requests the next event. Steps whose collaborator is {@code null} are skipped.
     *
     * @param event the event to process
     */
    @Override
    public final void onNext(DomainEvent event) {
        logger.info("###### Process event -> " + event.type);

        if (this.eventBus != null) {
            this.eventBus.publish(event);
            logger.info("Event published OK");
        }

        if (this.repository != null) {
            final EventStoreRepository repo = this.repository;
            logger.info("Saving event for aggregate root [" + event.aggregateRootId() + "]");
            StoredEvent storedEvent = StoredEvent.wrapEvent(event);
            // Events without an aggregate root id are wrapped but not saved.
            Optional.ofNullable(event.aggregateRootId()).ifPresent(aggregateId -> {
                repo.saveEvent(aggregateId.value(), storedEvent);
                logger.info("Event saved OK");
            });
        }

        if (this.listenerEvent != null) {
            // The listener receives this subscriber before every event it is notified of.
            this.listenerEvent.setSubscriber((Flow.Subscriber)this);
            logger.info("Notify other case");
            this.listenerEvent.onNext(event);
        }

        this.subscription.request(1L);
    }

    /**
     * Reports a failure: logs a truncated rendering of the cause's stack trace, publishes
     * an {@link ErrorEvent} on the bus, notifies the listener and cancels the subscription.
     *
     * <p>The stack-trace text is limited to {@value #MAX_CAUSE_LENGTH} characters; when the
     * throwable has no cause an empty string is used instead.
     *
     * @param throwable the error signalled by the publisher
     */
    @Override
    public void onError(Throwable throwable) {
        Throwable rootCause = throwable.getCause();
        String cause = rootCause == null
                ? ""
                : truncate(Arrays.toString(rootCause.getStackTrace()), MAX_CAUSE_LENGTH);
        logger.info("Error on event ====> " + cause);

        if (this.eventBus != null) {
            this.eventBus.publishError(new ErrorEvent(ERROR_CODE, cause, throwable.getMessage()));
        }
        if (this.listenerEvent != null) {
            this.listenerEvent.onError(throwable);
        }
        // Guard against an error signalled before onSubscribe stored a subscription.
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    /** Forwards the completion signal to the listener, if one was configured. */
    @Override
    public void onComplete() {
        if (this.listenerEvent != null) {
            this.listenerEvent.onComplete();
        }
    }

    /**
     * Returns at most the first {@code maxLength} characters of {@code text}.
     * Unlike a bare {@code substring(0, maxLength)}, this never fails on short input.
     */
    private static String truncate(String text, int maxLength) {
        return text.length() <= maxLength ? text : text.substring(0, maxLength);
    }
}

