/*
 * Decompiled with CFR 0.152.
 */
package io.elastic.sailor.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.elastic.api.Function;
import io.elastic.api.Message;
import io.elastic.sailor.ContainerContext;
import io.elastic.sailor.ExecutionContext;
import io.elastic.sailor.ExecutionStats;
import io.elastic.sailor.MessageProcessor;
import io.elastic.sailor.MessageResolver;
import io.elastic.sailor.Sailor;
import io.elastic.sailor.Step;
import io.elastic.sailor.Utils;
import io.elastic.sailor.impl.CryptoServiceImpl;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MessageConsumer
extends DefaultConsumer {
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
    private final CryptoServiceImpl cipher;
    private final MessageProcessor processor;
    private final Function function;
    private final Step step;
    private final ContainerContext containerContext;
    private final MessageResolver messageResolver;
    private final Channel channel;
    private final ExecutorService threadPool;

    public MessageConsumer(Channel channel, CryptoServiceImpl cipher, MessageProcessor processor, Function function, Step step, ContainerContext containerContext, MessageResolver messageResolver, ExecutorService threadPool) {
        super(channel);
        this.channel = channel;
        this.cipher = cipher;
        this.processor = processor;
        this.function = function;
        this.step = step;
        this.containerContext = containerContext;
        this.messageResolver = messageResolver;
        this.threadPool = threadPool;
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        this.threadPool.submit(() -> {
            if (Sailor.gracefulShutdownHandler != null) {
                Sailor.gracefulShutdownHandler.increment();
            }
            ExecutionContext executionContext = null;
            long deliveryTag = envelope.getDeliveryTag();
            this.putIntoMDC(properties);
            try {
                executionContext = this.createExecutionContext(body, properties);
            }
            catch (Exception e) {
                try {
                    this.channel.basicReject(deliveryTag, false);
                }
                catch (IOException ioException) {
                    logger.error("Failed to basicReject message: {}", (Object)Utils.getStackTrace(e));
                }
                logger.error("Failed to parse or resolve message to process {}", (Object)Utils.getStackTrace(e));
                this.decrement();
                return;
            }
            ExecutionStats stats = null;
            try {
                stats = this.processor.processMessage(executionContext, this.function);
            }
            catch (Exception e) {
                logger.error("Failed to process message: {}", (Object)Utils.getStackTrace(e));
            }
            finally {
                MessageConsumer.removeFromMDC("threadId");
                MessageConsumer.removeFromMDC("messageId");
                MessageConsumer.removeFromMDC("parentMessageId");
                try {
                    this.ackOrReject(stats, deliveryTag);
                }
                catch (IOException e) {
                    logger.error("Failed to ackOrReject message: {}", (Object)Utils.getStackTrace(e));
                }
                this.decrement();
            }
        });
    }

    private void decrement() {
        try {
            if (Sailor.gracefulShutdownHandler != null) {
                Sailor.gracefulShutdownHandler.decrement();
            }
        }
        catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

    private void putIntoMDC(AMQP.BasicProperties properties) {
        String threadId = Utils.getThreadId(properties);
        Object messageId = this.getHeaderValue(properties, "messageId");
        Object parentMessageId = this.getHeaderValue(properties, "parentMessageId");
        MDC.put((String)"threadId", (String)threadId);
        MDC.put((String)"messageId", (String)messageId.toString());
        MDC.put((String)"parentMessageId", (String)parentMessageId.toString());
        logger.info("messageId={}, parentMessageId={}, threadId={}", new Object[]{messageId, parentMessageId, threadId});
    }

    private static void removeFromMDC(String key) {
        try {
            MDC.remove((String)key);
        }
        catch (Exception e) {
            logger.warn("Failed to remove {} from MDC: {}", (Object)key, (Object)Utils.getStackTrace(e));
        }
    }

    private ExecutionContext createExecutionContext(byte[] body, AMQP.BasicProperties properties) {
        Message message = this.messageResolver.materialize(body, properties);
        return new ExecutionContext(this.step, body, message, properties, this.containerContext);
    }

    private void ackOrReject(ExecutionStats stats, long deliveryTag) throws IOException {
        logger.info("Execution stats: {}", (Object)stats);
        if (stats == null || stats.getErrorCount() > 0) {
            logger.info("Reject received messages {}", (Object)deliveryTag);
            this.getChannel().basicReject(deliveryTag, false);
            return;
        }
        logger.info("Acknowledging received message with deliveryTag={}", (Object)deliveryTag);
        this.getChannel().basicAck(deliveryTag, false);
    }

    private Object getHeaderValue(AMQP.BasicProperties properties, String headerName) {
        return properties.getHeaders().getOrDefault(headerName, "unknown");
    }

    public void handleConsumeOk(String consumerTag) {
        logger.debug("Consumer {} is registered", (Object)consumerTag);
    }
}

