package org.occurrent.subscription.util.blocking.catchup.subscription;

import io.cloudevents.CloudEvent;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.PreDestroy;
import org.occurrent.condition.Condition;
import org.occurrent.eventstore.api.blocking.EventStoreQueries;
import org.occurrent.filter.Filter;
import org.occurrent.functionalsupport.internal.FunctionalSupport;
import org.occurrent.subscription.OccurrentSubscriptionFilter;
import org.occurrent.subscription.StartAt;
import org.occurrent.subscription.StringBasedSubscriptionPosition;
import org.occurrent.subscription.SubscriptionFilter;
import org.occurrent.subscription.SubscriptionPosition;
import org.occurrent.subscription.api.blocking.BlockingSubscription;
import org.occurrent.subscription.api.blocking.PositionAwareBlockingSubscription;
import org.occurrent.subscription.api.blocking.Subscription;
import org.occurrent.subscription.util.blocking.catchup.subscription.SubscriptionPositionStorageConfig;
import org.occurrent.time.internal.RFC3339;

/* loaded from: input_file:org/occurrent/subscription/util/blocking/catchup/subscription/CatchupSupportingBlockingSubscription.class */
public class CatchupSupportingBlockingSubscription implements BlockingSubscription {
    private static final int DEFAULT_CACHE_SIZE = 100;
    private final PositionAwareBlockingSubscription subscription;
    private final EventStoreQueries eventStoreQueries;
    private final CatchupSupportingBlockingSubscriptionConfig config;
    private final ConcurrentMap<String, Boolean> runningCatchupSubscriptions;
    private volatile boolean shuttingDown;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/occurrent/subscription/util/blocking/catchup/subscription/CatchupSupportingBlockingSubscription$CancelledSubscription.class */
    public static class CancelledSubscription implements Subscription {
        private final String subscriptionId;

        public CancelledSubscription(String str) {
            this.subscriptionId = str;
        }

        public String id() {
            return this.subscriptionId;
        }

        public void waitUntilStarted() {
        }

        public boolean waitUntilStarted(Duration duration) {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/occurrent/subscription/util/blocking/catchup/subscription/CatchupSupportingBlockingSubscription$FixedSizeCache.class */
    public static class FixedSizeCache {
        private final LinkedHashMap<String, String> cacheContent;

        FixedSizeCache(final int i) {
            this.cacheContent = new LinkedHashMap<String, String>() { // from class: org.occurrent.subscription.util.blocking.catchup.subscription.CatchupSupportingBlockingSubscription.FixedSizeCache.1
                @Override // java.util.LinkedHashMap
                protected boolean removeEldestEntry(Map.Entry<String, String> entry) {
                    return size() > i;
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void put(String str) {
            this.cacheContent.put(str, null);
        }

        public boolean isCached(String str) {
            return this.cacheContent.containsKey(str);
        }
    }

    public CatchupSupportingBlockingSubscription(PositionAwareBlockingSubscription positionAwareBlockingSubscription, EventStoreQueries eventStoreQueries) {
        this(positionAwareBlockingSubscription, eventStoreQueries, new CatchupSupportingBlockingSubscriptionConfig(DEFAULT_CACHE_SIZE));
    }

    public CatchupSupportingBlockingSubscription(PositionAwareBlockingSubscription positionAwareBlockingSubscription, EventStoreQueries eventStoreQueries, CatchupSupportingBlockingSubscriptionConfig catchupSupportingBlockingSubscriptionConfig) {
        this.runningCatchupSubscriptions = new ConcurrentHashMap();
        this.shuttingDown = false;
        this.subscription = positionAwareBlockingSubscription;
        this.eventStoreQueries = eventStoreQueries;
        this.config = catchupSupportingBlockingSubscriptionConfig;
    }

    public Subscription subscribe(String str, SubscriptionFilter subscriptionFilter, Supplier<StartAt> supplier, Consumer<CloudEvent> consumer) {
        boolean z;
        Subscription subscribe;
        if (subscriptionFilter != null && !(subscriptionFilter instanceof OccurrentSubscriptionFilter)) {
            throw new IllegalArgumentException("Unsupported!");
        }
        this.runningCatchupSubscriptions.put(str, true);
        StartAt subscriptionPosition = (supplier == null || supplier.get() == null) ? StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime()) : supplier.get();
        if (!isTimeBasedSubscriptionPosition(subscriptionPosition)) {
            return this.subscription.subscribe(str, subscriptionFilter, supplier, consumer);
        }
        SubscriptionPosition subscriptionPosition2 = ((StartAt.StartAtSubscriptionPosition) subscriptionPosition).subscriptionPosition;
        Filter all = isBeginningOfTime(subscriptionPosition2) ? Filter.all() : Filter.time(Condition.gt(OffsetDateTime.parse(subscriptionPosition2.asString(), RFC3339.RFC_3339_DATE_TIME_FORMATTER)));
        SubscriptionPosition globalSubscriptionPosition = this.subscription.globalSubscriptionPosition();
        FixedSizeCache fixedSizeCache = new FixedSizeCache(this.config.cacheSize);
        FunctionalSupport.takeWhile(subscriptionFilter == null ? this.eventStoreQueries.query(all, EventStoreQueries.SortBy.TIME_ASC) : this.eventStoreQueries.query(all.and(((OccurrentSubscriptionFilter) subscriptionFilter).filter, new Filter[0]), EventStoreQueries.SortBy.TIME_ASC), cloudEvent -> {
            return !this.shuttingDown && this.runningCatchupSubscriptions.containsKey(str);
        }).peek(consumer).peek(cloudEvent2 -> {
            fixedSizeCache.put(cloudEvent2.getId());
        }).filter((Predicate) returnIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.PersistSubscriptionPositionDuringCatchupPhase.class, persistSubscriptionPositionDuringCatchupPhase -> {
            return persistSubscriptionPositionDuringCatchupPhase.persistCloudEventPositionPredicate;
        }).orElse(cloudEvent3 -> {
            return false;
        })).forEach(cloudEvent4 -> {
            doIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.PersistSubscriptionPositionDuringCatchupPhase.class, persistSubscriptionPositionDuringCatchupPhase2 -> {
                persistSubscriptionPositionDuringCatchupPhase2.storage.save(str, TimeBasedSubscriptionPosition.from(cloudEvent4.getTime()));
            });
        });
        if (this.shuttingDown || !this.runningCatchupSubscriptions.containsKey(str)) {
            z = true;
        } else {
            z = false;
            this.runningCatchupSubscriptions.remove(str);
        }
        Supplier supplier2 = (Supplier) returnIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, useSubscriptionPositionInStorage -> {
            return () -> {
                SubscriptionPosition read = useSubscriptionPositionInStorage.storage.read(str);
                if (read == null || isTimeBasedSubscriptionPosition(read)) {
                    read = useSubscriptionPositionInStorage.storage.save(str, globalSubscriptionPosition);
                }
                return StartAt.subscriptionPosition(read);
            };
        }).orElse(() -> {
            return StartAt.subscriptionPosition(globalSubscriptionPosition);
        });
        if (z) {
            doIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, useSubscriptionPositionInStorage2 -> {
                if (useSubscriptionPositionInStorage2.storage.exists(str)) {
                    return;
                }
                supplier2.get();
            });
            subscribe = new CancelledSubscription(str);
        } else {
            subscribe = this.subscription.subscribe(str, subscriptionFilter, supplier2, cloudEvent5 -> {
                if (fixedSizeCache.isCached(cloudEvent5.getId())) {
                    return;
                }
                consumer.accept(cloudEvent5);
            });
        }
        return subscribe;
    }

    public void cancelSubscription(String str) {
        this.runningCatchupSubscriptions.remove(str);
        this.subscription.cancelSubscription(str);
        doIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, useSubscriptionPositionInStorage -> {
            useSubscriptionPositionInStorage.storage.delete(str);
        });
    }

    public Subscription subscribe(String str, Consumer<CloudEvent> consumer) {
        return subscribe(str, (SubscriptionFilter) null, consumer);
    }

    public Subscription subscribe(String str, SubscriptionFilter subscriptionFilter, Consumer<CloudEvent> consumer) {
        return subscribe(str, subscriptionFilter, () -> {
            SubscriptionPosition subscriptionPosition = (SubscriptionPosition) returnIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, useSubscriptionPositionInStorage -> {
                return useSubscriptionPositionInStorage.storage.read(str);
            }).orElse(null);
            if (subscriptionPosition == null) {
                return null;
            }
            return StartAt.subscriptionPosition(subscriptionPosition);
        }, consumer);
    }

    @PreDestroy
    public void shutdown() {
        this.shuttingDown = true;
        this.runningCatchupSubscriptions.clear();
        this.subscription.shutdown();
    }

    public static boolean isTimeBasedSubscriptionPosition(StartAt startAt) {
        if (startAt instanceof StartAt.StartAtSubscriptionPosition) {
            return isTimeBasedSubscriptionPosition(((StartAt.StartAtSubscriptionPosition) startAt).subscriptionPosition);
        }
        return false;
    }

    public static boolean isTimeBasedSubscriptionPosition(SubscriptionPosition subscriptionPosition) {
        return (subscriptionPosition instanceof TimeBasedSubscriptionPosition) || ((subscriptionPosition instanceof StringBasedSubscriptionPosition) && isRfc3339Timestamp(subscriptionPosition.asString()));
    }

    private static boolean isRfc3339Timestamp(String str) {
        try {
            OffsetDateTime.parse(str, RFC3339.RFC_3339_DATE_TIME_FORMATTER);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    private static boolean isBeginningOfTime(SubscriptionPosition subscriptionPosition) {
        return (subscriptionPosition instanceof TimeBasedSubscriptionPosition) && ((TimeBasedSubscriptionPosition) subscriptionPosition).isBeginningOfTime();
    }

    private <T, C extends SubscriptionPositionStorageConfig> Optional<T> returnIfSubscriptionPositionStorageConfigIs(Class<C> cls, Function<C, T> function) {
        return cls.isInstance(this.config.subscriptionStorageConfig) ? Optional.ofNullable(function.apply(cls.cast(this.config.subscriptionStorageConfig))) : Optional.empty();
    }

    private <T, C extends SubscriptionPositionStorageConfig> void doIfSubscriptionPositionStorageConfigIs(Class<C> cls, Consumer<C> consumer) {
        if (cls.isInstance(this.config.subscriptionStorageConfig)) {
            consumer.accept(cls.cast(this.config.subscriptionStorageConfig));
        }
    }
}
