package io.leoplatform.sdk.aws.s3;

import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.config.ConnectorConfig;
import io.leoplatform.sdk.payload.FileSegment;
import io.leoplatform.sdk.payload.StorageEventOffset;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers {@link FileSegment}s in memory and hands them to the
 * {@link S3TransferManager} in batches.
 *
 * <p>A background task started from the constructor wakes up whenever a segment
 * is written (or after {@code Storage.MaxBatchAge} milliseconds) and, once the
 * batch age has elapsed, drains the pending segments into uploads. Each upload
 * is capped by {@code Storage.MaxBatchSize} (summed gzip size) and
 * {@code Storage.MaxBatchRecords} (summed record count).
 *
 * <p>All access to the pending queue is guarded by {@link #lock}.
 */
@Singleton
public final class S3Writer {
    private static final Logger log = LoggerFactory.getLogger(S3Writer.class);
    private static final DateTimeFormatter eidFormat = DateTimeFormatter.ofPattern("uuuu'/'MM'/'dd'/'HH'/'mm").withZone(ZoneOffset.UTC);

    /** Width that the per-send file sequence number is left-padded to. */
    private static final int SEQUENCE_WIDTH = 7;

    private final long maxBatchAge;
    private final int maxBatchRecords;
    /** Upper bound on the summed gzip size of one upload (config key {@code Storage.MaxBatchSize}). */
    private final long maxBatchSize;
    private final S3BufferStyle bufferStyle;
    private final S3TransferManager transferManager;
    /** Segments waiting to be uploaded; only touched while holding {@link #lock}. */
    private final Queue<FileSegment> payloads = new LinkedList<>();
    private final Lock lock = new ReentrantLock();
    private final Condition batchSend = this.lock.newCondition();
    private final AtomicBoolean running = new AtomicBoolean(true);

    /**
     * Reads the batching limits from configuration and starts the background
     * batch sender on the supplied executor.
     *
     * @param connectorConfig source of the {@code Storage.*} settings
     * @param s3TransferManager receives the batched uploads
     * @param executorManager supplies the executor the background sender runs on
     */
    @Inject
    public S3Writer(ConnectorConfig connectorConfig, S3TransferManager s3TransferManager, ExecutorManager executorManager) {
        this.maxBatchAge = connectorConfig.longValueOrElse("Storage.MaxBatchAge", 4000L).longValue();
        this.maxBatchRecords = connectorConfig.intValueOrElse("Storage.MaxBatchRecords", 6000).intValue();
        this.maxBatchSize = connectorConfig.longValueOrElse("Storage.MaxBatchSize", 5017600L).longValue();
        this.bufferStyle = S3BufferStyle.fromName(connectorConfig.valueOrElse("Storage.BufferStyle", "Memory"));
        this.transferManager = s3TransferManager;
        CompletableFuture.runAsync(this::asyncBatchSend, executorManager.get());
    }

    /**
     * Queues a segment for upload and wakes the background sender. Segments
     * offered after {@link #end()} are dropped with a warning.
     *
     * @param fileSegment the segment to queue
     */
    public void write(FileSegment fileSegment) {
        if (!this.running.get()) {
            log.warn("Attempt to add file segment to a stopped batch process");
            return;
        }
        add(fileSegment);
        signalBatch();
    }

    /**
     * Hands every pending segment to the transfer manager immediately and then
     * flushes the transfer manager.
     */
    public void flush() {
        signalBatch();
        sendAll();
        this.transferManager.flush();
    }

    /** Appends a segment to the pending queue under the lock. */
    private void add(FileSegment fileSegment) {
        this.lock.lock();
        try {
            this.payloads.add(fileSegment);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Flushes pending segments, stops the background sender and ends the
     * transfer manager.
     *
     * @return the statistics returned by {@link S3TransferManager#end()}
     */
    public StreamStats end() {
        log.info("Stopping S3 writer");
        flush();
        this.running.set(false);
        // Wake the sender so it observes the stopped flag without waiting out the timeout.
        signalBatch();
        log.info("Stopped S3 writer");
        return this.transferManager.end();
    }

    /** Wakes any thread waiting on the batch condition. */
    private void signalBatch() {
        this.lock.lock();
        try {
            this.batchSend.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Background loop: waits for a signal or the batch-age timeout, and sends
     * the pending segments once at least {@code maxBatchAge} milliseconds have
     * passed since the previous send.
     */
    private void asyncBatchSend() {
        boolean interrupted = false;
        Instant lastSend = Instant.now();
        while (this.running.get()) {
            this.lock.lock();
            try {
                this.batchSend.await(this.maxBatchAge, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
                this.running.set(false);
                log.info("S3 batch writer stopped with {} pending", this.payloads.size());
            } finally {
                this.lock.unlock();
            }
            if (Instant.now().isAfter(lastSend.plusMillis(this.maxBatchAge))) {
                send();
                lastSend = Instant.now();
            }
        }
        if (interrupted) {
            // Restore the interrupt status only after the final send attempt so
            // that attempt behaves as before; callers up the stack can still see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Drains the pending queue into size-limited batches and enqueues each one
     * for upload. The lock is held for the whole drain and is always released
     * exactly once, whether or not anything was pending.
     */
    private void send() {
        AtomicLong fileCount = new AtomicLong();
        Instant now = Instant.now();
        this.lock.lock();
        try {
            while (!this.payloads.isEmpty()) {
                Queue<FileSegment> batch = drainToMaximum();
                // A batch whose first segment has no offset cannot be named and is skipped.
                Optional.of(batch)
                    .map(Queue::peek)
                    .map(FileSegment::getOffset)
                    .ifPresent(offset -> transferAsync(fileCount.incrementAndGet(), now, batch, offset));
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Enqueues one batch with the transfer manager, as a disk-backed or
     * memory-backed upload depending on the configured buffer style.
     */
    private void transferAsync(long j, Instant instant, Queue<FileSegment> queue, StorageEventOffset storageEventOffset) {
        String fileName = fileName(storageEventOffset, instant, j);
        if (this.bufferStyle == S3BufferStyle.DISK) {
            this.transferManager.enqueue(new PendingFileUpload(fileName, queue));
        } else {
            this.transferManager.enqueue(new PendingMemoryUpload(fileName, queue));
        }
    }

    /** Sends until no segments remain pending. */
    private void sendAll() {
        while (hasPending()) {
            send();
        }
    }

    /** Reports, under the lock, whether any segments are waiting to be sent. */
    private boolean hasPending() {
        this.lock.lock();
        try {
            return !this.payloads.isEmpty();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Removes segments from the head of the pending queue into a new batch.
     * The first pending segment is always taken; further segments are taken
     * while adding the next one keeps the batch below both limits.
     * Must be called while holding the lock.
     *
     * @return the drained batch, empty only if nothing was pending
     */
    private Queue<FileSegment> drainToMaximum() {
        Queue<FileSegment> batch = new LinkedList<>();
        if (this.payloads.isEmpty()) {
            return batch;
        }
        batch.add(this.payloads.remove());
        while (!this.payloads.isEmpty() && belowMax(batch, this.payloads.peek())) {
            batch.add(this.payloads.remove());
        }
        return batch;
    }

    /**
     * Tells whether {@code fileSegment} can join {@code queue} while keeping the
     * summed gzip size strictly below the size limit and the summed record
     * count strictly below the record limit.
     */
    private boolean belowMax(Queue<FileSegment> queue, FileSegment fileSegment) {
        StorageEventOffset candidate = fileSegment.getOffset();
        long queuedBytes = queue.stream()
            .map(FileSegment::getOffset)
            .mapToLong(StorageEventOffset::getGzipSize)
            .sum();
        if (queuedBytes + candidate.getGzipSize().longValue() >= this.maxBatchSize) {
            return false;
        }
        long queuedRecords = queue.stream()
            .map(FileSegment::getOffset)
            .mapToLong(StorageEventOffset::getRecords)
            .sum();
        return queuedRecords + candidate.getRecords().longValue() < (long) this.maxBatchRecords;
    }

    /**
     * Builds the upload file name from the event, the UTC minute path of
     * {@code instant}, its epoch milliseconds and the zero-padded sequence number.
     */
    private String fileName(StorageEventOffset storageEventOffset, Instant instant, long j) {
        return String.format("bus/%s/z/%s/%d-%s.gz", storageEventOffset.getEvent(), eidFormat.format(instant), instant.toEpochMilli(), padWithZeros(j));
    }

    /**
     * Left-pads the decimal form of {@code j} with zeros to seven characters.
     * Values that already have seven or more characters are returned unpadded
     * instead of producing an invalid format string.
     */
    private String padWithZeros(long j) {
        String digits = String.valueOf(j);
        StringBuilder padded = new StringBuilder(Math.max(SEQUENCE_WIDTH, digits.length()));
        for (int i = digits.length(); i < SEQUENCE_WIDTH; i++) {
            padded.append('0');
        }
        return padded.append(digits).toString();
    }
}
