/*
 * Decompiled with CFR 0.152.
 */
package hkube.communication.streaming.zmq;

import hkube.algo.ICommandSender;
import hkube.communication.streaming.Flow;
import hkube.communication.streaming.IListener;
import hkube.communication.streaming.IMessageHandler;
import hkube.communication.streaming.Message;
import hkube.communication.streaming.zmq.IReadyUpdater;
import hkube.communication.streaming.zmq.Signals;
import hkube.encoding.EncodingManager;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/**
 * ZeroMQ-backed {@link IListener} that connects a DEALER socket to a remote
 * producer, announces readiness, and hands every received data message to the
 * registered {@link IMessageHandler}.
 *
 * <p>Life cycle as implemented here: {@link #start()} opens the socket,
 * {@link #fetch()} performs one poll/receive/handle cycle per call, and
 * {@link #close(boolean)} requests shutdown and waits until a subsequent
 * {@code fetch()} call has closed the socket. Because {@code close()} waits on a
 * flag that only {@code fetch()} clears, the two are expected to run on
 * different threads; the shared flags are therefore {@code volatile}.
 *
 * <p>NOTE(review): the ZMQ context created in {@link #start()} is never
 * terminated. Terminating it can block depending on socket linger settings, so
 * it is left untouched here — confirm the intended ownership of the context.
 */
public class Listener
implements IListener {
    private static final int CYCLE_LENGTH_MS = 1;
    String remoteHost;
    String remotePort;
    IMessageHandler messageHandler;
    String name;
    EncodingManager encodingManager;
    ICommandSender errorHandler;
    ZMQ.Socket worker;
    IReadyUpdater readyUpdater;
    Listener me;
    long lastReceiveTime;
    static Lock lock = new ReentrantLock();
    private static final int HEARTBEAT_LIVENESS = 300;
    private static final int HEARTBEAT_INTERVAL = 10;
    private static final int INTERVAL_INIT = 1000;
    private static final int INTERVAL_MAX = 32000;
    private static final int HEARTBEAT_LIVENESS_TIMEOUT = 30000;
    private static final int POLL_TIMEOUT_MS = 1000;
    private static final int STOP_TIMEOUT_MS = 5000;
    /** Number of frames that follow the signal frame in a data message: flow, header, payload. */
    private static final int DATA_FRAMES_AFTER_SIGNAL = 3;
    private long lastSentTime;
    // Written by start()/close() and read by fetch(); volatile so the change is
    // visible across threads.
    private volatile boolean active = false;
    // Cleared by fetch() once the socket is closed; close() polls it from another thread.
    private volatile boolean stillWorking = true;
    private boolean forceClose = false;
    ZMQ.Poller poller;
    private static final Logger logger = LogManager.getLogger();
    boolean isReady = true;
    boolean isReadySentValue = true;
    String id;

    /**
     * Creates a listener for the given remote endpoint. No socket is opened
     * until {@link #start()} is called.
     *
     * @param remoteHost   host part of the {@code tcp://host:port} endpoint
     * @param remotePort   port part of the endpoint
     * @param encoding     encoding name passed to {@link EncodingManager}
     * @param name         name of this listener, sent with every outgoing signal
     * @param errorHandler receives an {@code errorMessage} if {@link #start()} fails
     */
    public Listener(String remoteHost, String remotePort, String encoding, String name, ICommandSender errorHandler) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.errorHandler = errorHandler;
        this.encodingManager = new EncodingManager(encoding);
        this.name = name;
        this.id = remoteHost + remotePort;
        this.me = this;
    }

    /**
     * Registers the handler invoked for every received data message.
     *
     * @param handler handler whose {@code onMessage} result is sent back as the reply
     */
    public void setMessageHandler(IMessageHandler handler) {
        this.messageHandler = handler;
    }

    /**
     * @return the listener id, which is the remote host concatenated with the remote port
     */
    public String getId() {
        return this.id;
    }

    /**
     * Stores the readiness flag. Within this class the flag is only stored;
     * nothing here reads it back.
     *
     * @param isReady new readiness value
     */
    public void ready(boolean isReady) {
        this.isReady = isReady;
    }

    /**
     * Opens a DEALER socket with a random identity, connects it to the remote
     * endpoint and registers it with a fresh poller for incoming data.
     *
     * @param ctx ZMQ context used to create the socket
     */
    private void worker_socket(ZMQ.Context ctx) {
        Random rand = new Random(System.nanoTime());
        ZMQ.Socket socket = ctx.socket(ZMQ.DEALER);
        String identity = String.format("%04X-%04X", rand.nextInt(65536), rand.nextInt(65536));
        // The identity is plain ASCII; name the charset so the bytes do not
        // depend on the platform default.
        socket.setIdentity(identity.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        socket.connect("tcp://" + this.remoteHost + ":" + this.remotePort);
        logger.info("{} connected", identity);
        this.worker = socket;
        this.poller = new ZMQ.Poller(1);
        this.poller.register(socket, ZMQ.Poller.POLLIN);
        this.lastReceiveTime = System.currentTimeMillis();
    }

    /**
     * Runs one listening cycle.
     *
     * <p>While active: sends {@code PPP_READY}, polls the socket for up to
     * {@code POLL_TIMEOUT_MS} milliseconds and, if a message arrived, processes
     * it. Once the listener has been deactivated by {@link #close(boolean)},
     * the first call closes the socket (if one was opened) and clears the flag
     * {@code close()} is waiting on; later calls do nothing.
     */
    public void fetch() {
        if (!this.active) {
            if (this.stillWorking) {
                // The socket is closed here rather than in close(), so that it is
                // closed by the same code path that polls and receives on it.
                if (this.worker != null) {
                    this.worker.close();
                }
                this.stillWorking = false;
            }
            return;
        }
        this.send(Signals.PPP_READY, null);
        int rc = this.poller.poll((long)POLL_TIMEOUT_MS);
        if (rc == -1) {
            logger.warn("Poll failed break loop");
            return;
        }
        if (!this.poller.pollin(0)) {
            logger.debug("Nothing on poll");
            return;
        }
        ZMsg zmqMsg = ZMsg.recvMsg((ZMQ.Socket)this.worker);
        if (zmqMsg == null) {
            // Previously this was only reported and the null message was then
            // dereferenced; there is nothing to process, so stop here.
            logger.warn("Got null frame");
            return;
        }
        this.handleMessage(zmqMsg);
    }

    /**
     * Decodes one received message and, if it is a {@code PPP_MSG}, passes it to
     * the message handler and replies with {@code PPP_DONE} plus the handler's
     * response. Messages carrying any other signal are ignored.
     *
     * @param zmqMsg non-null message read from the socket
     */
    private void handleMessage(ZMsg zmqMsg) {
        ZFrame signalFrame = zmqMsg.pop();
        if (signalFrame == null) {
            logger.warn("Received an empty message, ignoring it");
            return;
        }
        if (Signals.getByBytes(signalFrame.getData()) != Signals.PPP_MSG) {
            return;
        }
        if (zmqMsg.size() < DATA_FRAMES_AFTER_SIGNAL) {
            logger.warn("Received a data message with only {} frame(s) after the signal, ignoring it", zmqMsg.size());
            return;
        }
        // Frame order after the signal: flow, header, payload.
        byte[] flowBytes = zmqMsg.pop().getData();
        byte[] header = zmqMsg.pop().getData();
        byte[] data = zmqMsg.pop().getData();
        List flowList = (List)this.encodingManager.decodeNoHeader(flowBytes);
        Flow flow = new Flow(flowList);
        Message msg = new Message(data, header, flow);
        byte[] response = this.messageHandler.onMessage(msg);
        this.send(Signals.PPP_DONE, response);
    }

    /**
     * Opens the socket and activates the listener. If the socket cannot be
     * created the failure is logged and reported through the error handler, and
     * the listener stays inactive so that {@link #fetch()} never touches a
     * socket that does not exist.
     */
    public void start() {
        try {
            ZMQ.Context ctx = ZMQ.context((int)1);
            this.worker_socket(ctx);
            // Activate only once the socket and poller exist.
            this.active = true;
        }
        catch (Exception exc) {
            this.active = false;
            logger.error("Failed to start listener for " + this.id, exc);
            HashMap<String, String> res = new HashMap<String, String>();
            res.put("code", "Failed");
            res.put("message", exc.toString());
            this.errorHandler.sendMessage("errorMessage", res, true);
        }
    }

    /**
     * Sends a three-frame message: the signal, this listener's encoded name,
     * and the payload.
     *
     * @param signal   signal placed in the first frame
     * @param response payload for the last frame; {@code null} is replaced by
     *                 the {@code PPP_EMPTY} marker bytes
     */
    void send(Signals signal, byte[] response) {
        ZFrame frame = new ZFrame(signal.toString());
        frame.send(this.worker, ZMQ.SNDMORE);
        frame = new ZFrame(this.encodingManager.encodeNoHeader((Object)this.name));
        frame.send(this.worker, ZMQ.SNDMORE);
        byte[] payload = response == null ? Signals.PPP_EMPTY.toBytes() : response;
        frame = new ZFrame(payload);
        frame.send(this.worker, 0);
        this.lastSentTime = System.currentTimeMillis();
    }

    /**
     * Deactivates the listener and blocks until {@link #fetch()} has closed the
     * socket. Returns immediately if no socket was ever opened. If the waiting
     * thread is interrupted, the interrupt status is restored and the method
     * returns early; the socket is then still closed by the next
     * {@code fetch()} call.
     *
     * @param forceClose currently not used by this implementation
     */
    public void close(boolean forceClose) {
        this.active = false;
        if (this.worker == null) {
            // start() never succeeded, so there is nothing for fetch() to close.
            this.stillWorking = false;
            return;
        }
        while (this.stillWorking) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // Restore the flag for callers and stop waiting; looping on would
                // make every following sleep() throw immediately.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while waiting for listener {} to close", this.id);
                return;
            }
        }
    }
}

