/*
 * Decompiled with CFR 0.152.
 */
package io.elastic.sailor.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.elastic.api.Function;
import io.elastic.api.Message;
import io.elastic.sailor.ContainerContext;
import io.elastic.sailor.ExecutionContext;
import io.elastic.sailor.ExecutionStats;
import io.elastic.sailor.MessageProcessor;
import io.elastic.sailor.Step;
import io.elastic.sailor.Utils;
import io.elastic.sailor.impl.CryptoServiceImpl;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import javax.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MessageConsumer
extends DefaultConsumer {
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
    private final CryptoServiceImpl cipher;
    private final MessageProcessor processor;
    private final Function function;
    private final Step step;
    private final ContainerContext containerContext;

    public MessageConsumer(Channel channel, CryptoServiceImpl cipher, MessageProcessor processor, Function function, Step step, ContainerContext containerContext) {
        super(channel);
        this.cipher = cipher;
        this.processor = processor;
        this.function = function;
        this.step = step;
        this.containerContext = containerContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        ExecutionContext executionContext = null;
        long deliveryTag = envelope.getDeliveryTag();
        logger.info("Consumer {} received message: deliveryTag={}", (Object)consumerTag, (Object)deliveryTag);
        this.putIntoMDC(properties);
        try {
            executionContext = this.createExecutionContext(body, properties);
        }
        catch (Exception e) {
            logger.info("Failed to parse message to process {}", (Object)deliveryTag, (Object)e);
            this.getChannel().basicReject(deliveryTag, false);
            return;
        }
        ExecutionStats stats = null;
        try {
            stats = this.processor.processMessage(executionContext, this.function);
        }
        catch (Exception e) {
            logger.error("Failed to process message for delivery tag:" + deliveryTag, (Throwable)e);
        }
        finally {
            MessageConsumer.removeFromMDC("threadId");
            MessageConsumer.removeFromMDC("messageId");
            MessageConsumer.removeFromMDC("parentMessageId");
            this.ackOrReject(stats, deliveryTag);
        }
    }

    private void putIntoMDC(AMQP.BasicProperties properties) {
        String threadId = Utils.getThreadId(properties);
        Object messageId = this.getHeaderValue(properties, "messageId");
        Object parentMessageId = this.getHeaderValue(properties, "parentMessageId");
        MDC.put((String)"threadId", (String)threadId);
        MDC.put((String)"messageId", (String)messageId.toString());
        MDC.put((String)"parentMessageId", (String)parentMessageId.toString());
        logger.info("messageId={}, parentMessageId={}, threadId={}", new Object[]{messageId, parentMessageId, threadId});
    }

    private static void removeFromMDC(String key) {
        try {
            MDC.remove((String)key);
        }
        catch (Exception e) {
            logger.warn("Failed to remove {} from MDC", (Object)key, (Object)e);
        }
    }

    private ExecutionContext createExecutionContext(byte[] body, AMQP.BasicProperties properties) {
        String bodyString = new String(body, Charset.forName("UTF-8"));
        JsonObject payload = this.cipher.decryptMessageContent(bodyString);
        Message message = Utils.createMessage(payload);
        return new ExecutionContext(this.step, message, properties, this.containerContext.getContainerId());
    }

    private void ackOrReject(ExecutionStats stats, long deliveryTag) throws IOException {
        logger.info("Execution stats: {}", (Object)stats);
        if (stats == null || stats.getErrorCount() > 0) {
            logger.info("Reject received messages {}", (Object)deliveryTag);
            this.getChannel().basicReject(deliveryTag, false);
            return;
        }
        logger.info("Acknowledging received messages {}", (Object)deliveryTag);
        this.getChannel().basicAck(deliveryTag, true);
    }

    private Object getHeaderValue(AMQP.BasicProperties properties, String headerName) {
        return properties.getHeaders().getOrDefault(headerName, "unknown");
    }
}

