/*
 * Decompiled with CFR 0.152.
 */
package io.opencmw.concepts.majordomo;

import io.opencmw.concepts.majordomo.MajordomoProtocol;
import io.opencmw.concepts.majordomo.MajordomoWorker;
import io.opencmw.rbac.BasicRbacRole;
import io.opencmw.rbac.RbacRole;
import io.opencmw.rbac.RbacToken;
import io.opencmw.utils.SystemProperties;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;

public class MajordomoBroker
extends Thread {
    /** Logger shared by the broker and its nested classes. */
    private static final Logger LOGGER = LoggerFactory.getLogger(MajordomoBroker.class);
    /** Address argument used by the internal worker threads when they send their replies (always {@code null}). */
    private static final byte[] INTERNAL_SENDER_ID = null;
    /** Name prefix of the built-in services registered by this broker ("mmi.service", "mmi.echo"). */
    private static final String INTERNAL_SERVICE_PREFIX = "mmi.";
    /** UTF-8 encoded form of {@link #INTERNAL_SERVICE_PREFIX}. */
    private static final byte[] INTERNAL_SERVICE_PREFIX_BYTES = INTERNAL_SERVICE_PREFIX.getBytes(StandardCharsets.UTF_8);
    /** Number of heartbeat intervals after which a silent worker is considered expired. */
    private static final long HEARTBEAT_LIVENESS = SystemProperties.getValueIgnoreCase("OpenCMW.heartBeatLiveness", 3);
    /** Heartbeat period in milliseconds; also used as the poll timeout of the broker loops. */
    private static final long HEARTBEAT_INTERVAL = SystemProperties.getValueIgnoreCase("OpenCMW.heartBeat", 2500);
    /** Worker expiry span in milliseconds (interval times liveness). */
    private static final long HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
    /** Client time-out in seconds; values {@code <= 0} disable the purging of clients. */
    private static final int CLIENT_TIMEOUT = SystemProperties.getValueIgnoreCase("OpenCMW.clientTimeOut", 0);
    /** Running number used to build unique broker thread names. */
    private static final AtomicInteger BROKER_COUNTER = new AtomicInteger();
    /** Running number used to build unique internal worker thread names. */
    private static final AtomicInteger WORKER_COUNTER = new AtomicInteger();
    private final ZContext ctx;
    private final ZMQ.Socket internalRouterSocket;
    private final ZMQ.Socket internalServiceSocket;
    /** Every ROUTER socket created through {@link #bind(String)}. */
    private final List<ZMQ.Socket> routerSockets = new ArrayList<>();
    /** Run flag: set by {@link #start()}, cleared by {@link #stopBroker()}. */
    private final AtomicBoolean run = new AtomicBoolean(false);
    private final SortedSet<RbacRole<?>> rbacRoles;
    /** Known services, keyed by service name. */
    private final Map<String, Service> services = new HashMap<>();
    /** Known workers, keyed by the hex representation of their address. */
    private final Map<String, Worker> workers = new HashMap<>();
    /** Known clients, keyed by sender name. */
    private final Map<String, Client> clients = new HashMap<>();
    /** Idle workers across all services. */
    private final Deque<Worker> waiting = new ArrayDeque<>();
    /** Wall-clock time (ms) at which the next round of heartbeats is due. */
    private long heartbeatAt = System.currentTimeMillis() + HEARTBEAT_INTERVAL;

    public MajordomoBroker(int ioThreads, RbacRole<?> ... rbacRoles) {
        this.setName(MajordomoBroker.class.getSimpleName() + "#" + BROKER_COUNTER.getAndIncrement());
        this.ctx = new ZContext(ioThreads);
        this.rbacRoles = Collections.unmodifiableSortedSet(new TreeSet(Set.of(rbacRoles)));
        this.internalRouterSocket = this.bind("inproc://broker");
        this.internalServiceSocket = this.bind("inproc://intService");
        this.registerDefaultServices(rbacRoles);
    }

    /**
     * Registers a worker as a broker-internal service under the worker's own service name.
     * An already registered service of the same name is replaced and a warning is logged.
     *
     * @param worker          worker implementing the service, must not be {@code null}
     * @param nServiceThreads number of internal worker threads created for the service
     */
    public void addInternalService(MajordomoWorker worker, int nServiceThreads) {
        assert (worker != null) : "worker must not be null";
        final String serviceName = worker.getServiceName();
        final Service replacement = new Service(serviceName, serviceName.getBytes(StandardCharsets.UTF_8), worker, nServiceThreads);
        final Service previous = services.put(serviceName, replacement);
        if (previous == null) {
            return;
        }
        LOGGER.atWarn().addArgument(serviceName).log("overwriting existing internal service definition for '{}'");
    }

    /**
     * Creates a ROUTER socket with its high-water mark set to 0, binds it to the
     * given endpoint and adds it to the list of sockets served by the broker.
     *
     * @param endpoint ZeroMQ endpoint, e.g. {@code "tcp://*:5555"}
     * @return the newly bound router socket
     */
    public ZMQ.Socket bind(String endpoint) {
        final ZMQ.Socket socket = ctx.createSocket(SocketType.ROUTER);
        socket.setHWM(0);
        socket.bind(endpoint);
        routerSockets.add(socket);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.atDebug().addArgument(endpoint).log("Majordomo broker/0.1 is active at '{}'");
        }
        return socket;
    }

    /**
     * @return the ZeroMQ context created by this broker
     */
    public ZContext getContext() {
        return ctx;
    }

    /**
     * @return the router socket bound to {@code "inproc://broker"}
     */
    public ZMQ.Socket getInternalRouterSocket() {
        return internalRouterSocket;
    }

    /**
     * @return read-only view of all router sockets created via {@link #bind(String)}
     */
    public List<ZMQ.Socket> getRouterSockets() {
        return Collections.unmodifiableList(routerSockets);
    }

    /**
     * @return live view of the registered services (backed by the internal service map)
     */
    public Collection<Service> getServices() {
        return services.values();
    }

    /**
     * @return {@code true} between {@link #start()} and {@link #stopBroker()}
     */
    public boolean isRunning() {
        return run.get();
    }

    /**
     * Drops the service registered under the given name from the service map (no-op if unknown).
     *
     * @param serviceName name of the service to remove
     */
    public void removeService(String serviceName) {
        services.remove(serviceName);
    }

    /**
     * Broker main loop.
     *
     * <p>Waits on all router sockets with a time-out of one heartbeat interval. After
     * every wake-up the sockets are read repeatedly until a full pass over all sockets
     * yields no handled message. Each pass also processes queued client requests, and
     * every tenth pass (starting with the first) purges expired workers/clients and
     * sends heartbeats. The loop ends when the run flag is cleared, the thread is
     * interrupted or the poller reports {@code -1}. Router sockets are closed in any
     * case; on a regular exit the broker is destroyed afterwards.
     */
    @Override
    public void run() {
        try (ZMQ.Poller poller = ctx.createPoller(routerSockets.size())) {
            for (ZMQ.Socket socket : routerSockets) {
                poller.register(socket, ZMQ.Poller.POLLIN);
            }
            while (run.get() && !Thread.currentThread().isInterrupted()) {
                if (poller.poll(HEARTBEAT_INTERVAL) == -1) {
                    break;
                }
                int loopCount = 0;
                boolean keepReading = true;
                while (keepReading && run.get()) {
                    boolean receivedData = false;
                    for (ZMQ.Socket socket : routerSockets) {
                        final MajordomoProtocol.MdpMessage msg = MajordomoProtocol.receiveMdpMessage(socket, false);
                        if (msg == null) {
                            continue;
                        }
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.atDebug().addArgument(msg).log("Majordomo broker received new message: '{}'");
                        }
                        receivedData |= handleReceivedMessage(socket, msg);
                    }
                    processClients();
                    if (loopCount % 10 == 0) {
                        purgeWorkers();
                        purgeClients();
                        sendHeartbeats();
                    }
                    loopCount++;
                    // another pass only if this one handled at least one message
                    keepReading = receivedData;
                }
            }
        } finally {
            routerSockets.forEach(ZMQ.Socket::close);
        }
        destroy();
    }

    /**
     * Sets the run flag, starts the internal worker threads of every registered
     * service and finally starts the broker thread itself.
     */
    @Override
    public synchronized void start() {
        run.set(true);
        for (Service service : services.values()) {
            for (Thread internalWorker : service.internalWorkers) {
                internalWorker.start();
            }
        }
        super.start();
    }

    /**
     * Clears the run flag; the broker and internal worker loops check it on each iteration.
     */
    public void stopBroker() {
        run.set(false);
    }

    /**
     * Removes a worker from all broker bookkeeping: the idle list of its service,
     * the broker-wide idle list and the worker map.
     *
     * @param worker     worker to delete, must not be {@code null}
     * @param disconnect if {@code true} a {@code W_DISCONNECT} is sent to the worker first
     */
    protected void deleteWorker(Worker worker, boolean disconnect) {
        assert (worker != null);
        if (disconnect) {
            MajordomoProtocol.sendWorkerMessage(worker.socket, MajordomoProtocol.MdpWorkerCommand.W_DISCONNECT, worker.address, null, new byte[0][]);
        }
        if (worker.service != null) {
            worker.service.waiting.remove(worker);
        }
        // also drop it from the broker-wide idle list, otherwise a deleted worker keeps
        // receiving heartbeats until its expiry (no-op if the caller already removed it)
        waiting.remove(worker);
        workers.remove(worker.addressHex);
    }

    /**
     * Disconnects and deletes every known worker, then destroys the ZeroMQ context.
     */
    protected void destroy() {
        // iterate over a snapshot: deleteWorker(..) modifies the worker map
        final List<Worker> snapshot = new ArrayList<>(workers.values());
        for (Worker worker : snapshot) {
            deleteWorker(worker, true);
        }
        ctx.destroy();
    }

    /**
     * Hands pending requests of a service to its idle workers for as long as
     * both idle workers and pending requests are available. Expired workers are purged first.
     *
     * @param service service whose request queues are drained, must not be {@code null}
     */
    protected void dispatch(Service service) {
        assert (service != null);
        purgeWorkers();
        while (!service.waiting.isEmpty() && service.requestsPending()) {
            final MajordomoProtocol.MdpClientMessage request = service.getNextPrioritisedMessage();
            if (request != null) {
                final Worker idleWorker = service.waiting.pop();
                waiting.remove(idleWorker);
                MajordomoProtocol.sendWorkerMessage(idleWorker.socket, MajordomoProtocol.MdpWorkerCommand.W_REQUEST, idleWorker.address, request.senderID, request.payload);
            } else {
                assert (false) : "getNextPrioritisedMessage should not be null";
            }
        }
    }

    /**
     * Routes a received message according to its sub-protocol: client messages are
     * queued on the (possibly newly created) client record, worker messages are
     * passed to {@link #processWorker}.
     *
     * @param receiveSocket socket the message arrived on
     * @param msg           received message
     * @return {@code true} if the message was a client or worker message, {@code false} otherwise
     */
    protected boolean handleReceivedMessage(ZMQ.Socket receiveSocket, MajordomoProtocol.MdpMessage msg) {
        switch (msg.protocol) {
            case C_CLIENT: {
                Client client = clients.get(msg.senderName);
                if (client == null) {
                    // first message of this sender: remember socket, protocol and identity
                    client = new Client(receiveSocket, msg.protocol, msg.senderName, msg.senderID);
                    clients.put(msg.senderName, client);
                }
                client.offerToQueue((MajordomoProtocol.MdpClientMessage)msg);
                return true;
            }
            case W_WORKER: {
                processWorker(receiveSocket, (MajordomoProtocol.MdpWorkerMessage)msg);
                return true;
            }
            default: {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.atDebug().addArgument(msg).log("Majordomo broker invalid message: '{}'");
                }
                return false;
            }
        }
    }

    /**
     * Takes at most one queued request from every client and forwards it to its service.
     * Requests for unknown services are answered directly with a "501" payload.
     * Requests for internal services are pushed to the service's dispatch socket,
     * all others go through {@link #dispatch(Service)}.
     */
    protected void processClients() {
        for (Client client : clients.values()) {
            final MajordomoProtocol.MdpClientMessage request = client.pop();
            if (request == null) {
                continue;
            }
            final Service service = services.get(request.serviceName);
            if (service == null) {
                MajordomoProtocol.sendClientMessage(client.socket, MajordomoProtocol.MdpClientCommand.C_UNKNOWN, request.senderID, request.serviceNameBytes, new byte[][]{"501".getBytes(StandardCharsets.UTF_8)});
                continue;
            }
            service.putPrioritisedMessage(request);
            if (!service.isInternal) {
                dispatch(service);
                continue;
            }
            final MajordomoProtocol.MdpClientMessage next = service.getNextPrioritisedMessage();
            if (next == null) {
                assert (false) : "getNextPrioritisedMessage should not be null";
                continue;
            }
            MajordomoProtocol.sendWorkerMessage(service.internalDispatchSocket, MajordomoProtocol.MdpWorkerCommand.W_REQUEST, null, next.senderID, next.payload);
        }
    }

    /**
     * Handles a message sent by a worker (READY, REPLY, HEARTBEAT or DISCONNECT).
     *
     * <p>A sender counts as "ready" if the message arrived on the internal service
     * socket or if its address is already present in the worker map. Commands that
     * are not valid for the sender's current state lead to the worker being deleted
     * with a disconnect notification.
     *
     * @param receiveSocket socket the message arrived on
     * @param msg           worker message to process
     * @throws IllegalStateException if a reply targets a client whose protocol is not {@code C_CLIENT}
     */
    protected void processWorker(ZMQ.Socket receiveSocket, MajordomoProtocol.MdpWorkerMessage msg) {
        final boolean isInternal = internalServiceSocket.equals(receiveSocket);
        final boolean workerReady = isInternal || workers.containsKey(msg.senderIdHex);
        switch (msg.command) {
            case W_READY: {
                final Worker worker = requireWorker(receiveSocket, msg.senderID, msg.senderIdHex);
                // Only the first three bytes are compared, as in the original code.
                // NOTE(review): the prefix itself ("mmi.") has four bytes - confirm the intended length.
                // The length guard keeps sender IDs shorter than three bytes from raising an
                // ArrayIndexOutOfBoundsException inside the broker thread.
                final byte[] senderId = msg.senderID;
                final boolean reservedName = senderId != null && senderId.length >= 3
                        && Arrays.equals(INTERNAL_SERVICE_PREFIX_BYTES, 0, 3, senderId, 0, 3);
                if (workerReady || reservedName) {
                    // READY is not the first command of the session, or the name is reserved
                    deleteWorker(worker, true);
                    break;
                }
                worker.service = requireService(msg.serviceName, msg.serviceNameBytes);
                workerWaiting(worker);
                break;
            }
            case W_REPLY: {
                if (workerReady) {
                    final Worker worker = isInternal ? null : requireWorker(receiveSocket, msg.senderID, msg.senderIdHex);
                    final byte[] serviceName = isInternal ? msg.senderID : worker.service.nameBytes;
                    final Client client = clients.get(msg.clientSourceName);
                    if (client == null || client.socket == null) {
                        break;
                    }
                    if (client.protocol != MajordomoProtocol.MdpSubProtocol.C_CLIENT) {
                        throw new IllegalStateException("Unexpected value: " + client.protocol);
                    }
                    MajordomoProtocol.sendClientMessage(client.socket, MajordomoProtocol.MdpClientCommand.C_UNKNOWN, msg.clientSourceID, serviceName, msg.payload);
                    if (isInternal) {
                        break;
                    }
                    // external worker is idle again
                    workerWaiting(worker);
                    break;
                }
                final Worker worker = requireWorker(receiveSocket, msg.senderID, msg.senderIdHex);
                deleteWorker(worker, true);
                break;
            }
            case W_HEARTBEAT: {
                final Worker worker = requireWorker(receiveSocket, msg.senderID, msg.senderIdHex);
                if (workerReady) {
                    worker.expiry = System.currentTimeMillis() + HEARTBEAT_EXPIRY;
                    break;
                }
                deleteWorker(worker, true);
                break;
            }
            case W_DISCONNECT: {
                final Worker worker = requireWorker(receiveSocket, msg.senderID, msg.senderIdHex);
                deleteWorker(worker, false);
                break;
            }
            default: {
                if (!LOGGER.isDebugEnabled()) {
                    break;
                }
                LOGGER.atDebug().addArgument(msg).log("Majordomo broker invalid message: '{}'");
            }
        }
    }

    /**
     * Removes all clients whose expiry time has passed. Does nothing when the
     * client time-out is disabled ({@code CLIENT_TIMEOUT <= 0}).
     *
     * <p>Removal goes through {@code removeIf} on the map's value view: removing entries
     * from the map while iterating its key set raises a {@link java.util.ConcurrentModificationException}.
     */
    protected void purgeClients() {
        if (CLIENT_TIMEOUT <= 0) {
            return;
        }
        final long now = System.currentTimeMillis();
        clients.values().removeIf(client -> {
            if (client != null && client.expiry >= now) {
                return false; // still alive
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.atDebug().addArgument(client).log("Majordomo broker deleting expired client: '{}'");
            }
            return true;
        });
    }

    /**
     * Deletes expired workers from the head of the broker-wide idle list. Stops at
     * the first worker that has not yet expired.
     */
    protected void purgeWorkers() {
        for (Worker head = waiting.peekFirst(); head != null && head.expiry < System.currentTimeMillis(); head = waiting.peekFirst()) {
            if (LOGGER.isInfoEnabled()) {
                final String serviceName = head.service == null ? "(unknown)" : head.service.name;
                LOGGER.atInfo().addArgument(head.addressHex).addArgument(serviceName).log("Majordomo broker deleting expired worker: '{}' - service: '{}'");
            }
            deleteWorker(waiting.pollFirst(), false);
        }
    }

    /**
     * Registers the two built-in internal services, each with one worker thread:
     * <ul>
     * <li>{@code mmi.service} - replies "200" if the service named in the first payload
     * frame is registered, "400" otherwise (including requests without a service name),</li>
     * <li>{@code mmi.echo} - returns the request payload unchanged.</li>
     * </ul>
     *
     * @param rbacRoles roles passed on to the created workers
     */
    protected void registerDefaultServices(RbacRole<?>[] rbacRoles) {
        MajordomoWorker mmiService = new MajordomoWorker(ctx, "mmi.service", rbacRoles);
        mmiService.registerHandler(payload -> {
            // guard against empty requests instead of failing on payload[0]
            final boolean hasServiceName = payload != null && payload.length > 0 && payload[0] != null;
            final boolean known = hasServiceName && services.containsKey(new String(payload[0], StandardCharsets.UTF_8));
            final String returnCode = known ? "200" : "400";
            return new byte[][]{returnCode.getBytes(StandardCharsets.UTF_8)};
        });
        addInternalService(mmiService, 1);
        MajordomoWorker echoService = new MajordomoWorker(ctx, "mmi.echo", rbacRoles);
        echoService.registerHandler(input -> input);
        addInternalService(echoService, 1);
    }

    /**
     * Looks up a service by name, registering a new external (worker-less) service if none exists yet.
     *
     * @param serviceName      name of the service
     * @param serviceNameBytes byte form of the name, must not be {@code null}
     * @return the existing or newly created service
     */
    protected Service requireService(String serviceName, byte[] serviceNameBytes) {
        assert (serviceNameBytes != null);
        Service service = services.get(serviceName);
        if (service == null) {
            service = new Service(serviceName, serviceNameBytes, null, 0);
            services.put(serviceName, service);
        }
        return service;
    }

    /**
     * Looks up a worker by the hex form of its address, registering (and logging) a new one if unknown.
     *
     * @param socket     socket through which the worker is reached
     * @param address    raw worker address
     * @param addressHex hex form of the address used as map key, must not be {@code null}
     * @return the existing or newly created worker
     */
    protected Worker requireWorker(ZMQ.Socket socket, byte[] address, String addressHex) {
        assert (addressHex != null);
        final Worker known = workers.get(addressHex);
        if (known != null) {
            return known;
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.atInfo().addArgument(addressHex).log("Majordomo broker registering new worker: '{}'");
        }
        final Worker created = new Worker(socket, address, addressHex);
        workers.put(addressHex, created);
        return created;
    }

    /**
     * Sends a {@code W_HEARTBEAT} to every idle worker once the heartbeat time has
     * been reached and schedules the next round one heartbeat interval later.
     */
    protected void sendHeartbeats() {
        if (System.currentTimeMillis() < heartbeatAt) {
            return; // not due yet
        }
        for (Worker idleWorker : waiting) {
            MajordomoProtocol.sendWorkerMessage(idleWorker.socket, MajordomoProtocol.MdpWorkerCommand.W_HEARTBEAT, idleWorker.address, null, new byte[0][]);
        }
        heartbeatAt = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
    }

    /**
     * Marks a worker as idle: appends it to the broker-wide idle list, pushes it onto
     * its service's idle list, renews its expiry and dispatches pending requests of the service.
     *
     * @param worker worker that became idle; its service must be set
     */
    protected void workerWaiting(Worker worker) {
        final Service service = worker.service;
        waiting.addLast(worker);
        service.waiting.push(worker);
        // expiry must be renewed before dispatch(..), which purges expired workers first
        worker.expiry = System.currentTimeMillis() + HEARTBEAT_EXPIRY;
        dispatch(service);
    }

    /**
     * Stand-alone entry point: binds a broker to TCP ports 5555 and 5556, starts
     * ten "inproc.echo" workers sharing the broker's context and then starts the broker.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final MajordomoBroker broker = new MajordomoBroker(1, (RbacRole<?>[])BasicRbacRole.values());
        broker.bind("tcp://*:5555");
        broker.bind("tcp://*:5556");
        final int nEchoWorkers = 10;
        int started = 0;
        while (started < nEchoWorkers) {
            final MajordomoWorker echoWorker = new MajordomoWorker(broker.getContext(), "inproc.echo", new RbacRole[]{BasicRbacRole.ADMIN});
            echoWorker.registerHandler(input -> input);
            echoWorker.start();
            started++;
        }
        broker.start();
    }

    /**
     * Daemon thread executing requests of a broker-internal service.
     *
     * <p>Requests are pulled from the service's {@code inproc://<service>push} endpoint,
     * processed by the service's {@link MajordomoWorker} and the replies are sent
     * to the broker via {@code inproc://intService}.
     */
    protected class InternalWorkerThread
    extends Thread {
        private final Service service;

        /**
         * @param service internal service served by this thread; must have a non-blank name
         */
        public InternalWorkerThread(Service service) {
            assert (service != null && service.name != null && !service.name.isBlank());
            String serviceName = service.name + "#" + WORKER_COUNTER.getAndIncrement();
            this.setName(MajordomoBroker.class.getSimpleName() + "-" + InternalWorkerThread.class.getSimpleName() + ":" + serviceName);
            this.setDaemon(true);
            this.service = service;
        }

        /**
         * Worker loop: runs until the broker's run flag is cleared, the thread is
         * interrupted, the poller reports {@code -1} or a {@link ZMQException} is raised.
         */
        @Override
        public void run() {
            try (ZMQ.Socket sendSocket = MajordomoBroker.this.ctx.createSocket(SocketType.DEALER);
                 ZMQ.Socket receiveSocket = MajordomoBroker.this.ctx.createSocket(SocketType.PULL);
                 ZMQ.Poller items = MajordomoBroker.this.ctx.createPoller(1);){
                sendSocket.setSndHWM(0);
                sendSocket.setIdentity(this.service.name.getBytes(StandardCharsets.UTF_8));
                sendSocket.connect("inproc://intService");
                // high-water mark is configured before connecting, like for every other socket in this class
                receiveSocket.setRcvHWM(0);
                receiveSocket.connect("inproc://" + this.service.mdpWorker.getServiceName() + "push");
                items.register(receiveSocket, ZMQ.Poller.POLLIN);
                while (MajordomoBroker.this.run.get() && !this.isInterrupted()) {
                    MajordomoProtocol.MdpMessage mdpMessage;
                    if (items.poll(HEARTBEAT_INTERVAL) == -1) {
                        break;
                    }
                    // read until no message is left; a client-type message also ends this inner loop
                    while (MajordomoBroker.this.run.get() && (mdpMessage = MajordomoProtocol.receiveMdpMessage(receiveSocket, false)) != null && !mdpMessage.isClient) {
                        MajordomoProtocol.MdpWorkerMessage msg = (MajordomoProtocol.MdpWorkerMessage)mdpMessage;
                        MajordomoProtocol.MdpMessage reply = this.service.mdpWorker.processRequest(msg, msg.clientSourceID);
                        if (reply == null) continue;
                        MajordomoProtocol.sendWorkerMessage(sendSocket, MajordomoProtocol.MdpWorkerCommand.W_REPLY, INTERNAL_SENDER_ID, msg.clientSourceID, reply.payload);
                    }
                }
            }
            catch (ZMQException e) {
                // the thread simply ends on a ZeroMQ failure; leave a trace for diagnostics
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.atDebug().setCause(e).addArgument(this.getName()).log("Majordomo broker internal worker '{}' stopped by ZMQException");
                }
            }
        }
    }

    /**
     * Broker-side record of a connected worker.
     */
    protected class Worker {
        /** Socket through which the worker is reached. */
        protected final ZMQ.Socket socket;
        /** Raw worker address. */
        protected final byte[] address;
        /** Hex form of {@link #address}; key in the broker's worker map. */
        protected final String addressHex;
        /** Always {@code true} for workers created through this constructor. */
        protected final boolean external;
        /** Service the worker belongs to; {@code null} until assigned by the broker. */
        protected Service service;
        /** Wall-clock time (ms) after which the worker is considered expired. */
        protected long expiry = System.currentTimeMillis() + HEARTBEAT_EXPIRY;

        /**
         * @param socket     socket through which the worker is reached
         * @param address    raw worker address
         * @param addressHex hex form of the address
         */
        public Worker(ZMQ.Socket socket, byte[] address, String addressHex) {
            this.socket = socket;
            this.address = address;
            this.addressHex = addressHex;
            this.external = true;
        }
    }

    /**
     * Broker-side record of a service: its role-based request queues, its idle
     * external workers and, for internal services, the dispatch socket and threads.
     */
    protected class Service {
        protected final String name;
        protected final byte[] nameBytes;
        /** Worker implementing an internal service; {@code null} for external services. */
        protected final MajordomoWorker mdpWorker;
        /** {@code true} if the service is backed by {@link #mdpWorker}. */
        protected final boolean isInternal;
        /** One request queue per broker role plus one for {@code BasicRbacRole.NULL}. */
        protected final Map<RbacRole<?>, Queue<MajordomoProtocol.MdpClientMessage>> requests = new HashMap<>();
        /** Idle workers of this service. */
        protected final Deque<Worker> waiting = new ArrayDeque<>();
        /** Threads serving an internal service (empty for external services). */
        protected final List<Thread> internalWorkers = new ArrayList<>();
        /** PUSH socket feeding the internal worker threads; {@code null} for external services. */
        protected final ZMQ.Socket internalDispatchSocket;

        /**
         * @param name             service name
         * @param nameBytes        byte form of the name; derived from {@code name} (UTF-8) if {@code null}
         * @param mdpWorker        worker backing an internal service, or {@code null} for an external service
         * @param nInternalThreads number of internal worker threads to create (internal services only)
         */
        public Service(String name, byte[] nameBytes, MajordomoWorker mdpWorker, int nInternalThreads) {
            this.name = name;
            this.nameBytes = nameBytes != null ? nameBytes : name.getBytes(StandardCharsets.UTF_8);
            this.mdpWorker = mdpWorker;
            this.isInternal = mdpWorker != null;
            if (mdpWorker == null) {
                this.internalDispatchSocket = null;
            } else {
                final ZMQ.Socket dispatchSocket = MajordomoBroker.this.ctx.createSocket(SocketType.PUSH);
                this.internalDispatchSocket = dispatchSocket;
                dispatchSocket.setHWM(0);
                dispatchSocket.bind("inproc://" + mdpWorker.getServiceName() + "push");
                for (int created = 0; created < nInternalThreads; created++) {
                    internalWorkers.add(new InternalWorkerThread(this));
                }
            }
            for (RbacRole<?> role : MajordomoBroker.this.rbacRoles) {
                requests.put(role, new ArrayDeque<>());
            }
            requests.put(BasicRbacRole.NULL, new ArrayDeque<>());
        }

        /**
         * @return {@code true} if at least one request queue is non-empty
         */
        public boolean requestsPending() {
            return requests.values().stream().anyMatch(queue -> !queue.isEmpty());
        }

        /**
         * Takes the next request, visiting the role queues in the order of the broker's
         * sorted role set and the {@code BasicRbacRole.NULL} queue last.
         *
         * @return the next request, or {@code null} if all queues are empty
         */
        protected final MajordomoProtocol.MdpClientMessage getNextPrioritisedMessage() {
            for (RbacRole<?> role : MajordomoBroker.this.rbacRoles) {
                final MajordomoProtocol.MdpClientMessage next = requests.get(role).poll();
                if (next != null) {
                    return next;
                }
            }
            return requests.get(BasicRbacRole.NULL).poll();
        }

        /**
         * Queues a request according to the role found in its RBAC token; requests
         * without a token go to the {@code BasicRbacRole.NULL} queue.
         * NOTE(review): a request whose role has no queue is dropped without a reply - confirm this is intended.
         *
         * @param queuedMessage request to enqueue
         */
        protected void putPrioritisedMessage(MajordomoProtocol.MdpClientMessage queuedMessage) {
            if (!queuedMessage.hasRbackToken()) {
                requests.get(BasicRbacRole.NULL).offer(queuedMessage);
                return;
            }
            final RbacToken token = RbacToken.from((byte[])queuedMessage.getRbacFrame());
            final Queue<MajordomoProtocol.MdpClientMessage> roleQueue = requests.get(token.getRole());
            if (roleQueue != null) {
                roleQueue.offer(queuedMessage);
            }
        }
    }

    /**
     * Broker-side record of a client: where to reach it and its queued requests.
     */
    protected static class Client {
        /** Socket on which the client's first message was received. */
        protected final ZMQ.Socket socket;
        protected final MajordomoProtocol.MdpSubProtocol protocol;
        protected final String name;
        /** Byte form of the name; derived from {@link #name} (UTF-8) if not supplied. */
        protected final byte[] nameBytes;
        /** Hex representation of {@link #nameBytes}. */
        protected final String nameHex;
        private final Deque<MajordomoProtocol.MdpClientMessage> requests = new ArrayDeque<>();
        /** Wall-clock time (ms) after which the client may be purged; renewed on every queued request. */
        protected long expiry = System.currentTimeMillis() + (long)CLIENT_TIMEOUT * 1000L;

        /**
         * @param socket    socket through which the client is reached
         * @param protocol  sub-protocol of the client
         * @param name      client name
         * @param nameBytes byte form of the name; if {@code null} the UTF-8 bytes of {@code name} are used
         */
        public Client(ZMQ.Socket socket, MajordomoProtocol.MdpSubProtocol protocol, String name, byte[] nameBytes) {
            this.socket = socket;
            this.protocol = protocol;
            this.name = name;
            this.nameBytes = nameBytes == null ? name.getBytes(StandardCharsets.UTF_8) : nameBytes;
            // use the defaulted field: the constructor argument may be null
            this.nameHex = MajordomoProtocol.strhex(this.nameBytes);
        }

        /**
         * Appends a request to the client's queue and renews the client's expiry.
         *
         * @param msg request to enqueue
         */
        public void offerToQueue(MajordomoProtocol.MdpClientMessage msg) {
            this.expiry = System.currentTimeMillis() + (long)CLIENT_TIMEOUT * 1000L;
            this.requests.offer(msg);
        }

        /**
         * @return the oldest queued request, or {@code null} if none is queued
         */
        public MajordomoProtocol.MdpClientMessage pop() {
            return this.requests.poll();
        }
    }
}

