/*
 * Decompiled with CFR 0.152.
 */
package io.codemonastery.dropwizard.kinesis.producer.ratelimit;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import io.codemonastery.dropwizard.kinesis.producer.PutterMetrics;
import io.codemonastery.dropwizard.kinesis.producer.RecordPutter;
import io.codemonastery.dropwizard.kinesis.producer.ratelimit.AcquireLimiter;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RecordPutter} that pushes a batch to Kinesis through an {@link AcquireLimiter},
 * re-submitting the records that were rejected for exceeding provisioned throughput.
 *
 * <p>Before every attempt the limiter is asked to {@code acquire} one permit per record, and
 * after every attempt it is told via {@code update} how many records were sent and how many of
 * those were throttled.
 */
public class RateLimitedRecordPutter
implements RecordPutter {
    private static final Logger LOG = LoggerFactory.getLogger(RateLimitedRecordPutter.class);

    /** Per-record error code that marks a result entry as throttled and therefore retried. */
    private static final String THROUGHPUT_EXCEEDED_CODE = "ProvisionedThroughputExceededException";

    private final AmazonKinesis kinesis;
    private final PutterMetrics metrics;
    private final AcquireLimiter limiter;

    /**
     * @param kinesis client used to issue {@code putRecords} calls
     * @param metrics sink that times each {@link #send} call and receives the sent/failed counts
     * @param limiter limiter consulted before, and updated after, every attempt
     */
    public RateLimitedRecordPutter(AmazonKinesis kinesis, PutterMetrics metrics, AcquireLimiter limiter) {
        this.kinesis = kinesis;
        this.metrics = metrics;
        this.limiter = limiter;
    }

    /**
     * Sends the records of {@code request}, retrying throttled records until none remain.
     *
     * <p>Records whose result entry carries any error code other than
     * {@value #THROUGHPUT_EXCEEDED_CODE} are counted as failed and are not retried. The loop has
     * no retry cap of its own; it ends only when an attempt produces no throttled records or an
     * exception other than {@link ProvisionedThroughputExceededException} escapes.
     *
     * <p>Side effect: when a retry is needed, the record list of {@code request} is replaced in
     * place with the throttled subset, so the caller's request no longer holds the original batch
     * afterwards.
     *
     * <p>The counts reported to {@code metrics.sent} are accumulated per attempt rather than
     * derived from {@code request.getRecords().size()}, because that size shrinks on every retry.
     *
     * @param request the batch to send; its record list may be mutated as described above
     * @return the number of records that failed for a reason other than throttling
     * @throws Exception whatever the limiter, the Kinesis client, or closing the metrics timer
     *         throws, except request-level {@link ProvisionedThroughputExceededException},
     *         which triggers another attempt
     */
    @Override
    public int send(PutRecordsRequest request) throws Exception {
        int failedCount = 0;
        int succeededCount = 0;
        try (Closeable ignored = this.metrics.time()) {
            boolean done = false;
            while (!done) {
                int attempted = request.getRecords().size();
                try {
                    this.limiter.acquire(attempted);
                    PutRecordsResult result = this.kinesis.putRecords(request);
                    int requestFailedCount = Optional.ofNullable(result.getFailedRecordCount()).orElse(0);
                    int throttledCount = 0;
                    if (requestFailedCount != 0) {
                        List<PutRecordsRequestEntry> throttled =
                                throttledRecords(request.getRecords(), result.getRecords());
                        throttledCount = throttled.size();
                        // Everything that failed but was not throttled is a permanent failure.
                        failedCount += requestFailedCount - throttledCount;
                        if (throttledCount > 0) {
                            request.setRecords(throttled);
                        }
                    }
                    succeededCount += attempted - requestFailedCount;
                    done = throttledCount == 0;
                    this.limiter.update(attempted, throttledCount);
                }
                catch (ProvisionedThroughputExceededException e) {
                    // The whole call was rejected: report every record as throttled and try again.
                    LOG.debug("Exceeded rate limit for stream \"{}\", backing off", request.getStreamName(), e);
                    this.limiter.update(attempted, attempted);
                }
            }
        }
        finally {
            this.metrics.sent(succeededCount, failedCount);
        }
        return failedCount;
    }

    /**
     * Picks the request entries whose positionally matching result entry was throttled.
     *
     * @param sent the entries submitted in the attempt
     * @param results the result entries of that attempt, matched to {@code sent} by index
     * @return a new mutable list of the entries to retry, in their original order
     */
    private static List<PutRecordsRequestEntry> throttledRecords(List<PutRecordsRequestEntry> sent,
                                                                 List<PutRecordsResultEntry> results) {
        List<PutRecordsRequestEntry> throttled = new ArrayList<>(results.size());
        for (int i = 0; i < results.size(); ++i) {
            if (THROUGHPUT_EXCEEDED_CODE.equals(results.get(i).getErrorCode())) {
                throttled.add(sent.get(i));
            }
        }
        return throttled;
    }
}

