package org.occurrent.subscription.mongodb.spring.blocking;

import jakarta.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.occurrent.retry.RetryStrategy;
import org.occurrent.retry.internal.RetryExecution;
import org.occurrent.subscription.SubscriptionPosition;
import org.occurrent.subscription.api.blocking.SubscriptionPositionStorage;
import org.occurrent.subscription.mongodb.MongoOperationTimeSubscriptionPosition;
import org.occurrent.subscription.mongodb.MongoResumeTokenSubscriptionPosition;
import org.occurrent.subscription.mongodb.internal.MongoCommons;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;

/* loaded from: input_file:org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionPositionStorage.class */
/**
 * A {@link SubscriptionPositionStorage} that keeps subscription positions in a MongoDB collection which is
 * accessed through Spring's {@link MongoOperations}.
 * <p>
 * Each position is stored as a single document whose {@code _id} is the subscription id. Every storage
 * operation is passed to {@code RetryExecution.executeWithRetry} together with the configured
 * {@link RetryStrategy} and a predicate that evaluates to {@code false} once {@link #shutdown()} has been called.
 */
public class SpringMongoSubscriptionPositionStorage implements SubscriptionPositionStorage {
    private static final Logger log = LoggerFactory.getLogger(SpringMongoSubscriptionPositionStorage.class);

    /** Name of the document field that holds the subscription id. */
    private static final String ID = "_id";

    private final MongoOperations mongoOperations;
    private final String subscriptionPositionCollection;
    private final RetryStrategy retryStrategy;

    /** Set by {@link #shutdown()}; volatile so that retry predicates evaluated on other threads observe it. */
    private volatile boolean shutdown;

    /**
     * Creates a storage that uses a default retry strategy: an exponential back-off created from 100 ms, 2 s and
     * 2.0 (presumably initial delay, maximum delay and multiplier - see {@code RetryStrategy.exponentialBackoff}).
     *
     * @param mongoOperations the Spring Mongo operations used to access the collection
     * @param str             the name of the collection in which subscription positions are stored
     */
    public SpringMongoSubscriptionPositionStorage(MongoOperations mongoOperations, String str) {
        this(mongoOperations, str, RetryStrategy.exponentialBackoff(Duration.ofMillis(100L), Duration.ofSeconds(2L), 2.0d));
    }

    /**
     * Creates a storage with an explicit retry strategy.
     *
     * @param mongoOperations the Spring Mongo operations used to access the collection
     * @param str             the name of the collection in which subscription positions are stored
     * @param retryStrategy   the strategy handed to {@code RetryExecution.executeWithRetry} for every operation
     * @throws NullPointerException if any argument is {@code null}
     */
    public SpringMongoSubscriptionPositionStorage(MongoOperations mongoOperations, String str, RetryStrategy retryStrategy) {
        Objects.requireNonNull(mongoOperations, "Mongo operations cannot be null");
        Objects.requireNonNull(str, "subscriptionPositionCollection cannot be null");
        Objects.requireNonNull(retryStrategy, RetryStrategy.class.getSimpleName() + " cannot be null");
        this.mongoOperations = mongoOperations;
        this.subscriptionPositionCollection = str;
        this.retryStrategy = retryStrategy;
    }

    /**
     * Reads the stored position of a subscription.
     *
     * @param str the subscription id
     * @return the stored position, or {@code null} if no document exists for the subscription id
     */
    public SubscriptionPosition read(String str) {
        return (SubscriptionPosition) RetryExecution.executeWithRetry(() -> {
            Document document = mongoOperations.findOne(byId(str), Document.class, subscriptionPositionCollection);
            if (document == null) {
                return null;
            }
            return MongoCommons.calculateSubscriptionPositionFromMongoStreamPositionDocument(document);
        }, failure -> !shutdown, retryStrategy).get();
    }

    /**
     * Stores (upserts) the position of a subscription.
     * <p>
     * Resume-token and operation-time positions are written using their dedicated document layouts; any other
     * position is written in a generic layout based on {@link SubscriptionPosition#asString()}.
     *
     * @param str                  the subscription id
     * @param subscriptionPosition the position to store
     * @return the position that was passed in
     * @throws NullPointerException if {@code subscriptionPosition} is {@code null}
     */
    public SubscriptionPosition save(String str, SubscriptionPosition subscriptionPosition) {
        // Validate before entering the retry-wrapped operation: a null position is a caller bug, not a
        // transient storage failure, and would otherwise only surface as an NPE inside the wrapped operation.
        Objects.requireNonNull(subscriptionPosition, SubscriptionPosition.class.getSimpleName() + " cannot be null");
        return (SubscriptionPosition) RetryExecution.executeWithRetry(() -> {
            if (subscriptionPosition instanceof MongoResumeTokenSubscriptionPosition) {
                persistResumeTokenStreamPosition(str, ((MongoResumeTokenSubscriptionPosition) subscriptionPosition).resumeToken);
            } else if (subscriptionPosition instanceof MongoOperationTimeSubscriptionPosition) {
                persistOperationTimeStreamPosition(str, ((MongoOperationTimeSubscriptionPosition) subscriptionPosition).operationTime);
            } else {
                persistDocumentStreamPosition(str, MongoCommons.generateGenericSubscriptionPositionDocument(str, subscriptionPosition.asString()));
            }
            return subscriptionPosition;
        }, failure -> !shutdown, retryStrategy).get();
    }

    /**
     * Removes the stored position of a subscription.
     *
     * @param str the subscription id
     */
    public void delete(String str) {
        RetryExecution.executeWithRetry(() -> {
            mongoOperations.remove(byId(str), subscriptionPositionCollection);
        }, failure -> !shutdown, retryStrategy).run();
    }

    /**
     * Checks whether a position is stored for a subscription.
     *
     * @param str the subscription id
     * @return {@code true} if a document with the subscription id exists in the collection
     */
    public boolean exists(String str) {
        return ((Boolean) RetryExecution.executeWithRetry(() -> {
            return Boolean.valueOf(mongoOperations.exists(byId(str), subscriptionPositionCollection));
        }, failure -> !shutdown, retryStrategy).get()).booleanValue();
    }

    /** Upserts a resume-token based position document for the subscription. */
    private void persistResumeTokenStreamPosition(String str, BsonValue bsonValue) {
        persistDocumentStreamPosition(str, MongoCommons.generateResumeTokenStreamPositionDocument(str, bsonValue));
    }

    /** Upserts an operation-time based position document for the subscription. */
    private void persistOperationTimeStreamPosition(String str, BsonTimestamp bsonTimestamp) {
        persistDocumentStreamPosition(str, MongoCommons.generateOperationTimeStreamPositionDocument(str, bsonTimestamp));
    }

    /**
     * Upserts the given document into the position collection, matching on the subscription id.
     *
     * @param str      the subscription id
     * @param document the position document to write
     */
    void persistDocumentStreamPosition(String str, Document document) {
        mongoOperations.upsert(byId(str), Update.fromDocument(document, new String[0]), subscriptionPositionCollection);
    }

    /** Flags this storage as shut down, after which the retry predicate no longer approves retries. */
    @PreDestroy
    void shutdown() {
        this.shutdown = true;
    }

    /** Builds the query that matches the position document of the given subscription id. */
    private static Query byId(String subscriptionId) {
        return Query.query(Criteria.where(ID).is(subscriptionId));
    }
}
