/*
 * Decompiled with CFR 0.152.
 */
package io.elastic.sailor.impl;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import io.elastic.api.Function;
import io.elastic.api.Message;
import io.elastic.sailor.AmqpService;
import io.elastic.sailor.ContainerContext;
import io.elastic.sailor.MessageProcessor;
import io.elastic.sailor.Step;
import io.elastic.sailor.impl.CryptoServiceImpl;
import io.elastic.sailor.impl.MessageConsumer;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AmqpService} implementation built on the RabbitMQ Java client.
 *
 * <p>The service keeps one AMQP connection and two channels on it: a subscribe channel
 * (consume / ack / reject) and a publish channel (all {@code send*} methods). Connection
 * settings, routing keys and collaborators are supplied through Guice setter injection.
 */
@Singleton
public class AmqpServiceImpl implements AmqpService {
    private static final Logger logger = LoggerFactory.getLogger(AmqpServiceImpl.class);

    private Connection amqp;
    private Channel subscribeChannel;
    private Channel publishChannel;
    private String amqpUri;
    private String subscribeExchangeName;
    private String publishExchangeName;
    private String dataRoutingKey;
    private String errorRoutingKey;
    private String reboundRoutingKey;
    private String snapshotRoutingKey;
    private Integer prefetchCount;
    private final CryptoServiceImpl cipher;
    private MessageProcessor messageProcessor;
    private Step step;
    private String consumerTag;
    private ContainerContext containerContext;

    /**
     * Creates the service.
     *
     * @param cipher crypto service used to encrypt error payloads and handed to the message consumer
     */
    @Inject
    public AmqpServiceImpl(CryptoServiceImpl cipher) {
        this.cipher = cipher;
    }

    /** @param amqpUri URI passed to the connection factory when connecting */
    @Inject
    public void setAmqpUri(@Named("ELASTICIO_AMQP_URI") String amqpUri) {
        this.amqpUri = amqpUri;
    }

    /** @param subscribeExchangeName name passed as the first argument of {@code basicConsume} */
    @Inject
    public void setSubscribeExchangeName(@Named("ELASTICIO_LISTEN_MESSAGES_ON") String subscribeExchangeName) {
        this.subscribeExchangeName = subscribeExchangeName;
    }

    /** @param publishExchangeName exchange every outgoing message is published to */
    @Inject
    public void setPublishExchangeName(@Named("ELASTICIO_PUBLISH_MESSAGES_TO") String publishExchangeName) {
        this.publishExchangeName = publishExchangeName;
    }

    /** @param dataRoutingKey routing key used by {@link #sendData} */
    @Inject
    public void setDataRoutingKey(@Named("ELASTICIO_DATA_ROUTING_KEY") String dataRoutingKey) {
        this.dataRoutingKey = dataRoutingKey;
    }

    /** @param errorRoutingKey routing key used by {@link #sendError} */
    @Inject
    public void setErrorRoutingKey(@Named("ELASTICIO_ERROR_ROUTING_KEY") String errorRoutingKey) {
        this.errorRoutingKey = errorRoutingKey;
    }

    /** @param reboundRoutingKey routing key used by {@link #sendRebound} */
    @Inject
    public void setReboundRoutingKey(@Named("ELASTICIO_REBOUND_ROUTING_KEY") String reboundRoutingKey) {
        this.reboundRoutingKey = reboundRoutingKey;
    }

    /** @param snapshotRoutingKey routing key used by {@link #sendSnapshot} */
    @Inject
    public void setSnapshotRoutingKey(@Named("ELASTICIO_SNAPSHOT_ROUTING_KEY") String snapshotRoutingKey) {
        this.snapshotRoutingKey = snapshotRoutingKey;
    }

    /** @param messageProcessor processor handed to the {@link MessageConsumer} on subscription */
    @Inject
    public void setMessageProcessor(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    /** @param prefetchCount value applied to the subscribe channel via {@code basicQos} */
    @Inject
    public void setPrefetchCount(@Named("ELASTICIO_RABBITMQ_PREFETCH_SAILOR") Integer prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    /** @param step step descriptor handed to the {@link MessageConsumer} on subscription */
    @Inject
    public void setStep(@Named("StepJson") Step step) {
        this.step = step;
    }

    /** @param containerContext context handed to the {@link MessageConsumer} on subscription */
    @Inject
    public void setContainerContext(ContainerContext containerContext) {
        this.containerContext = containerContext;
    }

    /**
     * Opens the connection, then the publish channel, then the subscribe channel.
     * Each of them is only created if it is not already set.
     *
     * @throws RuntimeException wrapping any failure raised while connecting
     */
    @Override
    public void connect() {
        this.openConnection(this.amqpUri);
        this.openPublishChannel();
        this.openSubscribeChannel();
    }

    /**
     * Closes both channels and the connection on a best-effort basis.
     *
     * <p>Resources that were never opened are skipped, and a failure to close one resource
     * is logged and does not prevent the remaining ones from being closed.
     */
    @Override
    public void disconnect() {
        logger.info("About to disconnect from AMQP");
        // RuntimeException is caught on purpose: the RabbitMQ client signals "already closed"
        // with the unchecked AlreadyClosedException, which must not abort the rest of the teardown.
        if (this.subscribeChannel != null) {
            try {
                this.subscribeChannel.close();
            } catch (IOException | TimeoutException | RuntimeException e) {
                logger.info("Subscription channel is already closed: " + e);
            }
        }
        if (this.publishChannel != null) {
            try {
                this.publishChannel.close();
            } catch (IOException | TimeoutException | RuntimeException e) {
                logger.info("Publish channel is already closed: " + e);
            }
        }
        if (this.amqp != null) {
            try {
                this.amqp.close();
            } catch (IOException | RuntimeException e) {
                logger.info("AMQP connection is already closed: " + e);
            }
        }
        logger.info("Successfully disconnected from AMQP");
    }

    /**
     * Registers a {@link MessageConsumer} wrapping {@code function} on the subscribe channel
     * and remembers the returned consumer tag for {@link #cancelConsumer()}.
     *
     * @param function component function passed to the consumer
     * @throws RuntimeException wrapping an {@link IOException} raised by {@code basicConsume}
     */
    @Override
    public void subscribeConsumer(Function function) {
        MessageConsumer consumer = new MessageConsumer(
                this.subscribeChannel, this.cipher, this.messageProcessor, function, this.step, this.containerContext);
        try {
            this.consumerTag = this.subscribeChannel.basicConsume(this.subscribeExchangeName, (Consumer) consumer);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        logger.info("Subscribed consumer {}. Waiting for messages to arrive ...", this.consumerTag);
    }

    /**
     * Cancels the consumer registered by {@link #subscribeConsumer(Function)}, if any,
     * and forgets its tag. Does nothing when no consumer is registered.
     *
     * @throws RuntimeException wrapping an {@link IOException} raised by {@code basicCancel}
     */
    @Override
    public void cancelConsumer() {
        if (this.consumerTag != null) {
            logger.info("Canceling consumer {}", this.consumerTag);
            try {
                this.subscribeChannel.basicCancel(this.consumerTag);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.consumerTag = null;
    }

    /**
     * Acknowledges a single delivery on the subscribe channel.
     *
     * @param deliveryTag tag of the delivery to acknowledge
     * @throws RuntimeException wrapping any failure, including a {@code null} tag
     */
    @Override
    public void ack(Long deliveryTag) {
        try {
            logger.info("Message #{} ack", deliveryTag);
            this.subscribeChannel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Rejects a single delivery on the subscribe channel without requeueing it.
     *
     * @param deliveryTag tag of the delivery to reject
     * @throws RuntimeException wrapping any failure, including a {@code null} tag
     */
    @Override
    public void reject(Long deliveryTag) {
        try {
            logger.info("Message #{} reject", deliveryTag);
            this.subscribeChannel.basicReject(deliveryTag, false);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Publishes {@code payload} with the data routing key.
     *
     * @param payload message body
     * @param options AMQP properties sent with the message
     */
    @Override
    public void sendData(byte[] payload, AMQP.BasicProperties options) {
        this.sendToExchange(this.dataRoutingKey, payload, options);
    }

    /**
     * Publishes {@code payload} using the {@code reply_to} header of {@code options} as routing key.
     *
     * @param payload message body
     * @param options AMQP properties; their headers must contain {@code reply_to}
     * @throws RuntimeException if the headers are missing or contain no {@code reply_to} entry
     */
    @Override
    public void sendHttpReply(byte[] payload, AMQP.BasicProperties options) {
        Map<String, Object> headers = options.getHeaders();
        // Properties built without headers return null here; treat that like a missing reply_to.
        Object routingKey = headers == null ? null : headers.get("reply_to");
        if (routingKey == null) {
            throw new RuntimeException("Component emitted 'httpReply' event but 'reply_to' was not found in AMQP headers");
        }
        this.sendToExchange(routingKey.toString(), payload, options);
    }

    /**
     * Publishes {@code payload} with the snapshot routing key.
     *
     * @param payload message body
     * @param options AMQP properties sent with the message
     */
    @Override
    public void sendSnapshot(byte[] payload, AMQP.BasicProperties options) {
        this.sendToExchange(this.snapshotRoutingKey, payload, options);
    }

    /**
     * Publishes an error report with the error routing key.
     *
     * <p>The body is a JSON object with an encrypted {@code error} entry (exception class name,
     * stack trace and, when present, message) and, if {@code originalMessage} is given, an
     * encrypted {@code errorInput} entry. The JSON text is encoded as UTF-8.
     *
     * @param e               the failure to report
     * @param options         AMQP properties sent with the message
     * @param originalMessage message that was being processed, or {@code null}
     */
    @Override
    public void sendError(Throwable e, AMQP.BasicProperties options, Message originalMessage) {
        StringWriter stackTrace = new StringWriter();
        e.printStackTrace(new PrintWriter(stackTrace));

        JsonObjectBuilder errorBuilder = Json.createObjectBuilder()
                .add("name", e.getClass().getName())
                .add("stack", stackTrace.toString());
        if (e.getMessage() != null) {
            errorBuilder.add("message", e.getMessage());
        }
        JsonObject error = errorBuilder.build();

        JsonObjectBuilder payloadBuilder = Json.createObjectBuilder()
                .add("error", this.cipher.encryptJsonObject(error));
        if (originalMessage != null) {
            payloadBuilder.add("errorInput", this.cipher.encryptMessage(originalMessage));
        }
        JsonObject payload = payloadBuilder.build();

        // Explicit charset: the platform default must not decide how the JSON is encoded.
        byte[] errorPayload = payload.toString().getBytes(StandardCharsets.UTF_8);
        this.sendToExchange(this.errorRoutingKey, errorPayload, options);
    }

    /**
     * Publishes {@code payload} with the rebound routing key.
     *
     * @param payload message body
     * @param options AMQP properties sent with the message
     */
    @Override
    public void sendRebound(byte[] payload, AMQP.BasicProperties options) {
        this.sendToExchange(this.reboundRoutingKey, payload, options);
    }

    /**
     * Opens the AMQP connection for {@code uri} unless one is already set.
     *
     * @return this instance
     * @throws RuntimeException wrapping any failure (invalid URI, connection error, ...)
     */
    private AmqpServiceImpl openConnection(String uri) {
        try {
            if (this.amqp == null) {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setUri(new URI(uri));
                this.amqp = factory.newConnection();
                logger.info("Connected to AMQP");
            }
            return this;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the publish channel unless one is already set.
     *
     * @return this instance
     * @throws RuntimeException wrapping an {@link IOException} raised while creating the channel
     */
    private AmqpServiceImpl openPublishChannel() {
        try {
            if (this.publishChannel == null) {
                this.publishChannel = this.amqp.createChannel();
                logger.info("Opened publish channel");
            }
            return this;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the subscribe channel and applies the prefetch count unless a channel is already set.
     *
     * @return this instance
     * @throws RuntimeException wrapping an {@link IOException} raised while creating or configuring the channel
     */
    private AmqpServiceImpl openSubscribeChannel() {
        try {
            if (this.subscribeChannel == null) {
                // Publish to the field only after basicQos succeeded, so a failed attempt
                // cannot leave behind a channel without the prefetch limit applied.
                Channel channel = this.amqp.createChannel();
                channel.basicQos(this.prefetchCount);
                this.subscribeChannel = channel;
                logger.info("Opened subscribe channel");
            }
            return this;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Publishes {@code payload} to the publish exchange with the given routing key.
     *
     * @throws RuntimeException wrapping an {@link IOException} raised by {@code basicPublish}
     */
    private void sendToExchange(String routingKey, byte[] payload, AMQP.BasicProperties options) {
        logger.info("Pushing to exchange={}, routingKey={}", this.publishExchangeName, routingKey);
        logger.info("Message headers: {}", options.getHeaders());
        try {
            this.publishChannel.basicPublish(this.publishExchangeName, routingKey, options, payload);
        } catch (IOException e) {
            throw new RuntimeException(
                    String.format("Failed to publish message to exchange %s", this.publishExchangeName), e);
        }
        logger.info("Successfully published data to {}", this.publishExchangeName);
    }

    /**
     * Replaces the subscribe channel. Not annotated for injection.
     *
     * @param subscribeChannel channel to use for consume / ack / reject
     */
    public void setSubscribeChannel(Channel subscribeChannel) {
        this.subscribeChannel = subscribeChannel;
    }

    /**
     * Replaces the publish channel. Not annotated for injection.
     *
     * @param publishChannel channel to use for publishing
     */
    public void setPublishChannel(Channel publishChannel) {
        this.publishChannel = publishChannel;
    }
}

