/*
 * Decompiled with CFR 0.152.
 */
package io.buybrain.hamq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.buybrain.hamq.BackendChannel;
import io.buybrain.hamq.Connection;
import io.buybrain.hamq.ConsumeSpec;
import io.buybrain.hamq.Delivery;
import io.buybrain.hamq.ExchangeSpec;
import io.buybrain.hamq.OperationSpec;
import io.buybrain.hamq.PrefetchSpec;
import io.buybrain.hamq.PublishSpec;
import io.buybrain.hamq.QueueSpec;
import io.buybrain.hamq.RetryPolicy;
import io.buybrain.hamq.Retryer;
import io.buybrain.util.Result;
import io.buybrain.util.function.ThrowingConsumer;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Channel {
    private static final Logger log = LoggerFactory.getLogger(Channel.class);
    @NonNull
    private final Connection connection;
    @NonNull
    private final Retryer retryer;
    private BackendChannel channel;
    private static AtomicInteger tagCounter = new AtomicInteger();
    private List<ExchangeSpec> exchanges = new ArrayList<ExchangeSpec>();
    private List<QueueSpec> queues = new ArrayList<QueueSpec>();
    private Map<String, ConsumeSpec> consumers = new HashMap<String, ConsumeSpec>();
    private PrefetchSpec prefetchSpec;

    public synchronized void exchangeDeclare(@NonNull ExchangeSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.doExchangeDeclare(spec);
        this.exchanges.add(spec);
    }

    private void doExchangeDeclare(@NonNull ExchangeSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.exchangeDeclare(spec.getName(), spec.getType(), spec.isDurable(), spec.isAutoDelete(), spec.isInternal(), spec.getArgs())), spec);
    }

    public synchronized void queueDeclare(@NonNull QueueSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.doQueueDeclare(spec);
        this.queues.add(spec);
    }

    private void doQueueDeclare(@NonNull QueueSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.queueDeclare(spec.getName(), spec.isDurable(), spec.isExclusive(), spec.isAutoDelete(), spec.getArgs())), spec);
    }

    public void prefetch(@NonNull PrefetchSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.doPrefetch(spec);
        this.prefetchSpec = spec;
    }

    private void doPrefetch(PrefetchSpec spec) {
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.basicQos(spec.getAmount())), spec);
    }

    public void publish(@NonNull PublishSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.basicPublish(spec.getExchange(), spec.getRoutingKey(), spec.isMandatory(), new AMQP.BasicProperties.Builder().deliveryMode(Integer.valueOf(spec.isDurable() ? 2 : 1)).build(), spec.getBody())), spec);
    }

    public void consume(@NonNull ConsumeSpec spec) {
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        String consumerTag = "consumer-" + tagCounter.getAndIncrement();
        this.doConsume(consumerTag, spec);
        this.consumers.put(consumerTag, spec);
    }

    private void doConsume(@NonNull String consumerTag, final @NonNull ConsumeSpec spec) {
        if (consumerTag == null) {
            throw new NullPointerException("consumerTag");
        }
        if (spec == null) {
            throw new NullPointerException("spec");
        }
        this.perform((ThrowingConsumer<BackendChannel>)((ThrowingConsumer)chan -> chan.basicConsume(spec.getQueue(), consumerTag, spec.isNoLocal(), spec.isExclusive(), spec.getArgs(), new Consumer((BackendChannel)chan){
            final /* synthetic */ BackendChannel val$chan;
            {
                this.val$chan = backendChannel;
            }

            public void handleConsumeOk(String consumerTag) {
            }

            public void handleCancelOk(String consumerTag) {
            }

            public void handleCancel(String consumerTag) throws IOException {
            }

            public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                Channel.this.reset();
            }

            public void handleRecoverOk(String consumerTag) {
            }

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                Result.trying(() -> spec.getCallback().accept((Object)new Delivery(this.val$chan, envelope, properties, body))).orElse(ex -> {
                    Result.trying(() -> this.val$chan.basicCancel(consumerTag));
                    log.warn("Error while (n)acking delivery, will retry consuming", ex);
                    Channel.this.reset();
                });
            }
        })), spec);
    }

    private void perform(ThrowingConsumer<BackendChannel> operation, OperationSpec spec) {
        this.retryer.performWithRetry(() -> operation.accept((Object)this.activeChannel()), this.getRetryPolicy(spec).withErrorHandler(ex -> {
            if (Retryer.shouldReconnectToRecover(ex)) {
                this.reset();
            }
        }));
    }

    private RetryPolicy getRetryPolicy(OperationSpec spec) {
        if (spec.getRetryPolicy() != null) {
            return spec.getRetryPolicy();
        }
        return this.connection.getRetryPolicy();
    }

    private synchronized BackendChannel activeChannel() {
        if (this.channel == null) {
            this.channel = (BackendChannel)this.retryer.performWithRetry(() -> this.connection.activeConnection().newChannel(), new RetryPolicy().withRetryAll(true));
        }
        return this.channel;
    }

    private synchronized void reset() {
        if (this.channel != null) {
            this.consumers.keySet().forEach(tag -> Result.trying(() -> this.channel.basicCancel((String)tag)));
            Result.trying(this.channel::close);
        }
        this.channel = null;
        this.connection.reset();
        this.exchanges.forEach(this::doExchangeDeclare);
        this.queues.forEach(this::doQueueDeclare);
        if (this.prefetchSpec != null) {
            this.doPrefetch(this.prefetchSpec);
        }
        this.consumers.entrySet().forEach(entry -> this.doConsume((String)entry.getKey(), (ConsumeSpec)entry.getValue()));
    }

    @ConstructorProperties(value={"connection", "retryer"})
    public Channel(@NonNull Connection connection, @NonNull Retryer retryer) {
        if (connection == null) {
            throw new NullPointerException("connection");
        }
        if (retryer == null) {
            throw new NullPointerException("retryer");
        }
        this.connection = connection;
        this.retryer = retryer;
    }
}

