/*
 * Decompiled with CFR 0.152.
 */
package io.rapidw.mqtt.client.v3_1_1;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.rapidw.mqtt.client.v3_1_1.MqttClientException;
import io.rapidw.mqtt.client.v3_1_1.MqttConnectionException;
import io.rapidw.mqtt.client.v3_1_1.MqttConnectionOption;
import io.rapidw.mqtt.client.v3_1_1.MqttPendingMessage;
import io.rapidw.mqtt.client.v3_1_1.MqttPendingSubscription;
import io.rapidw.mqtt.client.v3_1_1.MqttPendingUnsubscription;
import io.rapidw.mqtt.client.v3_1_1.MqttSubscription;
import io.rapidw.mqtt.client.v3_1_1.MqttTopicTree;
import io.rapidw.mqtt.client.v3_1_1.handler.MqttConnectResultHandler;
import io.rapidw.mqtt.client.v3_1_1.handler.MqttMessageHandler;
import io.rapidw.mqtt.client.v3_1_1.handler.MqttPublishResultHandler;
import io.rapidw.mqtt.client.v3_1_1.handler.MqttSubscribeResultHandler;
import io.rapidw.mqtt.client.v3_1_1.handler.MqttUnsubscribeResultHandler;
import io.rapidw.mqtt.client.v3_1_1.handler.TcpConnectResultHandler;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311ConnAckPacket;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311ConnectPacket;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311ConnectReturnCode;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311DisconnectPacket;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311Packet;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311PingReqPacket;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311PubAckPacket;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311PublishPacket;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311QosLevel;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311SubAckPacket;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311SubscribePacket;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311TopicAndQosLevel;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311UnsubAckPacket;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311UnsubscribePacket;
import io.rapidw.mqtt.codec.v3_1_1.MqttV311Will;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side MQTT 3.1.1 connection driven by a Netty {@link Bootstrap}.
 *
 * <p>The public methods validate the connection status and delegate to the inner
 * {@link Handler}, which writes packets to the channel and processes inbound packets.
 * Outcomes are reported through the callback handlers supplied by the caller; protocol
 * errors are reported to the exception handler of the {@link MqttConnectionOption}.
 *
 * <p>Implemented here: QoS 0 and QoS 1 publishing, delivery of inbound QoS 0 messages,
 * subscribe, unsubscribe and keep-alive pings. Publishing with QoS 2 and inbound messages
 * with a QoS other than 0 are rejected with {@link UnsupportedOperationException}.
 */
public class MqttConnection {
    private static final Logger log = LoggerFactory.getLogger(MqttConnection.class);

    /** Pipeline name used as the anchor in front of which timer/keep-alive handlers are inserted. */
    private static final String CLIENT_HANDLER_NAME = "mqtt_client_handler";
    /** Pipeline name of the {@link ReadTimeoutHandler} that bounds the wait for CONNACK. */
    private static final String CONNECT_TIMER_NAME = "mqtt_connect_timer";
    /** Pipeline name of the {@link IdleStateHandler} that triggers PINGREQ. */
    private static final String KEEPALIVE_HANDLER_NAME = "mqtt_keepalive_handler";

    private final MqttConnectionOption connectionOption;
    private final Bootstrap bootstrap;
    // status/channel/closePromise are written from channel callbacks and read by API callers.
    private volatile Status status = Status.NOT_CONNECTING;
    private volatile Channel channel;
    // The pending maps are filled by write listeners and drained by the inbound ACK handlers.
    private final IntObjectHashMap<MqttPendingSubscription> pendingSubscriptions = new IntObjectHashMap<>();
    private final IntObjectHashMap<MqttPendingUnsubscription> pendingUnsubscribes = new IntObjectHashMap<>();
    private final LinkedHashMap<Integer, MqttPendingMessage> pendingMessages = new LinkedHashMap<>();
    private final MqttTopicTree subscriptionTree = new MqttTopicTree();
    private Handler handler;
    private MqttConnectResultHandler mqttConnectResultHandler;
    private final AtomicInteger currentPacketId = new AtomicInteger();
    private volatile Promise<Void> closePromise;

    MqttConnection(Bootstrap bootstrap, MqttConnectionOption connectionOption) {
        this.connectionOption = connectionOption;
        this.bootstrap = bootstrap;
    }

    /**
     * Creates a new {@link Handler}, stores it as this connection's handler and returns it.
     *
     * @return the newly created handler
     */
    Handler handler() {
        this.handler = new Handler();
        return this.handler;
    }

    /**
     * Starts the TCP connect followed by the MQTT CONNECT exchange.
     *
     * <p>If the connection is not in its initial state, {@code mqttConnectResultHandler.onError}
     * is invoked and nothing else happens (in particular the existing close promise is kept).
     *
     * @param tcpConnectResultHandler  notified about the outcome of the TCP connect
     * @param mqttConnectResultHandler notified about the outcome of the MQTT CONNECT
     */
    public void connect(TcpConnectResultHandler tcpConnectResultHandler, MqttConnectResultHandler mqttConnectResultHandler) {
        Status current = this.status;
        if (current != Status.NOT_CONNECTING) {
            mqttConnectResultHandler.onError(this, new MqttClientException("invalid connection status: " + current.name()));
            return;
        }
        this.closePromise = this.bootstrap.config().group().next().newPromise();
        this.mqttConnectResultHandler = mqttConnectResultHandler;
        this.handler.connect(tcpConnectResultHandler);
    }

    /**
     * Sends a SUBSCRIBE packet for the given topic filters.
     *
     * @param topicAndQosLevels      topic filters with the requested QoS
     * @param mqttMessageHandler     receives messages matching the subscription
     * @param subscribeResultHandler notified on SUBACK, on write failure, or immediately with
     *                               an error when the connection is not in the connected state
     */
    public void subscribe(List<MqttV311TopicAndQosLevel> topicAndQosLevels, MqttMessageHandler mqttMessageHandler, MqttSubscribeResultHandler subscribeResultHandler) {
        Status current = this.status;
        if (current != Status.CONNECTED) {
            subscribeResultHandler.onError(this, new MqttClientException("invalid connection status: " + current.name()));
            return;
        }
        this.handler.subscribe(topicAndQosLevels, mqttMessageHandler, subscribeResultHandler);
    }

    /** Publishes a QoS 0 message without reporting the write result. */
    public void publishQos0Message(String topic, boolean retain, byte[] payload) {
        this.publish(topic, MqttV311QosLevel.AT_MOST_ONCE, retain, payload, null);
    }

    /**
     * Publishes a QoS 0 message and reports the write result.
     *
     * @throws NullPointerException if {@code publishResultHandler} is {@code null}
     */
    public void publishQos0Message(String topic, boolean retain, byte[] payload, MqttPublishResultHandler publishResultHandler) {
        this.publish(topic, MqttV311QosLevel.AT_MOST_ONCE, retain, payload, Objects.requireNonNull(publishResultHandler));
    }

    /**
     * Publishes a QoS 1 message; success is reported when the matching PUBACK arrives.
     *
     * @throws NullPointerException if {@code publishResultHandler} is {@code null}
     */
    public void publishQos1Message(String topic, boolean retain, byte[] payload, MqttPublishResultHandler publishResultHandler) {
        this.publish(topic, MqttV311QosLevel.AT_LEAST_ONCE, retain, payload, Objects.requireNonNull(publishResultHandler));
    }

    /**
     * Publishes a QoS 2 message. QoS 2 is not implemented by the handler.
     *
     * @throws NullPointerException          if {@code publishResultHandler} is {@code null}
     * @throws UnsupportedOperationException when the connection is connected, because the
     *                                       handler rejects QoS 2
     */
    public void publishQos2Message(String topic, boolean retain, byte[] payload, MqttPublishResultHandler publishResultHandler) {
        this.publish(topic, MqttV311QosLevel.EXACTLY_ONCE, retain, payload, Objects.requireNonNull(publishResultHandler));
    }

    /**
     * Common publish path: rejects the request when not connected, otherwise delegates to the
     * handler. {@code publishResultHandler} may be {@code null} for fire-and-forget QoS 0.
     */
    private void publish(String topic, MqttV311QosLevel qos, boolean retain, byte[] payload, MqttPublishResultHandler publishResultHandler) {
        Status current = this.status;
        if (current != Status.CONNECTED) {
            if (publishResultHandler != null) {
                publishResultHandler.onError(this, new MqttClientException("invalid connection status: " + current.name()));
            } else {
                // Fire-and-forget publish: there is nobody to notify, so only log the drop.
                log.warn("message for topic {} dropped, invalid connection status: {}", topic, current);
            }
            return;
        }
        this.handler.publish(topic, qos, retain, payload, publishResultHandler);
    }

    /**
     * Sends an UNSUBSCRIBE packet for the topic filters of the given subscriptions.
     *
     * @param subscriptions            subscriptions to cancel
     * @param unsubscribeResultHandler notified on UNSUBACK, on write failure, or immediately
     *                                 with an error when the connection is not connected
     */
    public void unsubscribe(List<MqttSubscription> subscriptions, MqttUnsubscribeResultHandler unsubscribeResultHandler) {
        Status current = this.status;
        if (current != Status.CONNECTED) {
            unsubscribeResultHandler.onError(this, new MqttClientException("invalid connection status: " + current.name()));
            return;
        }
        this.handler.unsubscribe(subscriptions, unsubscribeResultHandler);
    }

    /** Single-subscription convenience overload of {@link #unsubscribe(List, MqttUnsubscribeResultHandler)}. */
    void unsubscribe(MqttSubscription subscription, MqttUnsubscribeResultHandler unsubscribeResultHandler) {
        this.unsubscribe(Collections.singletonList(subscription), unsubscribeResultHandler);
    }

    /** Sends DISCONNECT (when a channel exists) and closes the connection. */
    public void disconnect() {
        this.handler.disconnect();
    }

    /**
     * Blocks until the close promise created by {@link #connect} completes. Returns
     * immediately when {@code connect} has not created a promise yet.
     *
     * <p>If the waiting thread is interrupted, its interrupt flag is restored and the
     * exception is passed to the configured exception handler.
     */
    public void waitForClose() {
        Promise<Void> promise = this.closePromise;
        if (promise == null) {
            return;
        }
        try {
            promise.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.connectionOption.getExceptionHandler().onException(this, e);
        }
    }

    /** Returns the next packet id, cycling through 1..65535. */
    private int nextPacketId() {
        return this.currentPacketId.updateAndGet(id -> id >= 65535 ? 1 : id + 1);
    }

    /**
     * Channel handler of the enclosing connection: dispatches inbound MQTT packets and
     * performs the outbound writes requested through the enclosing class.
     */
    class Handler
    extends SimpleChannelInboundHandler<MqttV311Packet> {
        /** Pending "no PINGRESP received" task; {@code null} when no ping is outstanding. */
        private ScheduledFuture<?> pingRespTimeoutFuture;

        Handler() {
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MqttV311Packet packet) {
            switch (packet.getType()) {
                case CONNACK: {
                    this.handleConnAck((MqttV311ConnAckPacket)packet);
                    break;
                }
                case SUBACK: {
                    this.handleSubAck((MqttV311SubAckPacket)packet);
                    break;
                }
                case PUBLISH: {
                    this.handlePublish((MqttV311PublishPacket)packet);
                    break;
                }
                case UNSUBACK: {
                    this.handleUnSubAck((MqttV311UnsubAckPacket)packet);
                    break;
                }
                case PUBACK: {
                    this.handlePubAck((MqttV311PubAckPacket)packet);
                    break;
                }
                // QoS 2 handshake packets are accepted but ignored.
                case PUBREC:
                case PUBREL:
                case PUBCOMP: {
                    break;
                }
                case PINGRESP: {
                    this.handlePingResp();
                    break;
                }
                default: {
                    log.warn("unknown message Type");
                }
            }
        }

        /** Completes the pending QoS 1 publish identified by the PUBACK's packet id. */
        private void handlePubAck(MqttV311PubAckPacket packet) {
            int packetId = packet.getPacketId();
            // Look up by the acknowledged id: several packets may be in flight at once.
            MqttPendingMessage pending = pendingMessages.remove(packetId);
            if (pending == null) {
                this.throwException(new MqttClientException("invalid packetId in PUBACK packet, pending message not found, packetId: " + packetId));
                return;
            }
            if (pending.getQosLevel() == MqttV311QosLevel.AT_LEAST_ONCE) {
                pending.getPublishResultHandler().onSuccess(MqttConnection.this);
            } else {
                this.throwException(new MqttClientException("message in QoS 2 is UNSUPPORTED!"));
            }
        }

        /**
         * Delivers an inbound PUBLISH to every message handler registered for its topic.
         *
         * @throws UnsupportedOperationException if the packet's QoS is not 0
         */
        private void handlePublish(MqttV311PublishPacket packet) {
            if (packet.getQosLevel() != MqttV311QosLevel.AT_MOST_ONCE) {
                throw new UnsupportedOperationException("current only QoS 0 message supported");
            }
            Set<MqttMessageHandler> handlers = subscriptionTree.getHandlersByTopicName(packet.getTopic());
            if (handlers.isEmpty()) {
                this.throwException(new MqttClientException("PUBLISH packet without message handler received, topic: " + packet.getTopic()));
                return;
            }
            for (MqttMessageHandler messageHandler : handlers) {
                messageHandler.onMessage(MqttConnection.this, packet.getTopic(), packet.getQosLevel(), packet.isRetain(), packet.isDupFlag(), packet.getPacketId(), packet.getPayload());
            }
            // Not reachable while the guard above rejects every QoS other than 0; kept so the
            // acknowledgement is already in place if that guard is relaxed to allow QoS 1.
            if (packet.getQosLevel() == MqttV311QosLevel.AT_LEAST_ONCE) {
                this.pubAck(packet.getPacketId());
            }
        }

        /**
         * Completes the pending subscription identified by the SUBACK's packet id: registers
         * every topic filter with the QoS from the SUBACK and reports the resulting
         * subscriptions to the subscribe result handler.
         */
        private void handleSubAck(MqttV311SubAckPacket packet) {
            int packetId = packet.getPacketId();
            // Look up by the acknowledged id: several packets may be in flight at once.
            MqttPendingSubscription pending = pendingSubscriptions.remove(packetId);
            if (pending == null) {
                this.throwException(new MqttClientException("invalid packetId in SUBACK packet, pending subscription not found, packetId: " + packetId));
                return;
            }
            List<MqttV311TopicAndQosLevel> requested = pending.getTopicAndQosLevels();
            List<MqttV311QosLevel> granted = packet.getQosLevels();
            if (requested.size() != granted.size()) {
                this.throwException(new MqttClientException("count of topics in SUBACK packet does not match SUBSCRIBE packet"));
                return;
            }
            MqttMessageHandler messageHandler = pending.getMessageHandler();
            LinkedList<MqttSubscription> subscriptionList = new LinkedList<>();
            Iterator<MqttV311QosLevel> grantedIter = granted.iterator();
            for (MqttV311TopicAndQosLevel request : requested) {
                // Sizes were verified above, so both sequences advance in lock-step.
                MqttV311QosLevel qos = grantedIter.next();
                String topicFilter = request.getTopicFilter();
                subscriptionTree.addSubscription(new MqttV311TopicAndQosLevel(topicFilter, qos), messageHandler);
                subscriptionList.add(MqttSubscription.builder().connection(MqttConnection.this).topicFilter(topicFilter).messageHandler(messageHandler).qosLevel(qos).build());
            }
            pending.getMqttSubscribeResultHandler().onSuccess(MqttConnection.this, subscriptionList);
        }

        /**
         * Handles CONNACK: removes the connect timer if one was installed, then either
         * installs keep-alive handling and reports success, or closes and reports the
         * refusal.
         */
        private void handleConnAck(MqttV311ConnAckPacket packet) {
            log.debug("handle CONACK");
            Channel ch = channel;
            // The timer only exists when a positive connect timeout was configured.
            if (ch.pipeline().get(CONNECT_TIMER_NAME) != null) {
                ch.pipeline().remove(CONNECT_TIMER_NAME);
            }
            if (packet.getConnectReturnCode() == MqttV311ConnectReturnCode.CONNECTION_ACCEPTED) {
                if (connectionOption.getKeepAliveSeconds() > 0L) {
                    ch.pipeline().addBefore(CLIENT_HANDLER_NAME, KEEPALIVE_HANDLER_NAME, new IdleStateHandler(0, (int)connectionOption.getKeepAliveSeconds(), 0));
                }
                status = Status.CONNECTED;
                mqttConnectResultHandler.onSuccess(MqttConnection.this);
            } else {
                this.close();
                mqttConnectResultHandler.onError(MqttConnection.this, new MqttConnectionException(packet.getConnectReturnCode()));
            }
        }

        /** Cancels the outstanding PINGRESP timeout, if any. */
        private void handlePingResp() {
            log.debug("handle PINGRESP");
            ScheduledFuture<?> timeout = this.pingRespTimeoutFuture;
            // isDone() also covers the cancelled case.
            if (timeout != null && !timeout.isDone()) {
                timeout.cancel(true);
                this.pingRespTimeoutFuture = null;
            }
        }

        /** Completes the pending unsubscription identified by the UNSUBACK's packet id. */
        private void handleUnSubAck(MqttV311UnsubAckPacket packet) {
            log.debug("handle UNSUBACK");
            int packetId = packet.getPacketId();
            // Look up by the acknowledged id: several packets may be in flight at once.
            MqttPendingUnsubscription pending = pendingUnsubscribes.remove(packetId);
            if (pending == null) {
                this.throwException(new MqttClientException("invalid packetId in UNSUBACK packet, pending unsubscription not found, packetId: " + packetId));
                return;
            }
            for (String topicFilter : pending.getTopicFilters()) {
                subscriptionTree.removeSubscription(topicFilter);
            }
            pending.getUnsubscribeResultHandler().onSuccess(MqttConnection.this);
        }

        /** Sends CONNECT right away when no server certificate is configured. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            log.debug("channel active");
            if (connectionOption.getServerCertificate() == null) {
                log.debug("raw socket");
                this.doConnect(ctx);
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.debug("error", cause);
            if (cause instanceof ReadTimeoutException && status == Status.CONNECTING) {
                mqttConnectResultHandler.onTimeout(MqttConnection.this);
            } else if (cause instanceof DecoderException) {
                this.throwException(new MqttClientException("decoder exception", cause));
            } else {
                super.exceptionCaught(ctx, cause);
            }
        }

        /**
         * Sends PINGREQ on writer-idle events and sends CONNECT once the TLS handshake has
         * completed successfully. A failed handshake is reported to the connect result
         * handler and the channel is closed.
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                if (((IdleStateEvent)evt).state() == IdleState.WRITER_IDLE) {
                    log.debug("send PINGREQ");
                    this.pingReq(ctx.channel());
                }
            } else if (evt instanceof SslHandshakeCompletionEvent) {
                SslHandshakeCompletionEvent handshake = (SslHandshakeCompletionEvent)evt;
                if (handshake.isSuccess()) {
                    log.debug("ssl socket");
                    this.doConnect(ctx);
                } else {
                    log.debug("ssl handshake failed", handshake.cause());
                    status = Status.CLOSED;
                    ctx.close();
                    mqttConnectResultHandler.onError(MqttConnection.this, new MqttClientException("ssl handshake failed", handshake.cause()));
                }
            }
        }

        /** Marks the connection closed, drops the ping timeout and completes the close promise. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            status = Status.CLOSED;
            ScheduledFuture<?> timeout = this.pingRespTimeoutFuture;
            if (timeout != null) {
                timeout.cancel(false);
                this.pingRespTimeoutFuture = null;
            }
            Promise<Void> promise = closePromise;
            if (promise != null) {
                // trySuccess: never throws if the promise has already been completed.
                promise.trySuccess(null);
            }
            log.debug("connection closed");
        }

        /**
         * Builds the CONNECT packet from the connection options, writes it, and installs the
         * connect timer when a positive MQTT connect timeout is configured.
         */
        private void doConnect(ChannelHandlerContext ctx) {
            log.debug("channel active");
            MqttV311ConnectPacket.MqttV311ConnectPacketBuilder packetBuilder = MqttV311ConnectPacket.builder()
                .username(connectionOption.getUsername())
                .password(connectionOption.getPassword())
                .clientId(connectionOption.getClientId())
                .keepAliveSeconds((int)connectionOption.getKeepAliveSeconds())
                .cleanSession(connectionOption.isCleanSession());
            if (connectionOption.getWill() != null) {
                packetBuilder.will(MqttV311Will.builder()
                    .topic(connectionOption.getWill().getTopic())
                    .message(connectionOption.getWill().getMessage())
                    .retain(connectionOption.getWill().isRetain())
                    .qosLevel(connectionOption.getWill().getQosLevel())
                    .build());
            }
            ctx.channel().writeAndFlush(packetBuilder.build()).addListener(future -> {
                if (future.isSuccess()) {
                    log.debug("send connect success");
                } else {
                    log.debug("send connect failed", future.cause());
                    status = Status.CLOSED;
                    ctx.close();
                    // Without this the caller would never learn that CONNECT was not sent.
                    mqttConnectResultHandler.onError(MqttConnection.this, new MqttClientException("send connect failed", future.cause()));
                }
            });
            if (connectionOption.getMqttConnectTimeout() > 0L) {
                ctx.pipeline().addBefore(CLIENT_HANDLER_NAME, CONNECT_TIMER_NAME, new ReadTimeoutHandler(connectionOption.getMqttConnectTimeout(), TimeUnit.MILLISECONDS));
            }
        }

        /**
         * Starts the TCP connect and reports its outcome to {@code tcpConnectResultHandler}.
         * On failure the connection is marked closed and the close promise is completed.
         */
        public void connect(TcpConnectResultHandler tcpConnectResultHandler) {
            // Set before connecting so a fast-completing listener's CLOSED is not overwritten.
            status = Status.CONNECTING;
            bootstrap.connect(connectionOption.getHost(), connectionOption.getPort()).addListener((ChannelFutureListener)connectFuture -> {
                if (connectFuture.isSuccess()) {
                    channel = connectFuture.channel();
                    tcpConnectResultHandler.onSuccess(MqttConnection.this);
                    return;
                }
                Throwable cause = connectFuture.cause();
                if (cause instanceof ConnectTimeoutException) {
                    tcpConnectResultHandler.onTimeout(MqttConnection.this);
                } else {
                    tcpConnectResultHandler.onError(MqttConnection.this, cause);
                }
                status = Status.CLOSED;
                closePromise.trySuccess(null);
            });
        }

        /**
         * Writes a SUBSCRIBE packet; once written, the subscription is parked under its
         * packet id until the SUBACK arrives. A write failure is reported via {@code onError}.
         */
        public void subscribe(List<MqttV311TopicAndQosLevel> topicAndQosList, MqttMessageHandler mqttMessageHandler, MqttSubscribeResultHandler mqttSubscribeResultHandler) {
            int packetId = nextPacketId();
            MqttV311SubscribePacket packet = MqttV311SubscribePacket.builder().packetId(packetId).topicAndQosLevels(topicAndQosList).build();
            channel.writeAndFlush(packet).addListener(future -> {
                if (future.isSuccess()) {
                    MqttPendingSubscription pendingSubscription = MqttPendingSubscription.builder().topicAndQosLevels(topicAndQosList).messageHandler(mqttMessageHandler).mqttSubscribeResultHandler(mqttSubscribeResultHandler).build();
                    pendingSubscriptions.put(packetId, pendingSubscription);
                } else {
                    mqttSubscribeResultHandler.onError(MqttConnection.this, future.cause());
                }
            });
        }

        /**
         * Writes a PUBLISH packet. QoS 0 reports the write result directly (when a handler
         * was supplied); QoS 1 parks the message under its packet id until the PUBACK arrives.
         *
         * @throws UnsupportedOperationException if {@code qosLevel} is QoS 2
         */
        public void publish(String topic, MqttV311QosLevel qosLevel, boolean retain, byte[] payload, MqttPublishResultHandler mqttPublishResultHandler) {
            if (qosLevel == MqttV311QosLevel.EXACTLY_ONCE) {
                throw new UnsupportedOperationException("publish with qos2 currently unsupported");
            }
            MqttV311PublishPacket.Builder builder = MqttV311PublishPacket.builder().topic(topic).qosLevel(qosLevel).dupFlag(false).retain(retain).payload(payload);
            // Only QoS 1 packets carry a packet id; 0 is a placeholder for QoS 0.
            final int packetId = qosLevel == MqttV311QosLevel.AT_LEAST_ONCE ? nextPacketId() : 0;
            if (qosLevel == MqttV311QosLevel.AT_LEAST_ONCE) {
                builder.packetId(Integer.valueOf(packetId));
            }
            MqttV311PublishPacket packet = builder.build();
            channel.writeAndFlush(packet).addListener(future -> {
                if (qosLevel == MqttV311QosLevel.AT_MOST_ONCE) {
                    if (mqttPublishResultHandler == null) {
                        return;
                    }
                    if (future.isSuccess()) {
                        mqttPublishResultHandler.onSuccess(MqttConnection.this);
                    } else {
                        mqttPublishResultHandler.onError(MqttConnection.this, future.cause());
                    }
                } else if (future.isSuccess()) {
                    pendingMessages.put(packetId, MqttPendingMessage.builder().packet(packet).publishResultHandler(mqttPublishResultHandler).qosLevel(qosLevel).build());
                } else {
                    mqttPublishResultHandler.onError(MqttConnection.this, future.cause());
                }
            });
        }

        /**
         * Sends PINGREQ and, unless a timeout is already pending, schedules a task that sends
         * DISCONNECT and closes the channel after the keep-alive interval; the task is
         * cancelled when a PINGRESP is handled.
         */
        private void pingReq(Channel channel) {
            if (this.pingRespTimeoutFuture == null) {
                this.pingRespTimeoutFuture = channel.eventLoop().schedule(() -> {
                    channel.writeAndFlush(MqttV311DisconnectPacket.INSTANCE).addListener(ChannelFutureListener.CLOSE);
                }, connectionOption.getKeepAliveSeconds(), TimeUnit.SECONDS);
            }
            channel.writeAndFlush(MqttV311PingReqPacket.INSTANCE).addListener(future -> {
                if (!future.isSuccess()) {
                    this.throwException(new MqttClientException("send PINGREQ error", future.cause()));
                }
            });
        }

        /** Marks the connection closed and closes the channel if one has been established. */
        public void close() {
            status = Status.CLOSED;
            Channel ch = channel;
            if (ch != null) {
                ch.close();
            }
        }

        /**
         * Writes an UNSUBSCRIBE packet; once written, the request is parked under its packet
         * id until the UNSUBACK arrives. A write failure is reported via {@code onError}.
         */
        public void unsubscribe(List<MqttSubscription> subscriptions, MqttUnsubscribeResultHandler unsubscribeResultHandler) {
            int packetId = nextPacketId();
            LinkedList<String> topicFilters = new LinkedList<>();
            for (MqttSubscription subscription : subscriptions) {
                topicFilters.add(subscription.getTopicFilter());
            }
            MqttV311UnsubscribePacket packet = MqttV311UnsubscribePacket.builder().topicFilters(topicFilters).packetId(packetId).build();
            channel.writeAndFlush(packet).addListener(future -> {
                if (future.isSuccess()) {
                    pendingUnsubscribes.put(packetId, new MqttPendingUnsubscription(topicFilters, unsubscribeResultHandler));
                } else {
                    unsubscribeResultHandler.onError(MqttConnection.this, future.cause());
                }
            });
        }

        /** Writes a PUBACK for the given packet id; a write failure is treated as fatal. */
        public void pubAck(int packetId) {
            MqttV311PubAckPacket packet = MqttV311PubAckPacket.builder().packetId(packetId).build();
            channel.writeAndFlush(packet).addListener(future -> {
                if (!future.isSuccess()) {
                    this.throwException(future.cause());
                }
            });
        }

        /**
         * Fatal-error path: disconnects and hands {@code cause} to the configured exception
         * handler. Despite the name nothing is thrown, so callers must return themselves.
         */
        private void throwException(Throwable cause) {
            this.disconnect();
            connectionOption.getExceptionHandler().onException(MqttConnection.this, cause);
        }

        /** Writes DISCONNECT when a channel exists, then closes the connection. */
        public void disconnect() {
            Channel ch = channel;
            if (ch != null) {
                ch.writeAndFlush(MqttV311DisconnectPacket.INSTANCE);
            }
            this.close();
        }
    }

    /** Lifecycle states of a connection. */
    private enum Status {
        NOT_CONNECTING,
        CONNECTING,
        CONNECTED,
        CLOSED;

    }
}

