/*
 * Decompiled with CFR 0.152.
 */
package io.opencmw.concepts.majordomo;

import io.opencmw.concepts.majordomo.MajordomoProtocol;
import io.opencmw.rbac.RbacRole;
import io.opencmw.utils.SystemProperties;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

/**
 * Worker thread that connects a DEALER socket to a Majordomo broker, announces itself for a
 * single service name and dispatches incoming requests to a user-registered {@link RequestHandler}.
 *
 * <p>The worker sends a heartbeat every {@code OpenCMW.heartBeat} milliseconds and reconnects to the
 * broker once {@code OpenCMW.heartBeatLiveness} consecutive poll intervals passed without any
 * message from the broker.
 *
 * <p>NOTE(review): when the thread's main loop ends, {@link #destroy()} is invoked, which destroys
 * the {@link ZContext} even if it was handed in by the caller - confirm that this is intended for
 * shared contexts.
 */
public class MajordomoWorker extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(MajordomoWorker.class);
    /** Number of silent poll intervals after which the broker connection is re-established. */
    private static final int HEARTBEAT_LIVENESS = SystemProperties.getValueIgnoreCase("OpenCMW.heartBeatLiveness", 3);
    /** Heartbeat and poll interval in milliseconds. */
    private static final int HEARTBEAT_INTERVAL = SystemProperties.getValueIgnoreCase("OpenCMW.heartBeat", 2500);
    private static final AtomicInteger WORKER_COUNTER = new AtomicInteger();
    protected final String uniqueID;
    protected final ZContext ctx;
    private final String brokerAddress;
    private final String serviceName;
    private final byte[] serviceBytes;
    private final AtomicBoolean run = new AtomicBoolean(true);
    private final SortedSet<RbacRole<?>> rbacRoles;
    private ZMQ.Socket workerSocket;
    /** Wall-clock time (ms) after which the next heartbeat is sent. */
    private long heartbeatAt;
    /** Remaining silent poll intervals before a reconnect is attempted. */
    private int liveness;
    /** Delay in milliseconds before reconnecting after the broker was declared lost. */
    private int reconnect = 2500;
    private RequestHandler requestHandler;
    private ZMQ.Poller poller;

    /**
     * Creates a worker with its own {@link ZContext} that connects to the given broker address.
     *
     * @param brokerAddress broker end-point address, must not be {@code null}
     * @param serviceName name of the service this worker provides, must not be {@code null}
     * @param rbacRoles roles associated with this worker
     */
    public MajordomoWorker(String brokerAddress, String serviceName, RbacRole<?>... rbacRoles) {
        this(null, brokerAddress, serviceName, rbacRoles);
    }

    /**
     * Creates a worker that connects to {@code "inproc://broker"} using the given context.
     *
     * @param ctx context to use; a new one is created if {@code null}
     * @param serviceName name of the service this worker provides, must not be {@code null}
     * @param rbacRoles roles associated with this worker
     */
    public MajordomoWorker(ZContext ctx, String serviceName, RbacRole<?>... rbacRoles) {
        this(ctx, "inproc://broker", serviceName, rbacRoles);
    }

    /**
     * Common constructor.
     *
     * @param ctx context to use; a new one is created if {@code null}. The thread is marked as
     *            daemon only when a context is supplied.
     * @param brokerAddress broker end-point address, must not be {@code null}
     * @param serviceName name of the service this worker provides, must not be {@code null}
     * @param rbacRoles roles associated with this worker
     * @throws NullPointerException if {@code brokerAddress} or {@code serviceName} is {@code null}
     */
    protected MajordomoWorker(ZContext ctx, String brokerAddress, String serviceName, RbacRole<?>... rbacRoles) {
        // validate eagerly: 'assert' is a no-op unless the JVM runs with -ea
        Objects.requireNonNull(brokerAddress, "brokerAddress");
        Objects.requireNonNull(serviceName, "serviceName");
        this.brokerAddress = brokerAddress;
        this.serviceName = serviceName;
        this.serviceBytes = serviceName.getBytes(StandardCharsets.UTF_8);
        this.rbacRoles = Collections.unmodifiableSortedSet(new TreeSet<>(Set.of(rbacRoles)));
        this.ctx = Objects.requireNonNullElseGet(ctx, ZContext::new);
        if (ctx != null) {
            this.setDaemon(true);
        }
        // interim name; replaced below once the unique ID is known
        this.setName(MajordomoWorker.class.getSimpleName() + "#" + WORKER_COUNTER.getAndIncrement());
        this.uniqueID = this.serviceName + "-PID=" + ManagementFactory.getRuntimeMXBean().getName() + "-TID=" + this.getId();
        this.setName(this.getClass().getSimpleName() + "(\"" + this.serviceName + "\")-" + this.uniqueID);
        LOGGER.atDebug().addArgument(serviceName).addArgument(this.uniqueID).log("created new service '{}' worker - uniqueID: {}");
    }

    /** Destroys the {@link ZContext} used by this worker (also when it was supplied by the caller). */
    public void destroy() {
        this.ctx.destroy();
    }

    /**
     * @return the heartbeat interval in milliseconds
     */
    public int getHeartbeat() {
        return HEARTBEAT_INTERVAL;
    }

    /**
     * @return unmodifiable sorted set of the roles passed at construction
     */
    public SortedSet<RbacRole<?>> getRbacRoles() {
        return this.rbacRoles;
    }

    /**
     * @return the reconnect delay in milliseconds
     */
    public int getReconnect() {
        return this.reconnect;
    }

    /**
     * @return the currently registered request handler, or {@code null} if none is registered
     */
    public RequestHandler getRequestHandler() {
        return this.requestHandler;
    }

    /**
     * @return the service name this worker was created for
     */
    public String getServiceName() {
        return this.serviceName;
    }

    /**
     * @return the unique worker ID composed of service name, runtime name and thread ID
     */
    public String getUniqueID() {
        return this.uniqueID;
    }

    /**
     * Dispatches a message received from the broker according to its command.
     *
     * <p>The method name keeps its historical spelling for interface compatibility.
     *
     * @param request message received from the broker, may be {@code null}
     * @return the reply for a {@code W_REQUEST}, or {@code null} for every other command, for a
     *         {@code null} request, or when no handler is registered
     */
    public MajordomoProtocol.MdpMessage handleRequestsFromBoker(MajordomoProtocol.MdpWorkerMessage request) {
        if (request == null) {
            return null;
        }
        switch (request.command) {
            case W_REQUEST:
                return this.processRequest(request, request.clientSourceID);
            case W_HEARTBEAT:
                // nothing to do: liveness has already been reset by the receive loop
                break;
            case W_DISCONNECT:
                this.reconnectToBroker();
                break;
            default:
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.atDebug().addArgument(this.uniqueID).addArgument(request).log("worker '{}' received invalid message: '{}'");
                }
                break;
        }
        return null;
    }

    /**
     * Passes the request payload to the registered handler and wraps the result in a
     * {@code W_REPLY} message. Anything thrown by the handler is converted into a reply whose
     * single payload frame contains a UTF-8 description including the stack trace.
     *
     * @param request the request to process
     * @param clientSourceID ID of the originating client, copied into the reply
     * @return the reply message, or {@code null} if no handler is registered
     */
    public MajordomoProtocol.MdpMessage processRequest(MajordomoProtocol.MdpMessage request, byte[] clientSourceID) {
        if (this.requestHandler == null) {
            return null;
        }
        try {
            final byte[][] payload = this.requestHandler.handle(request.payload);
            return new MajordomoProtocol.MdpWorkerMessage(request.senderID, MajordomoProtocol.MdpWorkerCommand.W_REPLY, this.serviceBytes, clientSourceID, payload);
        } catch (Throwable e) {
            // the handler contract allows any Throwable; report it to the client instead of killing the worker
            final StringWriter stackTrace = new StringWriter();
            e.printStackTrace(new PrintWriter(stackTrace));
            final String exceptionMsg = this.getClass().getName() + " caught exception in user-provided call-back function for service '" + this.getServiceName()
                                        + "'\nrequest msg: " + request + "\nexception: " + stackTrace;
            return new MajordomoProtocol.MdpWorkerMessage(request.senderID, MajordomoProtocol.MdpWorkerCommand.W_REPLY, this.serviceBytes, clientSourceID,
                    new byte[][] { exceptionMsg.getBytes(StandardCharsets.UTF_8) });
        }
    }

    /**
     * Registers the call-back that processes request payloads.
     *
     * @param handler the handler to use; {@code null} removes the current one
     */
    public void registerHandler(RequestHandler handler) {
        this.requestHandler = handler;
    }

    /**
     * Runs the receive loop until the worker is stopped via {@link #stopWorker()} or the thread is
     * interrupted, then calls {@link #destroy()}.
     */
    @Override
    public void run() {
        // also honour the run flag: otherwise this loop spins forever after stopWorker()
        while (this.run.get() && !Thread.currentThread().isInterrupted()) {
            this.handleReceive();
        }
        this.destroy();
    }

    /**
     * @param reconnect the reconnect delay in milliseconds
     */
    public void setReconnect(int reconnect) {
        this.reconnect = reconnect;
    }

    /** Connects to the broker and then starts the worker thread. */
    @Override
    public synchronized void start() {
        this.run.set(true);
        this.reconnectToBroker();
        super.start();
    }

    /** Requests the worker to leave its receive loop and terminate. */
    public void stopWorker() {
        this.run.set(false);
    }

    /**
     * Polls the worker socket until the worker is stopped, the thread is interrupted or polling
     * returns {@code -1}. Received requests are dispatched and answered, heartbeats are sent when
     * due, and the broker connection is re-established after too many silent poll intervals.
     */
    protected void handleReceive() {
        while (this.run.get() && !Thread.currentThread().isInterrupted() && this.poller.poll(HEARTBEAT_INTERVAL) != -1) {
            if (this.poller.pollin(0)) {
                final MajordomoProtocol.MdpMessage msg = MajordomoProtocol.receiveMdpMessage(this.workerSocket);
                if (msg == null) {
                    continue;
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.atDebug().addArgument(this.uniqueID).addArgument(msg).log("worker '{}' received new message from broker: '{}'");
                }
                // any message from the broker counts as a sign of life
                this.liveness = HEARTBEAT_LIVENESS;
                assert msg.payload != null : "MdpWorkerMessage payload is null";
                if (!(msg instanceof MajordomoProtocol.MdpWorkerMessage)) {
                    assert false : "msg is not instance of MdpWorkerMessage";
                    continue;
                }
                final MajordomoProtocol.MdpWorkerMessage workerMessage = (MajordomoProtocol.MdpWorkerMessage) msg;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.atDebug().addArgument(this.uniqueID).addArgument(workerMessage).log("worker '{}' received request: '{}'");
                }
                final MajordomoProtocol.MdpMessage reply = this.handleRequestsFromBoker(workerMessage);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.atDebug().addArgument(this.uniqueID).addArgument(reply).log("worker '{}' received reply: '{}'");
                }
                if (reply != null) {
                    MajordomoProtocol.sendWorkerMessage(this.workerSocket, MajordomoProtocol.MdpWorkerCommand.W_REPLY, reply.senderID, workerMessage.clientSourceID, reply.payload);
                }
            } else if (--this.liveness == 0) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.atDebug().addArgument(this.uniqueID).log("worker '{}' disconnected from broker - retrying");
                }
                try {
                    Thread.sleep(this.reconnect);
                } catch (InterruptedException e) {
                    // restore the flag so that the caller's loop terminates as well
                    Thread.currentThread().interrupt();
                    break;
                }
                this.reconnectToBroker();
            }
            if (System.currentTimeMillis() > this.heartbeatAt) {
                MajordomoProtocol.sendWorkerMessage(this.workerSocket, MajordomoProtocol.MdpWorkerCommand.W_HEARTBEAT, null, null, new byte[0][]);
                this.heartbeatAt = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
            }
        }
        if (Thread.currentThread().isInterrupted()) {
            LOGGER.atInfo().addArgument(this.uniqueID).log("worker '{}' interrupt received, killing worker");
        }
    }

    /**
     * (Re-)creates the DEALER socket and its poller, connects to the broker, announces the worker
     * with a {@code W_READY} message and resets the liveness and heartbeat bookkeeping.
     */
    protected void reconnectToBroker() {
        // tear down the previous poller first, while the socket it was registered with is still open;
        // unregistering after the swap would target the new socket that this poller never knew
        if (this.poller != null) {
            if (this.workerSocket != null) {
                this.poller.unregister(this.workerSocket);
            }
            this.poller.close();
        }
        if (this.workerSocket != null) {
            this.workerSocket.close();
        }
        this.workerSocket = this.ctx.createSocket(SocketType.DEALER);
        assert this.workerSocket != null : "worker socket is null";
        assert this.workerSocket.getSocketType() == SocketType.DEALER : "worker socket type is " + this.workerSocket.getSocketType() + " instead of " + SocketType.DEALER;
        this.workerSocket.setHWM(0);
        this.workerSocket.connect(this.brokerAddress);
        LOGGER.atDebug().addArgument(this.uniqueID).addArgument(this.brokerAddress).log("worker '{}' connecting to broker at '{}'");
        MajordomoProtocol.sendWorkerMessage(this.workerSocket, MajordomoProtocol.MdpWorkerCommand.W_READY, null, this.serviceBytes,
                new byte[][] { this.getUniqueID().getBytes(StandardCharsets.UTF_8) });
        this.poller = this.ctx.createPoller(1);
        this.poller.register(this.workerSocket, ZMQ.Poller.POLLIN);
        this.liveness = HEARTBEAT_LIVENESS;
        this.heartbeatAt = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
    }

    /** User-provided call-back that turns a request payload into a reply payload. */
    @FunctionalInterface
    public interface RequestHandler {
        /**
         * @param payload frames of the incoming request
         * @return frames of the reply
         * @throws Throwable any failure; it is reported back to the client as reply payload
         */
        byte[][] handle(byte[][] payload) throws Throwable;
    }
}

