package io.leoplatform.sdk.aws.kinesis;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.config.ConnectorConfig;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/leoplatform/sdk/aws/kinesis/KinesisProducerWriter.class */
public final class KinesisProducerWriter {
    private static final Logger log = LoggerFactory.getLogger(KinesisProducerWriter.class);
    private final KinesisResults resultsProcessor;
    private final KinesisProducer kinesis;
    private final String stream;
    private final ExecutorManager executorManager;
    private final List<CompletableFuture<Void>> pendingWrites = new LinkedList();
    private final Lock lock = new ReentrantLock();
    private final Condition asyncUpload = this.lock.newCondition();

    @Inject
    public KinesisProducerWriter(ConnectorConfig connectorConfig, ExecutorManager executorManager, KinesisResults kinesisResults) {
        this.stream = connectorConfig.value("Stream.Name");
        this.kinesis = new KinesisProducer(new KinesisProducerConfiguration().setCredentialsProvider(credentials(connectorConfig)).setRegion(connectorConfig.valueOrElse("Region", "us-east-1")).setAggregationEnabled(false).setRecordMaxBufferedTime(200L).setRequestTimeout(60000L).setMaxConnections(48L).setCredentialsRefreshDelay(100L).setMetricsNamespace("LEO Java SDK").setLogLevel("info"));
        this.executorManager = executorManager;
        this.resultsProcessor = kinesisResults;
    }

    public void write(ByteBuffer byteBuffer) {
        this.lock.lock();
        try {
            Executor executor = this.executorManager.get();
            this.pendingWrites.add(CompletableFuture.supplyAsync(() -> {
                return addRecord(byteBuffer);
            }, executor).thenAcceptAsync(userRecordResult -> {
                this.resultsProcessor.add(userRecordResult, byteBuffer.array().length);
            }, executor).thenRunAsync(this::removeCompleted, executor));
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void flush() {
        this.kinesis.flushSync();
    }

    private UserRecordResult addRecord(ByteBuffer byteBuffer) {
        try {
            return (UserRecordResult) this.kinesis.addUserRecord(this.stream, "0", byteBuffer).get();
        } catch (Exception e) {
            this.resultsProcessor.addFailure(e);
            throw new RuntimeException("Error adding record");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamStats end() {
        completePendingTasks();
        try {
            log.info("Flushing Kinesis pipeline");
            this.kinesis.flushSync();
        } catch (Exception e) {
            log.warn("Unable to flush kinesis pipeline: {}", e.getMessage());
        }
        try {
            log.info("Stopping Kinesis writer ({} outstanding)", Integer.valueOf(this.kinesis.getOutstandingRecordsCount()));
            this.kinesis.destroy();
            log.info("Stopped Kinesis writer");
        } catch (Exception e2) {
            log.warn("Unable to stop Kinesis writer: {}", e2.getMessage());
        }
        return getStats();
    }

    private void completePendingTasks() {
        while (!this.pendingWrites.isEmpty()) {
            this.lock.lock();
            try {
                this.asyncUpload.await(100L, TimeUnit.MILLISECONDS);
                this.lock.unlock();
            } catch (InterruptedException e) {
                log.warn("Stopped Kinesis upload manager with incomplete pending tasks");
                this.pendingWrites.clear();
            } finally {
            }
            removeCompleted();
        }
        this.lock.lock();
        try {
            long count = this.pendingWrites.parallelStream().map((v0) -> {
                return v0.join();
            }).count();
            if (count > 0) {
                log.info("Waited for {} Kinesis upload{} to complete", Long.valueOf(count), count == 1 ? "" : "s");
            }
        } finally {
        }
    }

    private void removeCompleted() {
        this.lock.lock();
        try {
            this.pendingWrites.removeIf((v0) -> {
                return v0.isDone();
            });
            this.asyncUpload.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    private AWSCredentialsProvider credentials(ConnectorConfig connectorConfig) {
        try {
            Optional filter = Optional.of(connectorConfig.valueOrElse("AwsProfile", "")).map((v0) -> {
                return v0.trim();
            }).filter(str -> {
                return !str.isEmpty();
            }).map(ProfileCredentialsProvider::new).filter(profileCredentialsProvider -> {
                return profileCredentialsProvider.getCredentials() != null;
            });
            Class<AWSCredentialsProvider> cls = AWSCredentialsProvider.class;
            AWSCredentialsProvider.class.getClass();
            return (AWSCredentialsProvider) filter.map((v1) -> {
                return r1.cast(v1);
            }).orElse(DefaultAWSCredentialsProviderChain.getInstance());
        } catch (Exception e) {
            return DefaultAWSCredentialsProviderChain.getInstance();
        }
    }

    private StreamStats getStats() {
        return new StreamStats() { // from class: io.leoplatform.sdk.aws.kinesis.KinesisProducerWriter.1
            public Long successes() {
                return KinesisProducerWriter.this.resultsProcessor.successes();
            }

            public Long failures() {
                return KinesisProducerWriter.this.resultsProcessor.failures();
            }

            public Duration totalTime() {
                return Duration.between(KinesisProducerWriter.this.resultsProcessor.start(), Instant.now());
            }
        };
    }
}
