/*
 * Decompiled with CFR 0.152.
 */
package io.codemonastery.dropwizard.kinesis.consumer;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import io.codemonastery.dropwizard.kinesis.EventDecoder;
import io.codemonastery.dropwizard.kinesis.consumer.BatchConsumer;
import io.codemonastery.dropwizard.kinesis.consumer.BatchProcessorMetrics;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchProcessor<E>
implements IRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(BatchProcessor.class);
    private final EventDecoder<E> decoder;
    private final BatchConsumer<E> processor;
    private final BatchProcessorMetrics metrics;
    private String shardId;

    public BatchProcessor(EventDecoder<E> decoder, BatchConsumer<E> processor, BatchProcessorMetrics metrics) {
        this.decoder = decoder;
        this.processor = processor;
        this.metrics = metrics;
    }

    public void initialize(InitializationInput initializationInput) {
        this.metrics.processorStarted();
        if (initializationInput != null) {
            this.shardId = initializationInput.getShardId();
        }
    }

    public void processRecords(ProcessRecordsInput processRecordsInput) {
        if (processRecordsInput.getMillisBehindLatest() != null) {
            this.metrics.millisBehindLatest(this.shardId, processRecordsInput.getMillisBehindLatest());
        }
        boolean processed = false;
        List<E> batch = this.decodeBatch(processRecordsInput.getRecords());
        if (batch != null) {
            try {
                processed = this.processor.consume(batch);
            }
            catch (Exception e) {
                this.metrics.unhandledException();
                LOG.error("Unhandled exception processing batch: " + batch, (Throwable)e);
            }
            if (processed) {
                this.metrics.processSuccess(batch.size());
                try (AutoCloseable ignore = this.metrics.checkpointTime();){
                    processRecordsInput.getCheckpointer().checkpoint();
                }
                catch (ShutdownException e) {
                    this.metrics.checkpointFailed();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Abandoning checkpoint because processor was shutdown");
                    }
                }
                catch (Exception e) {
                    this.metrics.checkpointFailed();
                    LOG.error("Could not checkpoint because of unexpected exception", (Throwable)e);
                }
            }
        }
        if (!processed) {
            this.metrics.processFailure(processRecordsInput.getRecords().size());
        }
    }

    public void shutdown(ShutdownInput shutdownInput) {
        this.metrics.processorShutdown(this.shardId);
        try {
            shutdownInput.getCheckpointer().checkpoint();
        }
        catch (Exception e) {
            LOG.error("Error check-pointing for shutdown:", (Throwable)e);
        }
    }

    private List<E> decodeBatch(List<Record> records) {
        ArrayList<E> batch = new ArrayList<E>();
        for (Record record : records) {
            try {
                E event = this.decoder.decode(record.getData());
                this.metrics.decoded();
                if (event == null) {
                    LOG.warn("Decoder returned null, omitting from batch to be consumed");
                    continue;
                }
                batch.add(event);
            }
            catch (Exception e) {
                this.metrics.decodeFailure();
                LOG.error("Unexpected exception decoding event", (Throwable)e);
                batch = null;
                break;
            }
        }
        return batch;
    }
}

