package io.leoplatform.sdk.aws.kinesis;

import io.leoplatform.sdk.AsyncWorkQueue;
import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.TransferStyle;
import io.leoplatform.sdk.aws.payload.CompressionWriter;
import io.leoplatform.sdk.config.ConnectorConfig;
import io.leoplatform.sdk.payload.EventPayload;
import io.leoplatform.sdk.payload.FileSegment;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory queue that collects {@link EventPayload}s and hands them, in compressed batches,
 * to a {@link KinesisProducerWriter}.
 *
 * <p>A background task (started from the constructor on an executor obtained from the
 * {@link ExecutorManager}) wakes up either when it is signalled because the queue holds at least
 * {@code Stream.MaxBatchRecords} entries, or after {@code Stream.MaxBatchAge} milliseconds, and
 * drains the queue into asynchronous compress-then-write tasks.
 *
 * <p>All access to the pending-task list and all draining of the payload queue is guarded by a
 * single {@link ReentrantLock}; the associated {@link Condition} is used both to wake the
 * background task and as a timed wait while shutting down.
 */
@Singleton
public final class KinesisQueue implements AsyncWorkQueue {
    private static final Logger log = LoggerFactory.getLogger(KinesisQueue.class);

    /** Longest time, in milliseconds, the background task waits before sending a batch. */
    private final long maxBatchAge;
    /** Largest number of payloads drained into a single batch. */
    private final int maxBatchRecords;
    /** Largest accepted gzip size, in bytes, of a single compressed segment. */
    private final long maxBatchSize;
    private final ExecutorManager executorManager;
    private final CompressionWriter compression;
    private final KinesisProducerWriter writer;
    private final BlockingQueue<EventPayload> payloads = new LinkedBlockingQueue<>();
    /** Background tasks that have been started and not yet observed as done; guarded by {@link #lock}. */
    private final List<CompletableFuture<Void>> pendingWrites = new LinkedList<>();
    private final Lock lock = new ReentrantLock();
    private final Condition batchSend = this.lock.newCondition();
    private final AtomicBoolean running = new AtomicBoolean(true);

    /**
     * Reads the batching limits from configuration and starts the background batching task.
     *
     * <p>Configuration keys and their fallbacks: {@code Stream.MaxBatchAge} (400),
     * {@code Stream.MaxBatchRecords} (1000) and {@code Stream.MaxBatchSize} (1048576).
     *
     * @param connectorConfig source of the batching limits
     * @param executorManager supplies the executor used for every background task
     * @param compressionWriter compresses batches of payloads into file segments
     * @param kinesisProducerWriter receives the compressed segments
     */
    @Inject
    public KinesisQueue(ConnectorConfig connectorConfig, ExecutorManager executorManager, CompressionWriter compressionWriter, KinesisProducerWriter kinesisProducerWriter) {
        this.maxBatchAge = connectorConfig.longValueOrElse("Stream.MaxBatchAge", 400L).longValue();
        this.maxBatchRecords = connectorConfig.intValueOrElse("Stream.MaxBatchRecords", 1000).intValue();
        this.maxBatchSize = connectorConfig.longValueOrElse("Stream.MaxBatchSize", 1048576L).longValue();
        this.executorManager = executorManager;
        this.compression = compressionWriter;
        this.writer = kinesisProducerWriter;
        // The batching loop is tracked like any other pending task so that end() waits for it.
        this.pendingWrites.add(CompletableFuture.runAsync(this::asyncBatchSend, executorManager.get()));
    }

    /**
     * Queues a payload for sending and wakes the background task once a full batch is available.
     * Payloads offered after {@link #end()} has been called are dropped with a warning.
     *
     * @param eventPayload the payload to queue
     */
    public void addEntity(EventPayload eventPayload) {
        if (!this.running.get()) {
            log.warn("Attempt to add kinesis payload to a stopped queue");
            return;
        }
        add(eventPayload);
        if (exceedsMaxRecords()) {
            signalBatch();
        }
    }

    /** Delegates to the writer's {@code flush()}; payloads still queued here are not drained. */
    public void flush() {
        this.writer.flush();
    }

    /**
     * Stops accepting payloads, sends everything still queued, waits for the background tasks
     * and then ends the writer.
     *
     * @return the statistics reported by the writer's {@code end()}
     */
    public StreamStats end() {
        this.running.set(false);
        signalBatch();
        sendAll();
        completePendingTasks();
        return this.writer.end();
    }

    /** Appends a payload to the queue while holding the lock shared with {@link #drainToSet()}. */
    private void add(EventPayload eventPayload) {
        this.lock.lock();
        try {
            this.payloads.add(eventPayload);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Body of the background task: until the queue is stopped, waits for a signal or for
     * {@code maxBatchAge} milliseconds and then sends whatever has accumulated.
     */
    private void asyncBatchSend() {
        while (this.running.get()) {
            this.lock.lock();
            try {
                this.batchSend.await(this.maxBatchAge, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                this.running.set(false);
                log.info("Kinesis queue stopped with {} pending", this.payloads.size());
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            } finally {
                this.lock.unlock();
            }
            sendBatch();
        }
    }

    /** Sends one batch, then keeps sending while at least a full batch remains queued. */
    private void sendBatch() {
        do {
            send();
        } while (exceedsMaxRecords());
    }

    /** Sends batches until the payload queue is empty. */
    private void sendAll() {
        while (!this.payloads.isEmpty()) {
            send();
        }
    }

    /**
     * Blocks until every tracked background task is done, re-checking every 100 ms.
     * If the waiting thread is interrupted the remaining tasks are abandoned (the list is
     * cleared) and the thread's interrupt status is restored.
     */
    private void completePendingTasks() {
        removeCompleted();
        this.lock.lock();
        try {
            long outstanding = this.pendingWrites.stream()
                .filter(task -> !task.isDone())
                .count();
            log.info("Waiting for {} kinesis background task{} to complete", outstanding, outstanding == 1 ? "" : "s");
            while (!this.pendingWrites.isEmpty()) {
                try {
                    // await() releases the lock, letting finishing tasks run removeCompleted().
                    this.batchSend.await(100L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    log.warn("Stopped with incomplete pending Kinesis batch tasks");
                    // Emptying the list ends the loop; the lock is held so nothing can be re-added first.
                    this.pendingWrites.clear();
                    Thread.currentThread().interrupt();
                }
                this.pendingWrites.removeIf(CompletableFuture::isDone);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Drains up to {@code maxBatchRecords} payloads and starts an asynchronous
     * compress, write, clean-up chain for them, recording the chain as a pending task.
     */
    private void send() {
        Set<EventPayload> batch = drainToSet();
        Executor executor = this.executorManager.get();
        CompletableFuture<Void> task = CompletableFuture
            .supplyAsync(() -> compressPayloads(batch), executor)
            .thenAcceptAsync(this::toKinesis, executor)
            .thenRunAsync(this::removeCompleted, executor);
        this.lock.lock();
        try {
            this.pendingWrites.add(task);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Compresses a batch into one segment. If that segment's gzip size exceeds
     * {@code maxBatchSize}, every payload is compressed on its own instead and any single
     * payload that is still too large is logged and left out.
     *
     * @param set the payloads of one batch
     * @return the segments to write, in batch order
     */
    private Queue<FileSegment> compressPayloads(Set<EventPayload> set) {
        FileSegment whole = this.compression.compressWithOffsets(set);
        if (whole.getOffset().getGzipSize().longValue() <= this.maxBatchSize) {
            Queue<FileSegment> single = new LinkedList<>();
            single.add(whole);
            return single;
        }
        log.warn("Compressed payload batch exceeds {} bytes", whole.getOffset().getGzipSize());
        LinkedList<FileSegment> segments = set.parallelStream()
            .map(payload -> this.compression.compressWithOffsets(Collections.singletonList(payload)))
            .filter(this::fitsOrLogSkip)
            .collect(Collectors.toCollection(LinkedList::new));
        return segments;
    }

    /**
     * Tells whether a segment is within {@code maxBatchSize}, logging an error for one that is not.
     *
     * @param fileSegment the segment to check
     * @return {@code true} when the segment may be written
     */
    private boolean fitsOrLogSkip(FileSegment fileSegment) {
        checkSize(fileSegment);
        return fileSegment.getOffset().getGzipSize().longValue() <= this.maxBatchSize;
    }

    /** Logs an error when the segment's gzip size is above {@code maxBatchSize}. */
    private void checkSize(FileSegment fileSegment) {
        Long gzipSize = fileSegment.getOffset().getGzipSize();
        if (gzipSize.longValue() > this.maxBatchSize) {
            log.error("Skipping {} byte payload which exceeds maximum of {} bytes", gzipSize, this.maxBatchSize);
        }
    }

    /**
     * Writes every non-null segment that has at least one byte of content to the writer,
     * in queue order.
     *
     * @param queue the compressed segments of one batch
     */
    private void toKinesis(Queue<FileSegment> queue) {
        queue.stream()
            .filter(Objects::nonNull)
            .map(FileSegment::getSegment)
            .filter(bytes -> bytes.length > 0)
            .map(ByteBuffer::wrap)
            .forEachOrdered(this.writer::write);
    }

    /**
     * Removes up to {@code maxBatchRecords} payloads from the queue under the lock.
     *
     * @return the drained payloads in queue order; empty when nothing was queued
     */
    private Set<EventPayload> drainToSet() {
        Set<EventPayload> drained = new LinkedHashSet<>();
        this.lock.lock();
        try {
            this.payloads.drainTo(drained, this.maxBatchRecords);
            return drained;
        } finally {
            this.lock.unlock();
        }
    }

    /** Wakes every thread currently waiting on the batch condition. */
    private void signalBatch() {
        this.lock.lock();
        try {
            this.batchSend.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /** Drops the background tasks that have already finished from the pending list. */
    private void removeCompleted() {
        this.lock.lock();
        try {
            this.pendingWrites.removeIf(CompletableFuture::isDone);
        } finally {
            this.lock.unlock();
        }
    }

    /** @return {@link TransferStyle#STREAM} */
    public TransferStyle style() {
        return TransferStyle.STREAM;
    }

    /** @return whether the queue currently holds at least one full batch of payloads */
    private boolean exceedsMaxRecords() {
        return this.payloads.size() >= this.maxBatchRecords;
    }
}
