/*
 * Decompiled with CFR 0.152.
 */
package io.opencmw.server;

import io.opencmw.OpenCmwConstants;
import io.opencmw.OpenCmwProtocol;
import io.opencmw.filter.SubscriptionMatcher;
import io.opencmw.rbac.RbacRole;
import io.opencmw.serialiser.annotations.MetaInfo;
import io.opencmw.serialiser.utils.ClassUtils;
import io.opencmw.utils.SystemProperties;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiPredicate;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

@MetaInfo(description="default BasicMdpWorker implementation")
public class BasicMdpWorker
extends Thread
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(BasicMdpWorker.class);
    protected static final byte[] RBAC = new byte[0];
    protected static final AtomicInteger WORKER_COUNTER = new AtomicInteger();
    protected final BiPredicate<URI, URI> subscriptionMatcher = new SubscriptionMatcher(new Class[0]);
    protected final String uniqueID;
    protected final ZContext ctx;
    protected final boolean ownsContext;
    protected final URI brokerAddress;
    protected final String serviceName;
    protected final byte[] serviceBytes;
    protected final AtomicBoolean shallRun = new AtomicBoolean(false);
    protected final AtomicBoolean running = new AtomicBoolean(false);
    protected final SortedSet<RbacRole> rbacRoles;
    protected final ZMQ.Socket notifySocket;
    protected final ZMQ.Socket notifyListenerSocket;
    protected final List<URI> activeSubscriptions = Collections.synchronizedList(new ArrayList());
    protected ZMQ.Socket workerSocket;
    protected ZMQ.Socket pubSocket;
    protected final int heartBeatLiveness = SystemProperties.getValueIgnoreCase((String)"OpenCMW.heartBeatLiveness", (int)3);
    protected final long heartBeatInterval = SystemProperties.getValueIgnoreCase((String)"OpenCMW.heartBeat", (long)1000L);
    protected long heartbeatAt;
    protected int liveness;
    protected long reconnect = 2500L;
    protected RequestHandler requestHandler;
    protected ZMQ.Poller poller;

    public BasicMdpWorker(URI brokerAddress, String serviceName, RbacRole<?> ... rbacRoles) {
        this(null, brokerAddress, serviceName, rbacRoles);
    }

    public BasicMdpWorker(ZContext ctx, String serviceName, RbacRole<?> ... rbacRoles) {
        this(ctx, URI.create("inproc://broker"), serviceName, rbacRoles);
    }

    protected BasicMdpWorker(ZContext ctx, URI brokerAddress, String serviceName, RbacRole<?> ... rbacRoles) {
        assert (brokerAddress != null);
        assert (serviceName != null);
        this.brokerAddress = OpenCmwConstants.stripPathTrailingSlash((URI)brokerAddress);
        this.serviceName = StringUtils.stripStart((String)serviceName, (String)"/");
        this.serviceBytes = this.serviceName.getBytes(StandardCharsets.UTF_8);
        this.ownsContext = ctx == null;
        this.rbacRoles = Collections.unmodifiableSortedSet(new TreeSet(Set.of(rbacRoles)));
        this.ctx = Objects.requireNonNullElseGet(ctx, ZContext::new);
        if (!this.ownsContext) {
            this.setDaemon(true);
        }
        this.setName(BasicMdpWorker.class.getSimpleName() + "#" + WORKER_COUNTER.getAndIncrement());
        this.uniqueID = this.serviceName + "-PID=" + ManagementFactory.getRuntimeMXBean().getName() + "-TID=" + this.getId();
        this.setName(this.getClass().getSimpleName() + "-" + this.uniqueID);
        this.notifyListenerSocket = this.ctx.createSocket(SocketType.PAIR);
        this.notifyListenerSocket.bind("inproc://notifyListener" + this.uniqueID);
        OpenCmwConstants.setDefaultSocketParameters((ZMQ.Socket)this.notifyListenerSocket);
        this.notifySocket = this.ctx.createSocket(SocketType.PAIR);
        this.notifySocket.connect("inproc://notifyListener" + this.uniqueID);
        OpenCmwConstants.setDefaultSocketParameters((ZMQ.Socket)this.notifySocket);
        LOGGER.atTrace().addArgument((Object)serviceName).addArgument((Object)this.uniqueID).log("created new service '{}' worker - uniqueID: {}");
    }

    public SortedSet<RbacRole> getRbacRoles() {
        return this.rbacRoles;
    }

    public Duration getReconnectDelay() {
        return Duration.ofMillis(this.reconnect);
    }

    public RequestHandler getRequestHandler() {
        return this.requestHandler;
    }

    public String getServiceName() {
        return this.serviceName;
    }

    public String getUniqueID() {
        return this.uniqueID;
    }

    public boolean notify(@NotNull OpenCmwProtocol.MdpMessage notifyMessage) {
        URI originalTopic = notifyMessage.topic;
        ArrayList<URI> subTopics = new ArrayList<URI>(this.activeSubscriptions);
        if (subTopics.stream().filter(s -> this.subscriptionMatcher.test(originalTopic, (URI)s)).findFirst().isEmpty()) {
            return false;
        }
        notifyMessage.senderID = OpenCmwProtocol.EMPTY_FRAME;
        notifyMessage.protocol = OpenCmwProtocol.MdpSubProtocol.PROT_WORKER;
        notifyMessage.command = OpenCmwProtocol.Command.W_NOTIFY;
        notifyMessage.serviceNameBytes = OpenCmwProtocol.EMPTY_FRAME;
        notifyMessage.clientRequestID = OpenCmwProtocol.EMPTY_FRAME;
        notifyMessage.topic = originalTopic;
        return this.notifyRaw(notifyMessage);
    }

    public BasicMdpWorker registerHandler(RequestHandler requestHandler) {
        this.requestHandler = requestHandler;
        return this;
    }

    @Override
    public void run() {
        this.shallRun.set(true);
        this.reconnectToBroker();
        OpenCmwProtocol.MdpMessage heartbeatMsg = new OpenCmwProtocol.MdpMessage(null, OpenCmwProtocol.MdpSubProtocol.PROT_WORKER, OpenCmwProtocol.Command.W_HEARTBEAT, this.serviceBytes, OpenCmwProtocol.EMPTY_FRAME, OpenCmwProtocol.EMPTY_URI, OpenCmwProtocol.EMPTY_FRAME, "", RBAC);
        this.running.set(true);
        while (this.shallRun.get() && !Thread.currentThread().isInterrupted() && this.poller.poll(this.heartBeatInterval) != -1) {
            boolean dataReceived = true;
            while (dataReceived) {
                OpenCmwProtocol.MdpMessage brokerMsg = OpenCmwProtocol.MdpMessage.receive((ZMQ.Socket)this.workerSocket, (boolean)false);
                dataReceived = OpenCmwProtocol.MdpMessage.send((ZMQ.Socket)this.workerSocket, this.handleRequestsFromBroker(brokerMsg));
                ZMsg pubMsg = ZMsg.recvMsg((ZMQ.Socket)this.pubSocket, (boolean)false);
                dataReceived |= this.handleSubscriptionMsg(pubMsg);
                OpenCmwProtocol.MdpMessage notifyMsg = OpenCmwProtocol.MdpMessage.receive((ZMQ.Socket)this.notifyListenerSocket, (boolean)false);
                if (notifyMsg == null) continue;
                dataReceived |= notifyMsg.send(this.workerSocket);
            }
            if (System.currentTimeMillis() > this.heartbeatAt && --this.liveness == 0) {
                LOGGER.atWarn().addArgument((Object)this.uniqueID).log("worker '{}' disconnected from broker - retrying");
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(this.reconnect));
                this.reconnectToBroker();
            }
            if (System.currentTimeMillis() <= this.heartbeatAt || this.ctx.isClosed()) continue;
            heartbeatMsg.send(this.workerSocket);
            this.heartbeatAt = System.currentTimeMillis() + this.heartBeatInterval;
        }
        this.running.set(false);
        this.shallRun.set(false);
        LOGGER.atInfo().addArgument((Object)this.getName()).log("'{}' shut-down");
    }

    public void setReconnectDelay(int reconnect, @NotNull TimeUnit timeUnit) {
        this.reconnect = timeUnit.toMillis(reconnect);
    }

    public void stopWorker() {
        if (!this.running.get() || !this.shallRun.get()) {
            return;
        }
        this.shallRun.set(false);
        try {
            this.join(this.heartBeatInterval);
        }
        catch (InterruptedException e) {
            throw new IllegalStateException(this.getName() + " did not shut down in " + this.heartBeatInterval + " ms", e);
        }
        this.close();
    }

    @Override
    public void close() {
        if (this.running.get()) {
            LOGGER.atWarn().addArgument((Object)this.serviceName).log("trying to shut-down service '{}' while not fully finished");
            try {
                this.join(this.heartBeatInterval);
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(this.getName() + " did not shut down in " + this.heartBeatInterval + " ms", e);
            }
        }
        this.pubSocket.close();
        this.workerSocket.close();
        this.notifyListenerSocket.close();
        this.notifySocket.close();
        if (this.ownsContext) {
            this.ctx.close();
        }
    }

    protected List<OpenCmwProtocol.MdpMessage> handleRequestsFromBroker(OpenCmwProtocol.MdpMessage request) {
        if (request == null) {
            return Collections.emptyList();
        }
        this.liveness = this.heartBeatLiveness;
        switch (request.command) {
            case GET_REQUEST: 
            case SET_REQUEST: 
            case W_NOTIFY: 
            case PARTIAL: 
            case FINAL: {
                return this.processRequest(request);
            }
            case W_HEARTBEAT: {
                return Collections.emptyList();
            }
            case DISCONNECT: {
                if (Arrays.equals(OpenCmwProtocol.BROKER_SHUTDOWN, request.data)) {
                    this.shallRun.set(false);
                    LOGGER.atInfo().addArgument((Object)this.serviceName).log("broker requested to shut-down '{}'");
                    return Collections.emptyList();
                }
                this.reconnectToBroker();
                return Collections.emptyList();
            }
            case READY: 
            case SUBSCRIBE: 
            case UNSUBSCRIBE: 
            case UNKNOWN: {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.atWarn().addArgument((Object)this.getServiceName()).addArgument((Object)request.command).log("service '{}' erroneously received {} command - should be handled in Majordomo broker");
                }
                return Collections.emptyList();
            }
        }
        throw new IllegalStateException("should not reach here - request message = " + request);
    }

    protected boolean handleSubscriptionMsg(ZMsg subMsg) {
        if (subMsg == null || subMsg.isEmpty()) {
            return false;
        }
        byte[] topicBytes = subMsg.getFirst().getData();
        if (topicBytes.length == 0) {
            return false;
        }
        URI subscriptionTopic = URI.create(new String(topicBytes, 1, topicBytes.length - 1, StandardCharsets.UTF_8));
        if (LOGGER.isDebugEnabled() && (subscriptionTopic.toString().isBlank() || subscriptionTopic.toString().contains(this.getServiceName()))) {
            LOGGER.atDebug().addArgument((Object)this.getServiceName()).addArgument((Object)topicBytes[0]).addArgument((Object)subscriptionTopic).log("Service '{}' received subscription request: {} to '{}'");
        }
        if (!subscriptionTopic.toString().isBlank() && !subscriptionTopic.getPath().startsWith(this.getServiceName())) {
            return false;
        }
        switch (topicBytes[0]) {
            case 1: {
                this.activeSubscriptions.add(subscriptionTopic);
                return true;
            }
            case 0: {
                this.activeSubscriptions.remove(subscriptionTopic);
                return true;
            }
        }
        throw new IllegalStateException("recovered invalid subscription ID " + subMsg);
    }

    protected boolean notifyRaw(@NotNull OpenCmwProtocol.MdpMessage notifyMessage) {
        if (this.running.get()) {
            return notifyMessage.send(this.notifySocket);
        }
        LOGGER.atDebug().addArgument((Object)this.serviceName).log("Service '{}' is not running");
        return false;
    }

    protected List<OpenCmwProtocol.MdpMessage> processRequest(OpenCmwProtocol.MdpMessage request) {
        try {
            OpenCmwProtocol.Context mdpCtx = new OpenCmwProtocol.Context(request);
            this.getRequestHandler().handle(mdpCtx);
            return mdpCtx.rep == null ? Collections.emptyList() : List.of(mdpCtx.rep);
        }
        catch (Throwable e) {
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            e.printStackTrace(pw);
            String exceptionMsg = "\u001b[31m" + this.getClass().getName() + " caught exception for service '" + this.getServiceName() + "'\nrequest msg: " + request + "\nexception: " + sw.toString() + "\u001b[0m";
            if (LOGGER.isDebugEnabled()) {
                LOGGER.atError().addArgument((Object)exceptionMsg).log("could not processRequest(MdpMessage) - exception thrown:\n{}");
            }
            return List.of(new OpenCmwProtocol.MdpMessage(request.senderID, request.protocol, OpenCmwProtocol.Command.FINAL, request.serviceNameBytes, request.clientRequestID, request.topic, null, exceptionMsg, RBAC));
        }
    }

    protected void reconnectToBroker() {
        if (this.ctx == null || this.ctx.isClosed()) {
            return;
        }
        if (this.workerSocket != null) {
            this.workerSocket.close();
        }
        URI translatedBrokerAddress = OpenCmwConstants.replaceScheme((URI)this.brokerAddress, (String)"tcp");
        this.workerSocket = this.ctx.createSocket(SocketType.DEALER);
        OpenCmwConstants.setDefaultSocketParameters((ZMQ.Socket)this.workerSocket);
        this.workerSocket.connect(translatedBrokerAddress + "/router");
        if (this.pubSocket != null) {
            this.pubSocket.close();
        }
        this.pubSocket = this.ctx.createSocket(SocketType.XPUB);
        OpenCmwConstants.setDefaultSocketParameters((ZMQ.Socket)this.pubSocket);
        this.pubSocket.setXpubVerbose(true);
        this.pubSocket.connect(translatedBrokerAddress + "/subscribe");
        LOGGER.atInfo().addArgument((Object)this.brokerAddress).log("register service with broker '{}");
        byte[] classNameByte = this.getClass().getName().getBytes(StandardCharsets.UTF_8);
        new OpenCmwProtocol.MdpMessage(null, OpenCmwProtocol.MdpSubProtocol.PROT_WORKER, OpenCmwProtocol.Command.READY, this.serviceBytes, OpenCmwProtocol.EMPTY_FRAME, URI.create(this.serviceName), classNameByte, "", RBAC).send(this.workerSocket);
        if (this.poller != null) {
            this.poller.unregister(this.workerSocket);
            this.poller.close();
        }
        this.poller = this.ctx.createPoller(3);
        this.poller.register(this.workerSocket, 1);
        this.poller.register(this.pubSocket, 1);
        this.poller.register(this.notifyListenerSocket, 1);
        this.liveness = this.heartBeatLiveness;
        this.heartbeatAt = System.currentTimeMillis() + this.heartBeatInterval;
    }

    static {
        String reason = "recursive definitions inside ZeroMQ";
        ClassUtils.DO_NOT_PARSE_MAP.put(ZContext.class, "recursive definitions inside ZeroMQ");
        ClassUtils.DO_NOT_PARSE_MAP.put(ZMQ.Socket.class, "recursive definitions inside ZeroMQ");
        ClassUtils.DO_NOT_PARSE_MAP.put(ZMQ.Poller.class, "recursive definitions inside ZeroMQ");
    }

    public static interface RequestHandler {
        public void handle(OpenCmwProtocol.Context var1) throws Throwable;
    }
}

