/*
 * Decompiled with CFR 0.152.
 */
package co.com.sofka.business.asyn;

import co.com.sofka.domain.generic.AggregateRootId;
import co.com.sofka.domain.generic.DomainEvent;
import co.com.sofka.infraestructure.bus.ErrorEvent;
import co.com.sofka.infraestructure.bus.EventBus;
import co.com.sofka.infraestructure.repository.EventStoreRepository;
import co.com.sofka.infraestructure.store.StoredEvent;
import java.util.Optional;
import java.util.concurrent.Flow;

public class SubscriberEvent<T extends AggregateRootId>
implements Flow.Subscriber<DomainEvent> {
    private final EventStoreRepository<T> repository;
    private final EventBus eventBus;
    private Flow.Subscription subscription;

    public SubscriberEvent(EventStoreRepository<T> repository, EventBus eventBus) {
        this.repository = repository;
        this.eventBus = eventBus;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1L);
    }

    @Override
    public final void onNext(DomainEvent event) {
        Optional.of(this.eventBus).ifPresent(bus -> bus.publish(event));
        Optional.of(this.repository).ifPresent(repo -> {
            StoredEvent storedEvent = StoredEvent.wrapEvent(event);
            repo.saveEvent(event.aggregateRootId, storedEvent);
        });
        this.subscription.request(1L);
    }

    @Override
    public void onError(Throwable throwable) {
        Optional.of(this.eventBus).ifPresent(bus -> bus.publishError(new ErrorEvent(504, "There is a problem saving the event", throwable.getMessage())));
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
    }
}

