/*
 * Decompiled with CFR 0.152.
 */
package io.reflectoring.sqs.internal;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.reflectoring.sqs.api.ExceptionHandler;
import io.reflectoring.sqs.api.SqsMessageHandler;
import io.reflectoring.sqs.api.SqsMessagePollerProperties;
import io.reflectoring.sqs.internal.SqsMessageFetcher;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls an SQS queue on a scheduled thread pool and dispatches every received
 * message to a {@link SqsMessageHandler} running on a separate handler thread pool.
 *
 * <p>A message is deleted from the queue when the handler completes without
 * throwing, when the {@link ExceptionHandler} decides {@code DELETE} for a failed
 * message, or when the message body cannot be deserialized at all.
 *
 * @param <T> the type the message body is deserialized into
 */
class SqsMessagePoller<T> {
    private static final Logger logger = LoggerFactory.getLogger(SqsMessagePoller.class);
    private final String name;
    private final SqsMessageHandler<T> messageHandler;
    private final SqsMessageFetcher messageFetcher;
    private final SqsMessagePollerProperties pollingProperties;
    private final AmazonSQS sqsClient;
    private final ObjectMapper objectMapper;
    private final ScheduledThreadPoolExecutor pollerThreadPool;
    private final ThreadPoolExecutor handlerThreadPool;
    private final ExceptionHandler exceptionHandler;

    /**
     * Schedules one recurring {@link #pollMessages()} task per core thread of the
     * poller thread pool. The configured poll delay is used both as the initial
     * delay and as the delay between the end of one poll and the start of the next.
     */
    void start() {
        logger.info("starting SqsMessagePoller");
        // NOTE(review): getSeconds() presumably drops any sub-second part of the
        // configured delay - confirm whole-second granularity is intended.
        long pollDelaySeconds = this.pollingProperties.getPollDelay().getSeconds();
        int pollerThreads = this.pollerThreadPool.getCorePoolSize();
        for (int i = 0; i < pollerThreads; ++i) {
            logger.info("starting SqsMessagePoller ({}) - thread {}", this.name, i);
            this.pollerThreadPool.scheduleWithFixedDelay(this::pollMessages, pollDelaySeconds, pollDelaySeconds, TimeUnit.SECONDS);
        }
    }

    /**
     * Shuts down the poller pool and the handler pool via {@code shutdownNow()}.
     */
    void stop() {
        logger.info("stopping SqsMessagePoller");
        this.pollerThreadPool.shutdownNow();
        this.handlerThreadPool.shutdownNow();
    }

    /**
     * Fetches one batch of messages and hands each of them to
     * {@link #handleMessage(Message)}.
     *
     * <p>Every exception is caught and logged here so that it cannot escape into
     * the scheduler: a periodic task that throws is not executed again.
     */
    void pollMessages() {
        try {
            List<Message> messages = this.messageFetcher.fetchMessages();
            for (Message sqsMessage : messages) {
                this.handleMessage(sqsMessage);
            }
        }
        catch (Exception e) {
            logger.error("error fetching messages from queue {}:", this.pollingProperties.getQueueUrl(), e);
        }
    }

    /**
     * Deserializes the message body into the handler's message type and submits
     * the actual processing to the handler thread pool.
     *
     * <p>A body that cannot be parsed is deleted from the queue right away, since
     * receiving the same payload again would fail in the same way.
     *
     * @param sqsMessage the raw message received from SQS
     */
    private void handleMessage(Message sqsMessage) {
        T message;
        try {
            message = this.objectMapper.readValue(sqsMessage.getBody(), this.messageHandler.messageType());
        }
        catch (JsonProcessingException e) {
            logger.warn("error parsing message {} - deleting message from SQS because it's not recoverable: ", sqsMessage.getMessageId(), e);
            this.deleteUnparseableMessage(sqsMessage);
            return;
        }
        this.handlerThreadPool.submit(() -> this.processMessage(sqsMessage, message));
    }

    /**
     * Deletes a message whose body could not be parsed. A failure of the delete
     * call is logged instead of propagated so the remaining messages of the
     * current batch are still dispatched.
     *
     * @param sqsMessage the message to remove from the queue
     */
    private void deleteUnparseableMessage(Message sqsMessage) {
        try {
            this.acknowledgeMessage(sqsMessage);
        }
        catch (RuntimeException deleteFailure) {
            logger.error("error deleting unparseable message {} from queue {}:", sqsMessage.getMessageId(), this.pollingProperties.getQueueUrl(), deleteFailure);
        }
    }

    /**
     * Runs the handler callbacks for a single message; executed on the handler
     * thread pool.
     *
     * <p>On success the message is deleted. If the handler throws, the
     * {@link ExceptionHandler} decides whether the message is deleted or left on
     * the queue. {@code onAfterHandle} is invoked in every case.
     *
     * @param sqsMessage the raw SQS message (needed for its id and receipt handle)
     * @param message    the deserialized message body
     */
    private void processMessage(Message sqsMessage, T message) {
        try {
            this.messageHandler.onBeforeHandle(message);
            this.messageHandler.handle(message);
            this.acknowledgeMessage(sqsMessage);
            logger.debug("message {} processed successfully - message has been deleted from SQS", sqsMessage.getMessageId());
        }
        catch (Exception e) {
            ExceptionHandler.ExceptionHandlerDecision decision = this.exceptionHandler.handleException(sqsMessage, e);
            switch (decision) {
                case DELETE:
                    this.acknowledgeMessage(sqsMessage);
                    break;
                case RETRY:
                default:
                    // Nothing to do: the message is simply not deleted, so it stays
                    // on the queue (SQS is expected to deliver it again later).
                    break;
            }
        }
        finally {
            this.messageHandler.onAfterHandle(message);
        }
    }

    /**
     * Deletes the given message from the configured queue using its receipt handle.
     *
     * @param message the message to delete
     */
    private void acknowledgeMessage(Message message) {
        this.sqsClient.deleteMessage(this.pollingProperties.getQueueUrl(), message.getReceiptHandle());
    }

    /**
     * Creates a poller from its collaborators; nothing is scheduled until
     * {@link #start()} is called.
     *
     * @param name              name of this poller, used in log output
     * @param messageHandler    callbacks invoked for every deserialized message
     * @param messageFetcher    source of message batches
     * @param pollingProperties supplies the queue URL and the poll delay
     * @param sqsClient         client used to delete messages
     * @param objectMapper      mapper used to deserialize message bodies
     * @param pollerThreadPool  scheduler that runs the polling tasks
     * @param handlerThreadPool pool that runs the message handler
     * @param exceptionHandler  decides what happens to a message whose handler threw
     */
    @Generated
    public SqsMessagePoller(String name, SqsMessageHandler<T> messageHandler, SqsMessageFetcher messageFetcher, SqsMessagePollerProperties pollingProperties, AmazonSQS sqsClient, ObjectMapper objectMapper, ScheduledThreadPoolExecutor pollerThreadPool, ThreadPoolExecutor handlerThreadPool, ExceptionHandler exceptionHandler) {
        this.name = name;
        this.messageHandler = messageHandler;
        this.messageFetcher = messageFetcher;
        this.pollingProperties = pollingProperties;
        this.sqsClient = sqsClient;
        this.objectMapper = objectMapper;
        this.pollerThreadPool = pollerThreadPool;
        this.handlerThreadPool = handlerThreadPool;
        this.exceptionHandler = exceptionHandler;
    }
}

