/*
 * Decompiled with CFR 0.152.
 */
package io.interact.sqsdw;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.interact.sqsdw.MessageHandler;
import io.interact.sqsdw.SqsListener;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SqsListener} implementation that polls a single SQS queue on a dedicated
 * thread and offers every received message to each registered {@link MessageHandler}.
 *
 * <p>Every received message is deleted from the queue after the handlers have been
 * consulted, including when a handler threw an exception (the failure is logged with
 * the full message content instead).
 *
 * <p>Errors raised while receiving or deleting messages flip the listener to
 * "unhealthy" and make the polling thread back off for {@link #SLEEP_ON_ERROR}
 * milliseconds before retrying; the first subsequent successful poll flips it back.
 */
public class SqsListenerImpl
implements SqsListener {
    /** Back-off, in milliseconds, applied after a failed poll before the next attempt. */
    private static final int SLEEP_ON_ERROR = 5000;
    private static final Logger LOG = LoggerFactory.getLogger(SqsListenerImpl.class);
    /** Attribute-name wildcard passed to the receive request so all message attributes are returned. */
    private static final String ATTR_ALL = "All";
    /** {@code true} until a poll fails; reset to {@code true} by the next successful poll. */
    private final AtomicBoolean healthy = new AtomicBoolean(true);
    private final AmazonSQS sqs;
    private final String sqsListenQueueUrl;
    private final Set<MessageHandler> handlers;
    /** Message logged when the polling loop ends. */
    private final String interruptedMsg;
    /** The polling thread; {@code null} until {@link #start()} has been called. */
    private Thread pollingThread;

    /**
     * Creates a listener for the given queue.
     *
     * @param sqs               client used to receive and delete messages
     * @param sqsListenQueueUrl URL of the queue to poll
     * @param handlers          handlers that are offered every received message
     */
    @Inject
    public SqsListenerImpl(AmazonSQS sqs, @Named(value="sqsListenQueueUrl") String sqsListenQueueUrl, Set<MessageHandler> handlers) {
        this.sqs = sqs;
        this.sqsListenQueueUrl = sqsListenQueueUrl;
        this.handlers = handlers;
        this.interruptedMsg = "Stop listening to queue: " + sqsListenQueueUrl;
    }

    /**
     * Creates and starts the polling thread. The thread keeps polling until it is
     * interrupted (see {@link #stop()}).
     */
    public void start() throws Exception {
        this.pollingThread = new Thread(){

            @Override
            public void run() {
                LOG.info("Start listening to queue: " + SqsListenerImpl.this.sqsListenQueueUrl);
                while (!this.isInterrupted()) {
                    try {
                        SqsListenerImpl.this.pollOnce();
                    }
                    catch (Exception e) {
                        SqsListenerImpl.this.handleQueueError(e);
                    }
                }
                LOG.info(SqsListenerImpl.this.interruptedMsg);
            }
        };
        this.pollingThread.start();
    }

    /**
     * Performs one receive call, dispatches and deletes each returned message, and
     * marks the listener healthy again if it was previously unhealthy.
     *
     * <p>An exception from the receive or delete call propagates to the caller, so the
     * remaining messages of the current batch are not handled in this pass.
     */
    private void pollOnce() {
        ReceiveMessageRequest request = new ReceiveMessageRequest(this.sqsListenQueueUrl).withMessageAttributeNames(ATTR_ALL);
        List<Message> messages = this.sqs.receiveMessage(request).getMessages();
        int total = messages.size();
        for (int i = 0; i < total; ++i) {
            Message msg = messages.get(i);
            LOG.debug("Processing message {} of {}...", i + 1, total);
            this.dispatch(msg);
            // Deleted regardless of the handlers' outcome; failures were already logged by dispatch().
            this.sqs.deleteMessage(new DeleteMessageRequest(this.sqsListenQueueUrl, msg.getReceiptHandle()));
            LOG.debug("Message {} of {} is processed and deleted from queue '{}'", i + 1, total, this.sqsListenQueueUrl);
        }
        if (this.healthy.compareAndSet(false, true)) {
            LOG.info("Queue '{}' recovered from error condition", this.sqsListenQueueUrl);
        }
    }

    /**
     * Offers the message to every handler and invokes those that accept it. The first
     * exception thrown by a handler is logged and ends the dispatch for this message,
     * so handlers later in the iteration order are not consulted.
     */
    private void dispatch(Message msg) {
        try {
            for (MessageHandler handler : this.handlers) {
                LOG.debug("Calling message handler: " + handler);
                if (handler.canHandle(msg)) {
                    LOG.debug("Message accepted.");
                    handler.handle(msg);
                } else {
                    LOG.debug("Message refused.");
                }
            }
        }
        catch (Exception e) {
            this.logProcessingError(msg, e);
        }
    }

    /**
     * Logs a handler failure together with the id, receipt handle, body checksum, body
     * and all attributes of the message that triggered it.
     */
    private void logProcessingError(Message msg, Exception e) {
        StringBuilder builder = new StringBuilder()
            .append("An error occurred while processing the following message:")
            .append("\n\tMessageId:     ").append(msg.getMessageId())
            .append("\n\tReceiptHandle: ").append(msg.getReceiptHandle())
            .append("\n\tMD5OfBody:     ").append(msg.getMD5OfBody())
            .append("\n\tBody:          ").append(msg.getBody());
        for (Map.Entry<String, ?> entry : msg.getMessageAttributes().entrySet()) {
            builder.append("\n\tAttribute\n\t\tName:  ").append(entry.getKey())
                .append("\n\t\tValue: ").append(entry.getValue());
        }
        LOG.error(builder.toString(), (Throwable)e);
    }

    /**
     * Marks the listener unhealthy, logs the failure and sleeps for
     * {@link #SLEEP_ON_ERROR} milliseconds before the caller retries.
     *
     * <p>If the sleep is interrupted, the thread's interrupt flag is restored so that
     * the polling loop observes the stop request and terminates.
     */
    private void handleQueueError(Exception e) {
        boolean firstAttempt = this.healthy.compareAndSet(true, false);
        String errorMsg = firstAttempt
            ? "An error occurred while listening to '%s', waiting '%s' ms before retrying..."
            : "Retry failed while listening to '%s', waiting '%s' ms before retrying...";
        LOG.error(String.format(errorMsg, this.sqsListenQueueUrl, SLEEP_ON_ERROR), (Throwable)e);
        try {
            Thread.sleep(SLEEP_ON_ERROR);
        }
        catch (InterruptedException ie) {
            // Thread.sleep clears the interrupt status when it throws; set it again,
            // otherwise the polling loop would never see the stop request. The loop
            // logs the "stop listening" message when it exits.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests the polling thread to stop by interrupting it. Does nothing if
     * {@link #start()} has not been called.
     */
    public void stop() throws Exception {
        Thread thread = this.pollingThread;
        if (thread != null) {
            thread.interrupt();
        }
    }

    /**
     * @return {@code false} from the moment a poll fails until a later poll succeeds,
     *         {@code true} otherwise
     */
    @Override
    public boolean isHealthy() {
        return this.healthy.get();
    }

    /**
     * @return the URL of the queue this listener polls
     */
    @Override
    public String getQueueUrl() {
        return this.sqsListenQueueUrl;
    }
}

