/*
 * Decompiled with CFR 0.152.
 */
package io.elastic.sailor;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import io.elastic.sailor.AMQPWrapperInterface;
import io.elastic.sailor.CipherWrapper;
import io.elastic.sailor.MessageConsumer;
import io.elastic.sailor.MessageProcessor;
import java.io.IOException;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class AMQPWrapper
implements AMQPWrapperInterface {
    private static final Logger logger = LoggerFactory.getLogger(AMQPWrapper.class);
    private Connection amqp;
    private Channel subscribeChannel;
    private Channel publishChannel;
    private String amqpUri;
    private String subscribeExchangeName;
    private String publishExchangeName;
    private String dataRoutingKey;
    private String errorRoutingKey;
    private String reboundRoutingKey;
    private String snapshotRoutingKey;
    private CipherWrapper cipher;
    private MessageProcessor messageProcessor;

    @Inject
    public AMQPWrapper(CipherWrapper cipher) {
        this.cipher = cipher;
    }

    @Inject
    public void setAmqpUri(@Named(value="AMQP_URI") String amqpUri) {
        this.amqpUri = amqpUri;
    }

    @Inject
    public void setSubscribeExchangeName(@Named(value="LISTEN_MESSAGES_ON") String subscribeExchangeName) {
        this.subscribeExchangeName = subscribeExchangeName;
    }

    @Inject
    public void setPublishExchangeName(@Named(value="PUBLISH_MESSAGES_TO") String publishExchangeName) {
        this.publishExchangeName = publishExchangeName;
    }

    @Inject
    public void setDataRoutingKey(@Named(value="DATA_ROUTING_KEY") String dataRoutingKey) {
        this.dataRoutingKey = dataRoutingKey;
    }

    @Inject
    public void setErrorRoutingKey(@Named(value="ERROR_ROUTING_KEY") String errorRoutingKey) {
        this.errorRoutingKey = errorRoutingKey;
    }

    @Inject
    public void setReboundRoutingKey(@Named(value="REBOUND_ROUTING_KEY") String reboundRoutingKey) {
        this.reboundRoutingKey = reboundRoutingKey;
    }

    @Inject
    public void setSnapshotRoutingKey(@Named(value="SNAPSHOT_ROUTING_KEY") String snapshotRoutingKey) {
        this.snapshotRoutingKey = snapshotRoutingKey;
    }

    @Inject
    public void setMessageProcessor(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    @Override
    public void connect() {
        this.openConnection(this.amqpUri);
        this.openPublishChannel();
        this.openSubscribeChannel();
    }

    public void disconnect() {
        logger.info("About to disconnect from AMQP");
        try {
            this.subscribeChannel.close();
        }
        catch (IOException e) {
            logger.info("Subscription channel is already closed: " + e);
        }
        try {
            this.publishChannel.close();
        }
        catch (IOException e) {
            logger.info("Publish channel is already closed: " + e);
        }
        try {
            this.amqp.close();
        }
        catch (IOException e) {
            logger.info("AMQP connection is already closed: " + e);
        }
        logger.info("Successfully disconnected from AMQP");
    }

    @Override
    public void subscribeConsumer() {
        MessageConsumer consumer = new MessageConsumer(this.subscribeChannel, this.cipher, this.messageProcessor);
        try {
            this.subscribeChannel.basicConsume(this.subscribeExchangeName, (Consumer)consumer);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        logger.info("Subscribed consumer. Waiting for messages to arrive ...");
    }

    @Override
    public void ack(Long deliveryTag) {
        try {
            logger.info(String.format("Message #%s ack", deliveryTag));
            this.subscribeChannel.basicAck(deliveryTag.longValue(), false);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void reject(Long deliveryTag) {
        try {
            logger.info(String.format("Message #%s reject", deliveryTag));
            this.subscribeChannel.basicReject(deliveryTag.longValue(), false);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void sendData(byte[] payload, AMQP.BasicProperties options) {
        this.sendToExchange(this.dataRoutingKey, payload, options);
    }

    @Override
    public void sendSnapshot(byte[] payload, AMQP.BasicProperties options) {
        this.sendToExchange(this.snapshotRoutingKey, payload, options);
    }

    @Override
    public void sendError(byte[] payload, AMQP.BasicProperties options) {
        this.sendToExchange(this.errorRoutingKey, payload, options);
    }

    @Override
    public void sendRebound(byte[] payload, AMQP.BasicProperties options) {
        this.sendToExchange(this.reboundRoutingKey, payload, options);
    }

    private AMQPWrapper openConnection(String uri) {
        try {
            if (this.amqp == null) {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setUri(new URI(uri));
                this.amqp = factory.newConnection();
                logger.info("Connected to AMQP");
            }
            return this;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private AMQPWrapper openPublishChannel() {
        try {
            if (this.publishChannel == null) {
                this.publishChannel = this.amqp.createChannel();
                logger.info("Opened publish channel");
            }
            return this;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private AMQPWrapper openSubscribeChannel() {
        try {
            if (this.subscribeChannel == null) {
                this.subscribeChannel = this.amqp.createChannel();
                logger.info("Opened subscribe channel");
            }
            return this;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void sendToExchange(String routingKey, byte[] payload, AMQP.BasicProperties options) {
        logger.info("Pushing to exchange={}, routingKey={}", (Object)this.publishExchangeName, (Object)routingKey);
        logger.info("Message headers: {}", (Object)options.getHeaders());
        try {
            this.publishChannel.basicPublish(this.publishExchangeName, routingKey, options, payload);
        }
        catch (IOException e) {
            throw new RuntimeException(String.format("Failed to publish message to exchange %s", this.publishExchangeName), e);
        }
        logger.info("Successfully published data to {}", (Object)this.publishExchangeName);
    }

    protected void finalize() throws Throwable {
        this.disconnect();
        super.finalize();
    }

    public void setSubscribeChannel(Channel subscribeChannel) {
        this.subscribeChannel = subscribeChannel;
    }

    public void setPublishChannel(Channel publishChannel) {
        this.publishChannel = publishChannel;
    }
}

