/*
 * Decompiled with CFR 0.152.
 */
package io.codemonastery.dropwizard.kinesis.consumer;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import com.google.common.base.Preconditions;
import io.codemonastery.dropwizard.kinesis.EventDecoder;
import io.codemonastery.dropwizard.kinesis.consumer.EventConsumer;
import io.codemonastery.dropwizard.kinesis.consumer.RecordProcessorMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RecordProcessor<E>
implements IRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(RecordProcessor.class);
    private final EventDecoder<E> decoder;
    private final EventConsumer<E> processor;
    private final RecordProcessorMetrics metrics;
    private String shardId = null;

    public RecordProcessor(EventDecoder<E> decoder, EventConsumer<E> eventConsumer, RecordProcessorMetrics metrics) {
        Preconditions.checkNotNull(decoder, (Object)"decoder cannot be null");
        Preconditions.checkNotNull(eventConsumer, (Object)"eventConsumer cannot be null");
        Preconditions.checkNotNull((Object)metrics, (Object)"metrics cannot be null");
        this.decoder = decoder;
        this.processor = eventConsumer;
        this.metrics = metrics;
    }

    public void initialize(InitializationInput initializationInput) {
        if (initializationInput != null) {
            this.shardId = initializationInput.getShardId();
        }
        this.metrics.processorStarted();
    }

    public void processRecords(ProcessRecordsInput processRecordsInput) {
        if (processRecordsInput.getMillisBehindLatest() != null) {
            this.metrics.millisBehindLatest(this.shardId, processRecordsInput.getMillisBehindLatest());
        }
        Record lastRecordProcessed = null;
        for (Record record : processRecordsInput.getRecords()) {
            E event;
            try {
                event = this.decoder.decode(record.getData());
                this.metrics.decoded();
            }
            catch (Exception e) {
                this.metrics.decodeFailure();
                LOG.error("Unexpected exception decoding event", (Throwable)e);
                break;
            }
            if (event == null) {
                lastRecordProcessed = record;
                continue;
            }
            boolean processed = false;
            try (AutoCloseable ignored = this.metrics.processTime();){
                processed = this.processor.consume(event);
            }
            catch (Exception e) {
                this.metrics.unhandledException();
                LOG.error("Unhandled exception processing event" + event, (Throwable)e);
            }
            if (processed) {
                this.metrics.processSuccess();
                lastRecordProcessed = record;
                continue;
            }
            this.metrics.processFailure();
            break;
        }
        if (lastRecordProcessed != null) {
            try (AutoCloseable ignore = this.metrics.checkpointTime();){
                processRecordsInput.getCheckpointer().checkpoint(lastRecordProcessed);
            }
            catch (ShutdownException e) {
                this.metrics.checkpointFailed();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Abandoning checkpoint because processor was shutdown");
                }
            }
            catch (Exception e) {
                this.metrics.checkpointFailed();
                LOG.error("Could not checkpoint because of unexpected exception", (Throwable)e);
            }
        }
    }

    public void shutdown(ShutdownInput shutdownInput) {
        this.shardId = "UNKNOWN";
        this.metrics.processorShutdown(this.shardId);
        try {
            shutdownInput.getCheckpointer().checkpoint();
        }
        catch (Exception e) {
            LOG.error("Error check-pointing for shutdown:", (Throwable)e);
        }
    }
}

