package org.occurrent.subscription.mongodb.spring.reactor;

import com.mongodb.client.result.UpdateResult;
import java.util.Objects;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.occurrent.subscription.SubscriptionPosition;
import org.occurrent.subscription.api.reactor.SubscriptionPositionStorage;
import org.occurrent.subscription.mongodb.MongoOperationTimeSubscriptionPosition;
import org.occurrent.subscription.mongodb.MongoResumeTokenSubscriptionPosition;
import org.occurrent.subscription.mongodb.internal.MongoCommons;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/occurrent/subscription/mongodb/spring/reactor/ReactorSubscriptionPositionStorage.class */
/**
 * {@link SubscriptionPositionStorage} backed by a single MongoDB collection that is accessed
 * through {@link ReactiveMongoOperations}.
 *
 * <p>Each position is stored as one document whose {@code _id} equals the string key passed to
 * {@link #save}, {@link #read} and {@link #delete}. The stored document layout is produced and
 * parsed by {@link MongoCommons}.
 */
public class ReactorSubscriptionPositionStorage implements SubscriptionPositionStorage {
    private final ReactiveMongoOperations mongo;
    private final String subscriptionPositionCollection;

    /**
     * Creates a storage that keeps its documents in the given collection.
     *
     * @param reactiveMongoOperations the Mongo operations used for every read and write
     * @param str the name of the collection holding the position documents
     * @throws NullPointerException if either argument is {@code null}
     */
    public ReactorSubscriptionPositionStorage(ReactiveMongoOperations reactiveMongoOperations, String str) {
        this.mongo = Objects.requireNonNull(reactiveMongoOperations, ReactiveMongoOperations.class.getSimpleName() + " cannot be null");
        this.subscriptionPositionCollection = Objects.requireNonNull(str, "subscriptionPositionCollection cannot be null");
    }

    /**
     * Upserts the given position under the given key.
     *
     * <p>Resume-token and operation-time positions are stored via their dedicated document
     * generators; any other position is stored through its {@code asString()} representation.
     *
     * @param str the key of the position document (used as its {@code _id})
     * @param subscriptionPosition the position to store
     * @return a {@link Mono} that emits {@code subscriptionPosition} once the upsert has completed
     */
    public Mono<SubscriptionPosition> save(String str, SubscriptionPosition subscriptionPosition) {
        final Mono<?> persisted;
        if (subscriptionPosition instanceof MongoResumeTokenSubscriptionPosition) {
            BsonDocument resumeToken = ((MongoResumeTokenSubscriptionPosition) subscriptionPosition).resumeToken;
            persisted = persistResumeTokenStreamPosition(str, resumeToken);
        } else if (subscriptionPosition instanceof MongoOperationTimeSubscriptionPosition) {
            BsonTimestamp operationTime = ((MongoOperationTimeSubscriptionPosition) subscriptionPosition).operationTime;
            persisted = persistOperationTimeStreamPosition(str, operationTime);
        } else {
            // Not one of the two Mongo-specific position types: store the generic string form.
            Document genericPosition = MongoCommons.generateGenericSubscriptionPositionDocument(str, subscriptionPosition.asString());
            persisted = persistDocumentStreamPosition(str, genericPosition);
        }
        return persisted.thenReturn(subscriptionPosition);
    }

    /**
     * Removes the position document stored under the given key.
     *
     * @param str the key of the position document (matched against {@code _id})
     * @return a {@link Mono} that completes when the remove operation has completed
     */
    public Mono<Void> delete(String str) {
        return this.mongo
                .remove(idEquals(str), this.subscriptionPositionCollection)
                .then();
    }

    /**
     * Upserts a resume-token position document and emits the document that was written.
     */
    private Mono<Document> persistResumeTokenStreamPosition(String id, BsonDocument resumeToken) {
        Document positionDocument = MongoCommons.generateResumeTokenStreamPositionDocument(id, resumeToken);
        return persistDocumentStreamPosition(id, positionDocument).thenReturn(positionDocument);
    }

    /**
     * Upserts an operation-time position document and emits the document that was written.
     */
    private Mono<Document> persistOperationTimeStreamPosition(String id, BsonTimestamp operationTime) {
        Document positionDocument = MongoCommons.generateOperationTimeStreamPositionDocument(id, operationTime);
        return persistDocumentStreamPosition(id, positionDocument).thenReturn(positionDocument);
    }

    /**
     * Upserts {@code positionDocument} into the position collection, matching on {@code _id}.
     */
    private Mono<UpdateResult> persistDocumentStreamPosition(String id, Document positionDocument) {
        Update update = Update.fromDocument(positionDocument);
        return this.mongo.upsert(idEquals(id), update, this.subscriptionPositionCollection);
    }

    /**
     * Looks up the position document stored under the given key and converts it to a
     * {@link SubscriptionPosition}.
     *
     * @param str the key of the position document (matched against {@code _id})
     * @return a {@link Mono} with the position derived from the document returned by the lookup
     */
    public Mono<SubscriptionPosition> read(String str) {
        Mono<Document> storedDocument = this.mongo.findOne(idEquals(str), Document.class, this.subscriptionPositionCollection);
        return storedDocument.map(MongoCommons::calculateSubscriptionPositionFromMongoStreamPositionDocument);
    }

    /**
     * Builds the query that selects the single document whose {@code _id} equals {@code id}.
     */
    private static Query idEquals(String id) {
        return Query.query(Criteria.where("_id").is(id));
    }
}
