package io.leoplatform.sdk.aws.s3;

import io.leoplatform.sdk.AsyncWorkQueue;
import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.TransferStyle;
import io.leoplatform.sdk.aws.payload.CompressionWriter;
import io.leoplatform.sdk.payload.EventPayload;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Asynchronous work queue that collects {@link EventPayload}s, compresses them in batches with a
 * {@link CompressionWriter} and passes each compressed batch to {@link S3Writer#write}.
 *
 * <p>A background task, started from the constructor on the injected {@link ExecutorManager}'s
 * executor, wakes up either when signalled (the backlog reached {@link #MAX_BATCH_RECORDS}) or
 * after {@link #BATCH_WAIT_MILLIS} and sends whatever is queued.
 *
 * <p>Thread-safety: queue draining, registration of in-flight writes and the bookkeeping of
 * {@code pendingWrites} are all guarded by {@code lock}.
 */
@Singleton
public final class S3Queue implements AsyncWorkQueue {
    private static final Logger log = LoggerFactory.getLogger(S3Queue.class);

    /** Largest number of payloads drained into one batch; also the backlog size that wakes the sender early. */
    private static final int MAX_BATCH_RECORDS = 1000;
    /** Longest the background sender waits for a signal before sending what is queued. */
    private static final long BATCH_WAIT_MILLIS = 200L;
    /** Interval at which {@link #completePendingWrites()} re-checks the in-flight writes. */
    private static final long PENDING_POLL_MILLIS = 100L;

    private final CompressionWriter compression;
    private final ExecutorManager executorManager;
    private final S3Writer s3Writer;
    private final BlockingQueue<EventPayload> payloads = new LinkedBlockingQueue<>();
    /** In-flight compress-and-write tasks; only touched while holding {@code lock}. */
    private final List<CompletableFuture<Void>> pendingWrites = new LinkedList<>();
    private final Lock lock = new ReentrantLock();
    private final Condition batchSend = lock.newCondition();
    private final AtomicBoolean running = new AtomicBoolean(true);

    /**
     * Creates the queue and immediately starts the background batch sender.
     *
     * @param executorManager   supplies the executor for the background sender and for every write task
     * @param compressionWriter compresses each drained batch
     * @param s3Writer          receives each compressed batch
     */
    @Inject
    public S3Queue(ExecutorManager executorManager, CompressionWriter compressionWriter, S3Writer s3Writer) {
        this.compression = compressionWriter;
        this.executorManager = executorManager;
        this.s3Writer = s3Writer;
        CompletableFuture.runAsync(this::asyncBatchSend, executorManager.get());
    }

    /**
     * Queues a payload for a later batch. Payloads offered after the queue has been stopped are
     * dropped with a warning.
     *
     * @param eventPayload the payload to queue
     */
    public void addEntity(EventPayload eventPayload) {
        if (!running.get()) {
            log.warn("Attempt to add S3 payload to a stopped queue");
            return;
        }
        add(eventPayload);
        if (exceedsMaxRecords()) {
            signalBatch();
        }
    }

    /**
     * Sends everything currently queued, waits for the in-flight write tasks to finish and then
     * flushes the underlying {@link S3Writer}.
     */
    public void flush() {
        signalBatch();
        sendAll();
        completePendingWrites();
        s3Writer.flush();
    }

    /**
     * Stops the queue, flushes what was accepted and ends the underlying writer.
     *
     * @return the statistics reported by {@link S3Writer#end()}
     */
    public StreamStats end() {
        // Stop intake BEFORE flushing: a payload accepted after the flush would otherwise
        // sit in the queue and never be sent. flush() also wakes the background sender,
        // which then sees the cleared flag and exits its loop.
        running.set(false);
        flush();
        return s3Writer.end();
    }

    /**
     * Identifies how this queue transfers payloads.
     *
     * @return always {@link TransferStyle#STORAGE}
     */
    public TransferStyle style() {
        return TransferStyle.STORAGE;
    }

    /** Appends a payload to the backlog while holding the queue lock. */
    private void add(EventPayload eventPayload) {
        lock.lock();
        try {
            payloads.add(eventPayload);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Background loop: waits for a batch signal (or the timeout) and then sends the backlog.
     * An interrupt stops the loop after one final send attempt.
     */
    private void asyncBatchSend() {
        while (running.get()) {
            lock.lock();
            try {
                batchSend.await(BATCH_WAIT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                running.set(false);
                log.info("S3 queue stopped with {} pending", payloads.size());
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
            sendBatch();
        }
    }

    /**
     * Drains up to {@link #MAX_BATCH_RECORDS} payloads and schedules their compression and write.
     * Does nothing when the backlog is empty.
     */
    private void send() {
        lock.lock();
        try {
            // Drain and register under ONE lock acquisition so completePendingWrites() can
            // never observe an empty backlog while a drained batch is still unregistered.
            Set<EventPayload> batch = drainToSet();
            if (batch.isEmpty()) {
                return;
            }
            Executor executor = executorManager.get();
            CompletableFuture<Void> write = CompletableFuture
                .supplyAsync(() -> compression.compressWithOffsets(batch), executor)
                .thenAcceptAsync(s3Writer::write, executor)
                .whenCompleteAsync((ignored, failure) -> {
                    if (failure != null) {
                        // Surface failures instead of letting the future swallow them.
                        log.error("S3 batch write failed", failure);
                    }
                    removeCompleted();
                }, executor);
            pendingWrites.add(write);
        } finally {
            lock.unlock();
        }
    }

    /** Sends one batch, then keeps sending while a full batch is still waiting. */
    private void sendBatch() {
        do {
            send();
        } while (exceedsMaxRecords());
    }

    /** Sends batches until the backlog is empty. */
    private void sendAll() {
        while (!payloads.isEmpty()) {
            send();
        }
    }

    /**
     * Blocks until every in-flight write task has completed. If the waiting thread is
     * interrupted, the remaining tasks are no longer tracked and the interrupt status is restored.
     */
    private void completePendingWrites() {
        lock.lock();
        try {
            pendingWrites.removeIf(CompletableFuture::isDone);
            long outstanding = pendingWrites.stream().filter(write -> !write.isDone()).count();
            if (outstanding > 0) {
                log.info("Waiting for {} S3 background task{} to stop", outstanding, outstanding == 1 ? "" : "s");
            }
            while (!pendingWrites.isEmpty()) {
                try {
                    // await() releases the lock while waiting, so finishing tasks can run
                    // removeCompleted() in the meantime.
                    batchSend.await(PENDING_POLL_MILLIS, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    log.warn("Stopped with incomplete pending S3 batch tasks");
                    pendingWrites.clear();
                    Thread.currentThread().interrupt();
                }
                pendingWrites.removeIf(CompletableFuture::isDone);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Forgets every write task that has already completed, normally or exceptionally. */
    private void removeCompleted() {
        lock.lock();
        try {
            pendingWrites.removeIf(CompletableFuture::isDone);
        } finally {
            lock.unlock();
        }
    }

    /** Wakes every thread waiting on the batch condition. */
    private void signalBatch() {
        lock.lock();
        try {
            batchSend.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Moves up to {@link #MAX_BATCH_RECORDS} payloads from the backlog into an insertion-ordered set.
     *
     * @return the drained payloads; empty when nothing was queued
     */
    private Set<EventPayload> drainToSet() {
        Set<EventPayload> drained = new LinkedHashSet<>();
        lock.lock();
        try {
            payloads.drainTo(drained, MAX_BATCH_RECORDS);
            return drained;
        } finally {
            lock.unlock();
        }
    }

    /** Reports whether at least one full batch is waiting in the backlog. */
    private boolean exceedsMaxRecords() {
        return payloads.size() >= MAX_BATCH_RECORDS;
    }
}
