/*
 * Decompiled with CFR 0.152.
 */
package io.elastic.sailor.impl;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.elastic.sailor.AmqpService;
import io.elastic.sailor.MessagePublisher;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class MessagePublisherImpl
implements MessagePublisher {
    private static final Logger logger = LoggerFactory.getLogger(MessagePublisherImpl.class);
    private static final int WAIT_FOR_CONFIRM_DURATION = 5000;
    private String publishExchangeName;
    private AmqpService amqpService;
    private int publishMaxRetries;
    private long publishRetryDelay;
    private long publishMaxRetryDelay;
    private Channel publishChannel;
    private final boolean isPublishConfirmEnabled;
    private final boolean isPersistentMessagesEnabled;

    @Inject
    public MessagePublisherImpl(@Named(value="ELASTICIO_PUBLISH_MESSAGES_TO") String publishExchangeName, @Named(value="ELASTICIO_AMQP_PUBLISH_RETRY_ATTEMPTS") int publishMaxRetries, @Named(value="ELASTICIO_AMQP_PUBLISH_RETRY_DELAY") long publishRetryDelay, @Named(value="ELASTICIO_AMQP_PUBLISH_MAX_RETRY_DELAY") long publishMaxRetryDelay, @Named(value="ELASTICIO_AMQP_PUBLISH_CONFIRM_ENABLED") boolean isPublishConfirmEnabled, @Named(value="ELASTICIO_AMQP_PERSISTENT_MESSAGES") boolean isPersistentMessagesEnabled, AmqpService amqpService) {
        this.publishExchangeName = publishExchangeName;
        this.publishMaxRetries = publishMaxRetries;
        this.publishRetryDelay = publishRetryDelay;
        this.publishMaxRetryDelay = publishMaxRetryDelay;
        this.amqpService = amqpService;
        this.isPublishConfirmEnabled = isPublishConfirmEnabled;
        this.isPersistentMessagesEnabled = isPersistentMessagesEnabled;
    }

    @Override
    public void publish(String routingKey, byte[] payload, AMQP.BasicProperties options) {
        logger.info("Pushing to exchange={}, routingKey={}", (Object)this.publishExchangeName, (Object)routingKey);
        boolean retryPublish = true;
        int retryCount = 0;
        while (retryPublish) {
            Channel publishChannel = this.getPublishChannel();
            AMQP.BasicProperties.Builder propertiesBuilder = this.getRetryProperties(options, retryCount);
            propertiesBuilder.deliveryMode(Integer.valueOf(this.isPersistentMessagesEnabled ? 2 : 1));
            AMQP.BasicProperties properties = propertiesBuilder.build();
            logger.debug("isPersistentMessagesEnabled={}, isPublishConfirmEnabled={}, properties={}", new Object[]{this.isPersistentMessagesEnabled, this.isPublishConfirmEnabled, properties});
            try {
                logger.trace("Publish options={}, retryCount={}", new Object[]{this.publishExchangeName, routingKey, properties, options, retryCount});
                publishChannel.basicPublish(this.publishExchangeName, routingKey, properties, payload);
            }
            catch (IOException e) {
                throw new RuntimeException(String.format("Failed to publish message to exchange %s", this.publishExchangeName), e);
            }
            if (this.isPublishConfirmEnabled) {
                boolean bl = retryPublish = !this.waitForConfirms(publishChannel);
                if (retryPublish) {
                    this.sleep(retryCount + 1);
                    if (retryCount >= this.publishMaxRetries) {
                        throw new IllegalStateException(String.format("Failed to publish the message to a queue after %s retries. The limit of %s retries reached.", retryCount, this.publishMaxRetries));
                    }
                    ++retryCount;
                }
            } else {
                retryPublish = false;
            }
            logger.info("Successfully published data to {}", (Object)this.publishExchangeName);
        }
    }

    private AMQP.BasicProperties.Builder getRetryProperties(AMQP.BasicProperties properties, int retryCount) {
        if (retryCount < 1) {
            return properties.builder();
        }
        HashMap<String, Integer> retryHeaders = new HashMap<String, Integer>();
        retryHeaders.putAll(properties.getHeaders());
        retryHeaders.put("retry", retryCount);
        return properties.builder().headers(retryHeaders);
    }

    private boolean waitForConfirms(Channel publishChannel) {
        logger.info("Waiting for publish confirm");
        try {
            return publishChannel.waitForConfirms(5000L);
        }
        catch (InterruptedException e) {
            logger.error("Thread was interrupted while waiting for publisher confirmation");
            throw new RuntimeException(e);
        }
        catch (TimeoutException e) {
            throw new RuntimeException("Waiting for publisher confirmation timed out", e);
        }
        catch (IllegalStateException e) {
            logger.info("Looks like publisher confirmation was asked on a non-Confirm channel. Please check if the publisher channel was created with publisher confirms enabled.");
            throw e;
        }
    }

    private void sleep(int currentPublishAttempt) {
        long sleep = this.calculateSleepDuration(currentPublishAttempt);
        logger.info("Published message to {} was not confirmed. Trying again in {} millis.", (Object)this.publishExchangeName, (Object)sleep);
        try {
            Thread.sleep(sleep);
        }
        catch (InterruptedException e) {
            logger.error("Thread was interrupted while sleeping");
            throw new RuntimeException(e);
        }
    }

    private long calculateSleepDuration(int currentPublishAttempt) {
        double sleep = Math.pow(2.0, currentPublishAttempt - 1) * (double)this.publishRetryDelay;
        if (sleep > (double)this.publishMaxRetryDelay) {
            return this.publishMaxRetryDelay;
        }
        return new Double(sleep).longValue();
    }

    private Channel getPublishChannel() {
        if (this.publishChannel == null) {
            this.publishChannel = this.createPublishChannel();
        }
        return this.publishChannel;
    }

    private synchronized Channel createPublishChannel() {
        try {
            if (this.publishChannel == null) {
                Connection connection = this.amqpService.getConnection();
                this.publishChannel = connection.createChannel();
                if (this.isPublishConfirmEnabled) {
                    this.publishChannel.confirmSelect();
                }
                logger.info("Opened publish channel");
            }
            return this.publishChannel;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

