/*
 * Decompiled with CFR 0.152.
 */
package io.elastic.sailor.impl;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import io.elastic.api.Function;
import io.elastic.sailor.AmqpService;
import io.elastic.sailor.ContainerContext;
import io.elastic.sailor.MessageProcessor;
import io.elastic.sailor.MessageResolver;
import io.elastic.sailor.Step;
import io.elastic.sailor.impl.CryptoServiceImpl;
import io.elastic.sailor.impl.MessageConsumer;
import java.io.IOException;
import java.net.URI;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class AmqpServiceImpl
implements AmqpService {
    private static final Logger logger = LoggerFactory.getLogger(AmqpServiceImpl.class);
    private Connection amqp;
    private Channel subscribeChannel;
    private String amqpUri;
    private String subscribeExchangeName;
    private Integer prefetchCount;
    private CryptoServiceImpl cipher;
    private MessageProcessor messageProcessor;
    private Step step;
    private String consumerTag;
    private ContainerContext containerContext;
    private MessageResolver messageResolver;
    private ThreadPoolExecutor threadPoolExecutor;
    private Integer threadPoolSize;

    @Inject
    public AmqpServiceImpl(CryptoServiceImpl cipher) {
        this.cipher = cipher;
    }

    @Inject
    public void setAmqpUri(@Named(value="ELASTICIO_AMQP_URI") String amqpUri) {
        this.amqpUri = amqpUri;
    }

    @Inject
    public void setSubscribeExchangeName(@Named(value="ELASTICIO_LISTEN_MESSAGES_ON") String subscribeExchangeName) {
        this.subscribeExchangeName = subscribeExchangeName;
    }

    @Inject
    public void setMessageProcessor(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    @Inject
    public void setPrefetchCount(@Named(value="ELASTICIO_RABBITMQ_PREFETCH_SAILOR") Integer prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    @Inject
    public void setStep(@Named(value="StepJson") Step step) {
        this.step = step;
    }

    @Inject
    public void setContainerContext(ContainerContext containerContext) {
        this.containerContext = containerContext;
    }

    @Inject
    public void setMessageResolver(MessageResolver messageResolver) {
        this.messageResolver = messageResolver;
    }

    @Inject(optional=true)
    public void setThreadPoolSize(@Named(value="ELASTICIO_CONSUMER_THREAD_POOL_SIZE") Integer threadPoolSize) {
        this.threadPoolSize = threadPoolSize;
    }

    @Override
    public void connectAndSubscribe() {
        this.openConnection();
        this.openSubscribeChannel();
    }

    @Override
    public void disconnect() {
        logger.info("About to disconnect from AMQP");
        try {
            this.subscribeChannel.close();
        }
        catch (IOException | TimeoutException e) {
            logger.info("Subscription channel is already closed: " + e);
        }
        try {
            this.amqp.close();
        }
        catch (IOException e) {
            logger.info("AMQP connection is already closed: " + e);
        }
        this.threadPoolExecutor.shutdown();
        logger.info("Successfully disconnected from AMQP");
    }

    @Override
    public void subscribeConsumer(Function function) {
        MessageConsumer consumer = new MessageConsumer(this.subscribeChannel, this.cipher, this.messageProcessor, function, this.step, this.containerContext, this.messageResolver, this.threadPoolExecutor);
        try {
            this.consumerTag = this.subscribeChannel.basicConsume(this.subscribeExchangeName, (Consumer)consumer);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        logger.info("Subscribed consumer {}. Waiting for messages to arrive ...", (Object)this.consumerTag);
    }

    @Override
    public void cancelConsumer() {
        if (this.consumerTag != null) {
            logger.info("Canceling consumer {}", (Object)this.consumerTag);
            try {
                this.subscribeChannel.basicCancel(this.consumerTag);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.consumerTag = null;
    }

    @Override
    public void ack(Long deliveryTag) {
        try {
            logger.info(String.format("Message #%s ack", deliveryTag));
            this.subscribeChannel.basicAck(deliveryTag.longValue(), false);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void reject(Long deliveryTag) {
        try {
            logger.info(String.format("Message #%s reject", deliveryTag));
            this.subscribeChannel.basicReject(deliveryTag.longValue(), false);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private AmqpServiceImpl openConnection() {
        Integer threadPoolSize = Optional.ofNullable(this.threadPoolSize).orElse(this.prefetchCount);
        this.threadPoolExecutor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            if (this.amqp == null) {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setUri(new URI(this.amqpUri));
                this.amqp = factory.newConnection();
                logger.info("Connected to AMQP with thread pool of {} threads", (Object)threadPoolSize);
            }
            return this;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private AmqpServiceImpl openSubscribeChannel() {
        try {
            if (this.subscribeChannel == null) {
                this.subscribeChannel = this.amqp.createChannel();
                this.subscribeChannel.basicQos(this.prefetchCount.intValue());
                logger.info("Opened subscribe channel");
            }
            return this;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void setSubscribeChannel(Channel subscribeChannel) {
        this.subscribeChannel = subscribeChannel;
    }

    @Override
    public Connection getConnection() {
        return this.amqp;
    }
}

