package org.axonframework.eventsourcing.eventstore.mongo;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.common.Assert;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.eventstore.DomainEventData;
import org.axonframework.eventsourcing.eventstore.TrackedEventData;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.eventsourcing.eventstore.legacy.LegacyTrackingToken;
import org.axonframework.eventsourcing.eventstore.mongo.documentperevent.EventEntryConfiguration;
import org.axonframework.serialization.Serializer;

/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/mongo/AbstractMongoEventStorageStrategy.class */
public abstract class AbstractMongoEventStorageStrategy implements StorageStrategy {
    private static final long DEFAULT_GAP_DETECTION_INTERVAL = 10000;
    protected static final int ORDER_ASC = 1;
    protected static final int ORDER_DESC = -1;
    private final EventEntryConfiguration eventConfiguration;
    private final long gapDetectionInterval;

    public AbstractMongoEventStorageStrategy(EventEntryConfiguration eventEntryConfiguration) {
        this(eventEntryConfiguration, DEFAULT_GAP_DETECTION_INTERVAL);
    }

    public AbstractMongoEventStorageStrategy(EventEntryConfiguration eventEntryConfiguration, long j) {
        this.eventConfiguration = eventEntryConfiguration;
        this.gapDetectionInterval = j;
    }

    @Override // org.axonframework.eventsourcing.eventstore.mongo.StorageStrategy
    public void appendEvents(DBCollection dBCollection, List<? extends EventMessage<?>> list, Serializer serializer) throws MongoException.DuplicateKey {
        dBCollection.insert((List) createEventDocuments(list, serializer).collect(Collectors.toList()));
    }

    protected abstract Stream<DBObject> createEventDocuments(List<? extends EventMessage<?>> list, Serializer serializer);

    @Override // org.axonframework.eventsourcing.eventstore.mongo.StorageStrategy
    public void appendSnapshot(DBCollection dBCollection, DomainEventMessage<?> domainEventMessage, Serializer serializer) throws MongoException.DuplicateKey {
        dBCollection.insert(new DBObject[]{createSnapshotDocument(domainEventMessage, serializer)});
    }

    protected abstract DBObject createSnapshotDocument(DomainEventMessage<?> domainEventMessage, Serializer serializer);

    @Override // org.axonframework.eventsourcing.eventstore.mongo.StorageStrategy
    public void deleteSnapshots(DBCollection dBCollection, String str) {
        DBCursor find = dBCollection.find(BasicDBObjectBuilder.start().add(this.eventConfiguration.aggregateIdentifierProperty(), str).get());
        dBCollection.getClass();
        find.forEach(dBCollection::remove);
    }

    @Override // org.axonframework.eventsourcing.eventstore.mongo.StorageStrategy
    public List<? extends DomainEventData<?>> findDomainEvents(DBCollection dBCollection, String str, long j, int i) {
        DBCursor applyBatchSize = applyBatchSize(dBCollection.find(BasicDBObjectBuilder.start().add(eventConfiguration().aggregateIdentifierProperty(), str).add(eventConfiguration().sequenceNumberProperty(), new BasicDBObject("$gte", Long.valueOf(j))).get()).sort(new BasicDBObject(eventConfiguration().sequenceNumberProperty(), Integer.valueOf(ORDER_ASC))), i);
        try {
            List<? extends DomainEventData<?>> list = (List) StreamSupport.stream(applyBatchSize.spliterator(), false).flatMap(this::extractDomainEvents).filter(domainEventData -> {
                return domainEventData.getSequenceNumber() >= j;
            }).limit(i).collect(Collectors.toList());
            applyBatchSize.close();
            return list;
        } catch (Throwable th) {
            applyBatchSize.close();
            throw th;
        }
    }

    protected abstract Stream<? extends DomainEventData<?>> extractDomainEvents(DBObject dBObject);

    @Override // org.axonframework.eventsourcing.eventstore.mongo.StorageStrategy
    public List<? extends TrackedEventData<?>> findTrackedEvents(DBCollection dBCollection, TrackingToken trackingToken, int i) {
        DBCursor find;
        if (trackingToken == null) {
            find = dBCollection.find(BasicDBObjectBuilder.start().get());
        } else {
            Assert.isTrue(trackingToken instanceof LegacyTrackingToken, String.format("Token %s is of the wrong type", trackingToken));
            LegacyTrackingToken legacyTrackingToken = (LegacyTrackingToken) trackingToken;
            find = dBCollection.find(BasicDBObjectBuilder.start().add(this.eventConfiguration.timestampProperty(), new BasicDBObject("$gte", legacyTrackingToken.getTimestamp().toString())).add(this.eventConfiguration.sequenceNumberProperty(), new BasicDBObject("$gte", Long.valueOf(legacyTrackingToken.getSequenceNumber()))).get());
        }
        DBCursor applyBatchSize = applyBatchSize(find.sort(new BasicDBObject(eventConfiguration().timestampProperty(), Integer.valueOf(ORDER_ASC)).append(eventConfiguration().sequenceNumberProperty(), Integer.valueOf(ORDER_ASC))), i);
        try {
            List<? extends TrackedEventData<?>> list = (List) StreamSupport.stream(applyBatchSize.spliterator(), false).flatMap(this::extractTrackedEvents).filter(trackedEventData -> {
                return trackedEventData.trackingToken().isAfter(trackingToken);
            }).limit(i).collect(Collectors.toList());
            applyBatchSize.close();
            return list;
        } catch (Throwable th) {
            applyBatchSize.close();
            throw th;
        }
    }

    @Override // org.axonframework.eventsourcing.eventstore.mongo.StorageStrategy
    public TrackingToken getTokenForGapDetection(TrackingToken trackingToken) {
        if (trackingToken == null) {
            return null;
        }
        Assert.isTrue(trackingToken instanceof LegacyTrackingToken, String.format("Token %s is of the wrong type", trackingToken));
        LegacyTrackingToken legacyTrackingToken = (LegacyTrackingToken) trackingToken;
        return new LegacyTrackingToken(legacyTrackingToken.getTimestamp(), legacyTrackingToken.getAggregateIdentifier(), legacyTrackingToken.getSequenceNumber());
    }

    protected abstract DBCursor applyBatchSize(DBCursor dBCursor, int i);

    protected abstract Stream<? extends TrackedEventData<?>> extractTrackedEvents(DBObject dBObject);

    @Override // org.axonframework.eventsourcing.eventstore.mongo.StorageStrategy
    public Optional<? extends DomainEventData<?>> findLastSnapshot(DBCollection dBCollection, String str) {
        DBCursor limit = dBCollection.find(BasicDBObjectBuilder.start().add(this.eventConfiguration.aggregateIdentifierProperty(), str).get()).sort(new BasicDBObject(this.eventConfiguration.sequenceNumberProperty(), Integer.valueOf(ORDER_DESC))).limit(ORDER_ASC);
        Throwable th = null;
        try {
            try {
                Optional<? extends DomainEventData<?>> map = StreamSupport.stream(limit.spliterator(), false).findFirst().map(this::extractSnapshot);
                if (limit != null) {
                    if (0 != 0) {
                        try {
                            limit.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        limit.close();
                    }
                }
                return map;
            } finally {
            }
        } catch (Throwable th3) {
            if (limit != null) {
                if (th != null) {
                    try {
                        limit.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    limit.close();
                }
            }
            throw th3;
        }
    }

    protected abstract DomainEventData<?> extractSnapshot(DBObject dBObject);

    @Override // org.axonframework.eventsourcing.eventstore.mongo.StorageStrategy
    public void ensureIndexes(DBCollection dBCollection, DBCollection dBCollection2) {
        dBCollection.ensureIndex(new BasicDBObject(this.eventConfiguration.aggregateIdentifierProperty(), Integer.valueOf(ORDER_ASC)).append(this.eventConfiguration.sequenceNumberProperty(), Integer.valueOf(ORDER_ASC)), "uniqueAggregateIndex", true);
        dBCollection.ensureIndex(new BasicDBObject(this.eventConfiguration.timestampProperty(), Integer.valueOf(ORDER_ASC)).append(this.eventConfiguration.sequenceNumberProperty(), Integer.valueOf(ORDER_ASC)), "orderedEventStreamIndex", false);
        dBCollection2.ensureIndex(new BasicDBObject(this.eventConfiguration.aggregateIdentifierProperty(), Integer.valueOf(ORDER_ASC)).append(this.eventConfiguration.sequenceNumberProperty(), Integer.valueOf(ORDER_ASC)), "uniqueAggregateIndex", true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public EventEntryConfiguration eventConfiguration() {
        return this.eventConfiguration;
    }
}
