/*
 * Decompiled with CFR 0.152.
 */
package io.codemonastery.dropwizard.kinesis.producer;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.codemonastery.dropwizard.kinesis.EventEncoder;
import io.codemonastery.dropwizard.kinesis.producer.BufferedProducerMetrics;
import io.codemonastery.dropwizard.kinesis.producer.Producer;
import io.codemonastery.dropwizard.kinesis.producer.PutRecordsBuffer;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BufferedProducer<E>
extends Producer<E> {
    private static final Logger LOG = LoggerFactory.getLogger(BufferedProducer.class);
    private final AmazonKinesis kinesis;
    private final String streamName;
    private final ExecutorService deliveryExecutor;
    private final BufferedProducerMetrics bufferedMetrics;
    private final PutRecordsBuffer buffer;

    public BufferedProducer(AmazonKinesis kinesis, String streamName, Function<E, String> partitionKeyFn, EventEncoder<E> encoder, int maxBufferSize, ScheduledExecutorService deliveryExecutor, BufferedProducerMetrics metrics) {
        super(partitionKeyFn, encoder, metrics);
        Preconditions.checkNotNull((Object)kinesis, (Object)"kinesis cannot be null");
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)streamName) ? 1 : 0) != 0, (Object)"must have a stream name");
        Preconditions.checkArgument((maxBufferSize > 0 ? 1 : 0) != 0, (Object)"maxBufferSize must be positive");
        Preconditions.checkNotNull((Object)deliveryExecutor, (Object)"must have a delivery executor");
        this.kinesis = kinesis;
        this.streamName = streamName;
        this.deliveryExecutor = deliveryExecutor;
        this.bufferedMetrics = metrics;
        this.buffer = new PutRecordsBuffer(maxBufferSize);
    }

    public void flush() {
        try {
            List<PutRecordsRequestEntry> submitMe = this.buffer.drain();
            this.bufferedMetrics.bufferRemove(submitMe.size());
            if (!submitMe.isEmpty()) {
                List<PutRecordsRequestEntry> temp = submitMe;
                this.deliveryExecutor.submit(() -> this.putRecords(temp));
            }
        }
        catch (Exception e) {
            LOG.error("unexpected error while flushing", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() throws Exception {
        PutRecordsBuffer putRecordsBuffer = this.buffer;
        synchronized (putRecordsBuffer) {
            super.stop();
            this.putRecords(this.buffer.drain());
        }
    }

    @Override
    protected void send(PutRecordsRequestEntry record) {
        List<PutRecordsRequestEntry> submitMe = this.buffer.add(record);
        if (submitMe != null) {
            this.bufferedMetrics.bufferRemove(submitMe.size());
        }
        this.bufferedMetrics.bufferPut(1);
        if (submitMe != null) {
            List<PutRecordsRequestEntry> temp = submitMe;
            this.deliveryExecutor.submit(() -> this.putRecords(temp));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void putRecords(List<PutRecordsRequestEntry> records) {
        int failedCount = records.size();
        try (Closeable ignored = this.metrics.time();){
            PutRecordsResult result = this.kinesis.putRecords(new PutRecordsRequest().withRecords(records).withStreamName(this.streamName));
            failedCount = result.getFailedRecordCount();
            if (LOG.isDebugEnabled()) {
                String message = String.format("Put %d records to stream %s, %d failed", result.getRecords().size(), this.streamName, failedCount);
                LOG.debug(message);
            }
        }
        catch (Exception e) {
            LOG.error("Unexpected error while putting records", (Throwable)e);
        }
        finally {
            this.metrics.sent(records.size() - failedCount, failedCount);
        }
    }
}

