/*
 * Decompiled with CFR 0.152.
 */
package io.buybrain.hamq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.buybrain.hamq.BackendChannel;
import io.buybrain.hamq.BindSpec;
import io.buybrain.hamq.Connection;
import io.buybrain.hamq.ConsumeSpec;
import io.buybrain.hamq.Delivery;
import io.buybrain.hamq.ExchangeSpec;
import io.buybrain.hamq.OperationSpec;
import io.buybrain.hamq.PrefetchSpec;
import io.buybrain.hamq.PublishSpec;
import io.buybrain.hamq.QueueSpec;
import io.buybrain.hamq.RetryPolicy;
import io.buybrain.hamq.Retryer;
import io.buybrain.util.Result;
import io.buybrain.util.function.ThrowingConsumer;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Channel {
    private static final Logger log = LoggerFactory.getLogger(Channel.class);
    @NonNull
    private final Connection connection;
    @NonNull
    private final Retryer retryer;
    private BackendChannel channel;
    private static final AtomicInteger tagCounter = new AtomicInteger();
    private final List<ExchangeSpec> exchanges = new ArrayList<ExchangeSpec>();
    private final List<QueueSpec> queues = new ArrayList<QueueSpec>();
    private final List<BindSpec> binds = new ArrayList<BindSpec>();
    private final Map<String, ConsumeSpec> consumers = new HashMap<String, ConsumeSpec>();
    private PrefetchSpec prefetchSpec;
    private final Lock getChannelLock = new ReentrantLock();

    public synchronized void exchangeDeclare(@NonNull ExchangeSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.doExchangeDeclare(spec);
        this.exchanges.add(spec);
    }

    private void doExchangeDeclare(@NonNull ExchangeSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.exchangeDeclare(spec.getName(), spec.getType(), spec.isDurable(), spec.isAutoDelete(), spec.isInternal(), spec.getArgs())), spec);
    }

    public synchronized void queueDeclare(@NonNull QueueSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.doQueueDeclare(spec);
        this.queues.add(spec);
    }

    private void doQueueDeclare(@NonNull QueueSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.queueDeclare(spec.getName(), spec.isDurable(), spec.isExclusive(), spec.isAutoDelete(), spec.getArgs())), spec);
    }

    public synchronized void queueBind(@NonNull BindSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.doQueueBind(spec);
        this.binds.add(spec);
    }

    private void doQueueBind(@NonNull BindSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.queueBind(spec.getQueue(), spec.getExchange(), spec.getRoutingKey(), spec.getArgs())), spec);
    }

    public void prefetch(@NonNull PrefetchSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.doPrefetch(spec);
        this.prefetchSpec = spec;
    }

    private void doPrefetch(PrefetchSpec spec) {
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.basicQos(spec.getAmount())), spec);
    }

    public void publish(@NonNull PublishSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        AMQP.BasicProperties.Builder propBuilder = new AMQP.BasicProperties.Builder().deliveryMode(Integer.valueOf(spec.isDurable() ? 2 : 1));
        if (!spec.getHeaders().isEmpty()) {
            propBuilder.headers(spec.getHeaders());
        }
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.basicPublish(spec.getExchange(), spec.getRoutingKey(), spec.isMandatory(), propBuilder.build(), spec.getBody())), spec);
    }

    public void consume(@NonNull ConsumeSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        String consumerTag = "consumer-" + tagCounter.getAndIncrement();
        this.doConsume(consumerTag, spec);
        this.consumers.put(consumerTag, spec);
    }

    private void doConsume(@NonNull String consumerTag, @NonNull ConsumeSpec spec) {
        if (consumerTag == null) {
            throw new NullPointerException("consumerTag");
        }
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        final AtomicBoolean closed = new AtomicBoolean(false);
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.basicConsume(spec.getQueue(), consumerTag, spec.isNoLocal(), spec.isExclusive(), spec.getArgs(), new Consumer((BackendChannel)chan, spec){
            final /* synthetic */ BackendChannel val$chan;
            final /* synthetic */ ConsumeSpec val$spec;
            {
                this.val$chan = backendChannel;
                this.val$spec = consumeSpec;
            }

            public void handleConsumeOk(String consumerTag) {
            }

            public void handleCancelOk(String consumerTag) {
            }

            public void handleCancel(String consumerTag) throws IOException {
            }

            public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                closed.set(true);
                Channel.this.reset(this.val$chan);
            }

            public void handleRecoverOk(String consumerTag) {
            }

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                if (closed.get()) {
                    throw new RuntimeException("Consumer is closed");
                }
                Result.trying(() -> this.val$spec.getCallback().accept((Object)new Delivery(this.val$chan, envelope, properties, body))).orElse(ex -> {
                    Result.trying(() -> this.val$chan.basicCancel(consumerTag));
                    log.warn("Error while (n)acking delivery, will retry consuming", ex);
                    closed.set(true);
                    Channel.this.reset(this.val$chan);
                });
            }
        })), spec);
    }

    private void perform(ThrowingConsumer<BackendChannel> operation, OperationSpec spec) {
        this.retryer.performWithRetry(() -> operation.accept((Object)this.activeChannel()), this.getRetryPolicy(spec).withErrorHandler(ex -> {
            if (Retryer.shouldReconnectToRecover(ex)) {
                this.reset(null);
            }
        }));
    }

    private RetryPolicy getRetryPolicy(OperationSpec spec) {
        if (spec.getRetryPolicy() != null) {
            return spec.getRetryPolicy();
        }
        return this.connection.getRetryPolicy();
    }

    private BackendChannel activeChannel() {
        this.getChannelLock.lock();
        if (this.channel == null) {
            this.channel = (BackendChannel)this.retryer.performWithRetry(() -> this.connection.activeConnection().newChannel(), new RetryPolicy().withRetryAll(true));
        }
        this.getChannelLock.unlock();
        return this.channel;
    }

    private void reset(BackendChannel errorChan) {
        if (this.channel != null && errorChan != null && this.channel != errorChan) {
            return;
        }
        if (this.channel != null) {
            this.consumers.keySet().forEach(tag -> Result.trying(() -> this.channel.basicCancel((String)tag)));
            Result.trying(this.channel::close);
        }
        this.connection.reset();
        this.channel = null;
        this.exchanges.forEach(this::doExchangeDeclare);
        this.queues.forEach(this::doQueueDeclare);
        this.binds.forEach(this::doQueueBind);
        if (this.prefetchSpec != null) {
            this.doPrefetch(this.prefetchSpec);
        }
        this.consumers.forEach(this::doConsume);
    }

    @ConstructorProperties(value={"connection", "retryer"})
    public Channel(@NonNull Connection connection, @NonNull Retryer retryer) {
        if (connection == null) {
            throw new NullPointerException("connection");
        }
        if (retryer == null) {
            throw new NullPointerException("retryer");
        }
        this.connection = connection;
        this.retryer = retryer;
    }
}

