/*
 * Decompiled with CFR 0.152.
 */
package io.opencmw.server;

import io.opencmw.OpenCmwConstants;
import io.opencmw.OpenCmwProtocol;
import io.opencmw.filter.SubscriptionMatcher;
import io.opencmw.rbac.BasicRbacRole;
import io.opencmw.rbac.RbacRole;
import io.opencmw.rbac.RbacToken;
import io.opencmw.server.BasicMdpWorker;
import io.opencmw.server.MmiServiceHelper;
import io.opencmw.utils.NoDuplicatesList;
import io.opencmw.utils.SystemProperties;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import org.zeromq.util.ZData;

public class MajordomoBroker
extends Thread
implements AutoCloseable {
    public static final byte[] RBAC = new byte[0];
    private static final Logger LOGGER = LoggerFactory.getLogger(MajordomoBroker.class);
    public static final String SUFFIX_ROUTER = "/router";
    public static final String SUFFIX_PUBLISHER = "/publisher";
    public static final String SUFFIX_SUBSCRIBE = "/subscribe";
    public static final String INPROC_BROKER = "inproc://broker";
    public static final String INTERNAL_ADDRESS_BROKER = "inproc://broker/router";
    public static final String INTERNAL_ADDRESS_PUBLISHER = "inproc://broker/publisher";
    public static final String INTERNAL_ADDRESS_SUBSCRIBE = "inproc://broker/subscribe";
    private static final AtomicInteger BROKER_COUNTER = new AtomicInteger();
    protected final ZContext ctx;
    protected final ZMQ.Socket routerSocket;
    protected final ZMQ.Socket pubSocket;
    protected final ZMQ.Socket subSocket;
    protected final ZMQ.Socket dnsSocket;
    protected final String brokerName;
    protected final URI dnsAddress;
    protected final List<String> routerSockets = new NoDuplicatesList();
    protected final SortedSet<RbacRole<?>> rbacRoles;
    final Map<String, Service> services = new HashMap<String, Service>();
    protected final Map<String, Worker> workers = new HashMap<String, Worker>();
    protected final Map<String, Client> clients = new HashMap<String, Client>();
    protected final BiPredicate<URI, URI> subscriptionMatcher = new SubscriptionMatcher(new Class[0]);
    protected final Map<URI, AtomicInteger> activeSubscriptions = new HashMap<URI, AtomicInteger>();
    protected final Map<URI, List<byte[]>> routerBasedSubscriptions = new HashMap<URI, List<byte[]>>();
    private final AtomicBoolean run = new AtomicBoolean(false);
    private final AtomicBoolean running = new AtomicBoolean(false);
    protected final Deque<Worker> waiting = new ArrayDeque<Worker>();
    protected final int heartBeatLiveness = SystemProperties.getValueIgnoreCase((String)"OpenCMW.heartBeatLiveness", (int)3);
    protected final long heartBeatInterval = SystemProperties.getValueIgnoreCase((String)"OpenCMW.heartBeat", (long)1000L);
    protected final long heartBeatExpiry = this.heartBeatInterval * (long)this.heartBeatLiveness;
    private final Map<String, DnsServiceItem> dnsCache = new ConcurrentHashMap<String, DnsServiceItem>();
    protected final long clientTimeOut = TimeUnit.SECONDS.toMillis(SystemProperties.getValueIgnoreCase((String)"OpenCMW.clientTimeOut", (long)0L));
    protected final long dnsTimeOut = TimeUnit.SECONDS.toMillis(SystemProperties.getValueIgnoreCase((String)"OpenCMW.dnsTimeOut", (int)10));
    protected long heartbeatAt = System.currentTimeMillis() + this.heartBeatInterval;
    protected long dnsHeartbeatAt = System.currentTimeMillis() + this.dnsTimeOut;

    public MajordomoBroker(@NotNull String brokerName, URI dnsAddress, RbacRole<?> ... rbacRoles) {
        this.brokerName = brokerName;
        this.dnsAddress = dnsAddress == null || dnsAddress.toString().isBlank() ? null : OpenCmwConstants.replaceScheme((URI)dnsAddress, (String)"tcp");
        this.setName(MajordomoBroker.class.getSimpleName() + "(" + brokerName + ")#" + BROKER_COUNTER.getAndIncrement());
        this.ctx = new ZContext(SystemProperties.getValueIgnoreCase((String)"OpenCMW.nIoThreads", (int)1));
        this.rbacRoles = Collections.unmodifiableSortedSet(new TreeSet(Set.of(rbacRoles)));
        this.routerSocket = this.ctx.createSocket(SocketType.ROUTER);
        OpenCmwConstants.setDefaultSocketParameters((ZMQ.Socket)this.routerSocket);
        this.routerSocket.bind(INTERNAL_ADDRESS_BROKER);
        this.pubSocket = this.ctx.createSocket(SocketType.XPUB);
        OpenCmwConstants.setDefaultSocketParameters((ZMQ.Socket)this.pubSocket);
        this.pubSocket.setXpubVerbose(true);
        this.pubSocket.bind(INTERNAL_ADDRESS_PUBLISHER);
        this.subSocket = this.ctx.createSocket(SocketType.SUB);
        OpenCmwConstants.setDefaultSocketParameters((ZMQ.Socket)this.subSocket);
        this.subSocket.bind(INTERNAL_ADDRESS_SUBSCRIBE);
        this.registerDefaultServices(rbacRoles);
        this.dnsSocket = this.ctx.createSocket(SocketType.DEALER);
        OpenCmwConstants.setDefaultSocketParameters((ZMQ.Socket)this.dnsSocket);
        if (this.dnsAddress == null) {
            this.dnsSocket.connect(INTERNAL_ADDRESS_BROKER);
        } else {
            this.dnsSocket.connect(this.dnsAddress.toString());
        }
        LOGGER.atInfo().addArgument((Object)this.getName()).addArgument((Object)this.dnsAddress).log("register new '{}' broker with DNS: '{}'");
    }

    public void addInternalService(BasicMdpWorker worker) {
        assert (worker != null) : "worker must not be null";
        this.requireService(worker.getServiceName(), worker);
    }

    public URI bind(URI endpoint) {
        String requestedScheme = Objects.requireNonNull(endpoint, "endpoint is null").getScheme().toLowerCase(Locale.UK);
        boolean isRouterSocket = requestedScheme.startsWith("mdp") || requestedScheme.startsWith("tcp");
        (isRouterSocket ? this.routerSocket : this.pubSocket).bind(OpenCmwConstants.replaceScheme((URI)endpoint, (String)"tcp").toString());
        URI endpointAdjusted = OpenCmwConstants.replaceScheme((URI)endpoint, (String)(isRouterSocket ? "mdp" : "mds"));
        URI adjustedAddressPublic = OpenCmwConstants.resolveHost((URI)endpointAdjusted, (String)OpenCmwConstants.getLocalHostName());
        this.routerSockets.add(adjustedAddressPublic.toString());
        LOGGER.atDebug().addArgument((Object)adjustedAddressPublic).log("Majordomo broker/0.1 is active at '{}'");
        return adjustedAddressPublic;
    }

    public ZContext getContext() {
        return this.ctx;
    }

    public ZMQ.Socket getInternalRouterSocket() {
        return this.routerSocket;
    }

    public List<String> getRouterSockets() {
        return Collections.unmodifiableList(this.routerSockets);
    }

    public Collection<Service> getServices() {
        return this.services.values();
    }

    public Map<String, DnsServiceItem> getDnsCache() {
        return this.dnsCache;
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public void removeService(String serviceName) {
        Service ret = this.services.remove(serviceName);
        ret.mdpWorker.forEach(BasicMdpWorker::stopWorker);
        ret.waiting.forEach(worker -> new OpenCmwProtocol.MdpMessage(worker.address, OpenCmwProtocol.MdpSubProtocol.PROT_WORKER, OpenCmwProtocol.Command.DISCONNECT, worker.service.nameBytes, OpenCmwProtocol.EMPTY_FRAME, URI.create(worker.service.name), OpenCmwProtocol.BROKER_SHUTDOWN, "", RBAC).send(worker.socket));
    }

    @Override
    public void run() {
        Worker[] deleteList;
        this.run.set(true);
        this.running.set(true);
        this.services.forEach((serviceName, service) -> service.internalWorkers.forEach(Thread::start));
        this.sendDnsHeartbeats(true);
        try (ZMQ.Poller items = this.ctx.createPoller(4);){
            items.register(this.routerSocket, 1);
            items.register(this.dnsSocket, 1);
            items.register(this.pubSocket, 1);
            items.register(this.subSocket, 1);
            while (this.run.get() && !Thread.currentThread().isInterrupted() && !this.ctx.isClosed() && items.poll(this.heartBeatInterval) != -1) {
                if (this.ctx.isClosed()) {
                    break;
                }
                int loopCount = 0;
                boolean receivedMsg = true;
                while (this.run.get() && !Thread.currentThread().isInterrupted() && receivedMsg) {
                    OpenCmwProtocol.MdpMessage routerMsg = OpenCmwProtocol.MdpMessage.receive((ZMQ.Socket)this.routerSocket, (boolean)false);
                    receivedMsg = this.handleReceivedMessage(this.routerSocket, routerMsg);
                    OpenCmwProtocol.MdpMessage subMsg = OpenCmwProtocol.MdpMessage.receive((ZMQ.Socket)this.subSocket, (boolean)false);
                    receivedMsg |= this.handleReceivedMessage(this.subSocket, subMsg);
                    OpenCmwProtocol.MdpMessage dnsMsg = OpenCmwProtocol.MdpMessage.receive((ZMQ.Socket)this.dnsSocket, (boolean)false);
                    receivedMsg |= this.handleReceivedMessage(this.dnsSocket, dnsMsg);
                    ZMsg pubMsg = ZMsg.recvMsg((ZMQ.Socket)this.pubSocket, (boolean)false);
                    receivedMsg |= this.handleSubscriptionMsg(pubMsg);
                    this.processClients();
                    if (loopCount % 10 == 0) {
                        this.purgeWorkers();
                        this.purgeClients();
                        this.purgeDnsServices();
                        this.sendHeartbeats();
                        this.sendDnsHeartbeats(false);
                    }
                    ++loopCount;
                }
            }
        }
        for (Worker worker : deleteList = this.workers.values().toArray(new Worker[0])) {
            this.deleteWorker(worker, true);
        }
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100L));
        this.running.set(false);
    }

    public void stopBroker() {
        this.run.set(false);
        if (this.running.get()) {
            try {
                this.join(this.heartBeatInterval);
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(this.getName() + " did not shut down in " + this.heartBeatInterval + " ms", e);
            }
        }
        this.close();
    }

    protected void deleteWorker(Worker worker, boolean disconnect) {
        assert (worker != null);
        if (disconnect && !this.ctx.isClosed()) {
            OpenCmwProtocol.MdpMessage disconnectMsg = new OpenCmwProtocol.MdpMessage(worker.address, OpenCmwProtocol.MdpSubProtocol.PROT_WORKER, OpenCmwProtocol.Command.DISCONNECT, worker.serviceName, OpenCmwProtocol.EMPTY_FRAME, URI.create(new String(worker.serviceName, StandardCharsets.UTF_8)), OpenCmwProtocol.BROKER_SHUTDOWN, "", RBAC);
            disconnectMsg.send(worker.socket);
        }
        if (worker.service != null) {
            worker.service.waiting.remove(worker);
        }
        this.workers.remove(worker.addressHex);
    }

    @Override
    public void close() {
        this.routerSocket.close();
        this.pubSocket.close();
        this.subSocket.close();
        this.dnsSocket.close();
        this.ctx.close();
    }

    protected void dispatch(Service service) {
        assert (service != null);
        this.purgeWorkers();
        while (!service.waiting.isEmpty() && service.requestsPending()) {
            OpenCmwProtocol.MdpMessage msg = service.getNextPrioritisedMessage();
            if (msg == null) {
                assert (false) : "getNextPrioritisedMessage should not be null";
                continue;
            }
            Worker worker = service.waiting.pop();
            this.waiting.remove(worker);
            msg.serviceNameBytes = msg.senderID;
            msg.senderID = worker.address;
            msg.protocol = OpenCmwProtocol.MdpSubProtocol.PROT_WORKER;
            msg.send(worker.socket);
        }
    }

    protected boolean handleReceivedMessage(ZMQ.Socket receiveSocket, OpenCmwProtocol.MdpMessage msg) {
        if (msg == null) {
            return false;
        }
        String topic = msg.topic.toString();
        switch (msg.protocol) {
            case PROT_CLIENT: 
            case PROT_CLIENT_HTTP: {
                switch (msg.command) {
                    case READY: {
                        if (msg.topic.getScheme() != null) {
                            this.registerNewService(msg.getServiceName());
                            DnsServiceItem ret = this.dnsCache.computeIfAbsent(msg.getServiceName(), s -> new DnsServiceItem(msg.senderID, msg.getServiceName()));
                            ret.uri.add(msg.topic);
                            ret.updateExpiryTimeStamp();
                        }
                        return true;
                    }
                    case SUBSCRIBE: {
                        if (this.activeSubscriptions.computeIfAbsent(msg.topic, s -> new AtomicInteger()).incrementAndGet() == 1) {
                            this.subSocket.subscribe(topic);
                        }
                        this.routerBasedSubscriptions.computeIfAbsent(msg.topic, s -> new ArrayList()).add(msg.senderID);
                        return true;
                    }
                    case UNSUBSCRIBE: {
                        if (this.activeSubscriptions.computeIfAbsent(msg.topic, s -> new AtomicInteger()).decrementAndGet() <= 0) {
                            this.subSocket.unsubscribe(topic);
                        }
                        this.routerBasedSubscriptions.computeIfAbsent(msg.topic, s -> new ArrayList()).remove(msg.senderID);
                        if (Objects.requireNonNullElse(this.routerBasedSubscriptions.get(msg.topic), "").toString().isEmpty()) {
                            this.routerBasedSubscriptions.remove(msg.topic);
                        }
                        return true;
                    }
                    case W_HEARTBEAT: {
                        this.sendDnsHeartbeats(true);
                        return true;
                    }
                }
                String senderName = msg.getSenderName();
                Client client = this.clients.computeIfAbsent(senderName, s -> new Client(receiveSocket, senderName, msg.senderID));
                client.offerToQueue(msg);
                return true;
            }
            case PROT_WORKER: {
                this.processWorker(receiveSocket, msg);
                return true;
            }
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.atDebug().addArgument((Object)msg).log("Majordomo broker invalid message: '{}'");
        }
        return false;
    }

    protected void processClients() {
        this.clients.forEach((name, client) -> {
            OpenCmwProtocol.MdpMessage clientMessage = client.pop();
            if (clientMessage == null) {
                return;
            }
            Service service = this.getBestMatchingService(StringUtils.strip((String)URI.create(clientMessage.getServiceName()).getPath(), (String)"/"));
            if (service == null) {
                OpenCmwProtocol.MdpMessage msg = new OpenCmwProtocol.MdpMessage(clientMessage.senderID, OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT, OpenCmwProtocol.Command.FINAL, clientMessage.serviceNameBytes, clientMessage.clientRequestID, URI.create("mmi.service"), "501".getBytes(StandardCharsets.UTF_8), "unknown service (error 501): '" + clientMessage.getServiceName() + "'", RBAC);
                msg.send(client.socket);
                return;
            }
            service.putPrioritisedMessage(clientMessage);
            this.dispatch(service);
        });
    }

    protected void processWorker(ZMQ.Socket receiveSocket, OpenCmwProtocol.MdpMessage msg) {
        String senderIdHex = ZData.strhex((byte[])msg.senderID);
        String serviceName = msg.getServiceName();
        boolean workerReady = this.workers.containsKey(senderIdHex);
        Worker worker = this.requireWorker(receiveSocket, msg.senderID, senderIdHex, msg.serviceNameBytes);
        switch (msg.command) {
            case READY: {
                LOGGER.atTrace().addArgument((Object)serviceName).log("log new local/external worker for service {} - " + msg);
                worker.service = this.requireService(serviceName, new BasicMdpWorker[0]);
                this.workerWaiting(worker);
                worker.service.serviceDescription = Arrays.copyOf(msg.data, msg.data.length);
                if (!msg.topic.toString().isBlank() && msg.topic.getScheme() != null) {
                    this.registerNewService(this.brokerName);
                    this.routerSockets.add(msg.topic.toString());
                    DnsServiceItem ret = this.dnsCache.computeIfAbsent(this.brokerName, s -> new DnsServiceItem(msg.senderID, this.brokerName));
                    ret.uri.add(msg.topic);
                }
                msg.data = msg.serviceNameBytes;
                msg.serviceNameBytes = "mmi.service".getBytes(StandardCharsets.UTF_8);
                msg.command = OpenCmwProtocol.Command.W_NOTIFY;
                msg.clientRequestID = this.getName().getBytes(StandardCharsets.UTF_8);
                msg.topic = URI.create("mmi.service");
                msg.errors = "";
                if (this.pubSocket.sendMore("mmi.service") && msg.send(this.pubSocket)) break;
                LOGGER.atWarn().addArgument((Object)msg.getServiceName()).log("could not notify service change for '{}'");
                break;
            }
            case W_HEARTBEAT: {
                if (workerReady) {
                    worker.updateExpiryTimeStamp();
                    break;
                }
                this.deleteWorker(worker, true);
                break;
            }
            case DISCONNECT: {
                break;
            }
            case PARTIAL: 
            case FINAL: {
                if (workerReady) {
                    Client client = this.clients.get(msg.getServiceName());
                    if (client == null || client.socket == null) break;
                    byte[] serviceID = worker.service.nameBytes;
                    msg.senderID = msg.serviceNameBytes;
                    msg.protocol = OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT;
                    msg.serviceNameBytes = serviceID;
                    msg.send(client.socket);
                    this.workerWaiting(worker);
                    break;
                }
                this.deleteWorker(worker, true);
                break;
            }
            case W_NOTIFY: {
                byte[] serviceID = worker.service.nameBytes;
                msg.senderID = msg.serviceNameBytes;
                msg.serviceNameBytes = serviceID;
                msg.protocol = OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT;
                msg.command = OpenCmwProtocol.Command.FINAL;
                this.dispatchMessageToMatchingSubscriber(msg);
                break;
            }
            default: {
                if (!LOGGER.isDebugEnabled()) break;
                LOGGER.atDebug().addArgument((Object)msg).log("Majordomo broker invalid message: '{}'");
            }
        }
    }

    protected void purgeClients() {
        if (this.clientTimeOut <= 0L) {
            return;
        }
        for (String clientName : this.clients.keySet()) {
            Client client = this.clients.get(clientName);
            if (client != null && client.expiry >= System.currentTimeMillis()) continue;
            this.clients.remove(clientName);
            if (!LOGGER.isDebugEnabled()) continue;
            LOGGER.atDebug().addArgument((Object)client).log("Majordomo broker deleting expired client: '{}'");
        }
    }

    protected void purgeDnsServices() {
        if (System.currentTimeMillis() >= this.dnsHeartbeatAt) {
            ArrayList<DnsServiceItem> cachedList = new ArrayList<DnsServiceItem>(this.dnsCache.values());
            OpenCmwProtocol.MdpMessage challengeMessage = new OpenCmwProtocol.MdpMessage(null, OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT, OpenCmwProtocol.Command.W_HEARTBEAT, OpenCmwProtocol.EMPTY_FRAME, "dnsChallenge".getBytes(StandardCharsets.UTF_8), OpenCmwProtocol.EMPTY_URI, OpenCmwProtocol.EMPTY_FRAME, "", RBAC);
            for (DnsServiceItem registeredService : cachedList) {
                if (registeredService.serviceName.equalsIgnoreCase(this.brokerName)) {
                    registeredService.updateExpiryTimeStamp();
                }
                challengeMessage.senderID = registeredService.address;
                challengeMessage.serviceNameBytes = registeredService.serviceName.getBytes(StandardCharsets.UTF_8);
                challengeMessage.send(this.routerSocket);
                if (System.currentTimeMillis() <= registeredService.expiry) continue;
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.atInfo().addArgument((Object)registeredService).log("Majordomo broker deleting expired dns service: '{}'");
                }
                this.dnsCache.remove(registeredService.serviceName);
            }
            this.dnsHeartbeatAt = System.currentTimeMillis() + this.dnsTimeOut;
        }
    }

    protected void purgeWorkers() {
        Worker w = this.waiting.peekFirst();
        while (w != null && w.expiry < System.currentTimeMillis()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.atInfo().addArgument((Object)w.addressHex).addArgument((Object)(w.service == null ? "(unknown)" : w.service.name)).log("Majordomo broker deleting expired worker: '{}' - service: '{}'");
            }
            this.deleteWorker(this.waiting.pollFirst(), false);
            w = this.waiting.peekFirst();
        }
    }

    protected void registerDefaultServices(RbacRole<?>[] rbacRoles) {
        int nServiceThreads = 3;
        this.addInternalService(new MmiServiceHelper.MmiService(this, rbacRoles));
        this.addInternalService(new MmiServiceHelper.MmiOpenApi(this, rbacRoles));
        this.addInternalService(new MmiServiceHelper.MmiDns(this, rbacRoles));
        for (int i = 0; i < 3; ++i) {
            this.addInternalService(new MmiServiceHelper.MmiEcho(this, rbacRoles));
        }
    }

    protected Service requireService(String serviceName, BasicMdpWorker ... worker) {
        assert (serviceName != null);
        BasicMdpWorker w = worker.length > 0 ? worker[0] : null;
        Service service = this.services.computeIfAbsent(serviceName, s -> new Service(serviceName, serviceName.getBytes(StandardCharsets.UTF_8), w));
        if (w != null) {
            w.start();
        }
        return service;
    }

    @NotNull
    protected Worker requireWorker(ZMQ.Socket socket, byte[] address, String addressHex, byte[] serviceName) {
        assert (addressHex != null);
        return this.workers.computeIfAbsent(addressHex, identity -> {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.atInfo().addArgument((Object)addressHex).addArgument((Object)ZData.toString((byte[])serviceName)).log("registering new worker: '{}' - '{}'");
            }
            return new Worker(socket, address, addressHex, serviceName);
        });
    }

    protected void sendDnsHeartbeats(boolean force) {
        if (System.currentTimeMillis() >= this.dnsHeartbeatAt || force) {
            OpenCmwProtocol.MdpMessage readyMsg = new OpenCmwProtocol.MdpMessage(null, OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT, OpenCmwProtocol.Command.READY, this.brokerName.getBytes(StandardCharsets.UTF_8), "clientID".getBytes(StandardCharsets.UTF_8), OpenCmwProtocol.EMPTY_URI, OpenCmwProtocol.EMPTY_FRAME, "", RBAC);
            ArrayList<String> localCopy = new ArrayList<String>(this.getRouterSockets());
            for (String routerAddress : localCopy) {
                readyMsg.topic = URI.create(routerAddress);
                this.registerWithDnsServices(readyMsg);
            }
            this.services.keySet().forEach(this::registerNewService);
        }
    }

    private void registerNewService(String serviceName) {
        OpenCmwProtocol.MdpMessage readyMsg = new OpenCmwProtocol.MdpMessage(null, OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT, OpenCmwProtocol.Command.READY, this.brokerName.getBytes(StandardCharsets.UTF_8), "clientID".getBytes(StandardCharsets.UTF_8), OpenCmwProtocol.EMPTY_URI, OpenCmwProtocol.EMPTY_FRAME, "", RBAC);
        ArrayList<String> localCopy = new ArrayList<String>(this.getRouterSockets());
        for (String routerAddress : localCopy) {
            readyMsg.topic = URI.create(routerAddress + "/" + serviceName);
            this.registerWithDnsServices(readyMsg);
        }
    }

    private void registerWithDnsServices(OpenCmwProtocol.MdpMessage readyMsg) {
        if (this.dnsAddress != null) {
            readyMsg.send(this.dnsSocket);
        }
        DnsServiceItem ret = this.dnsCache.computeIfAbsent(this.brokerName, s -> new DnsServiceItem(readyMsg.senderID, this.brokerName));
        ret.uri.add(readyMsg.topic);
        ret.updateExpiryTimeStamp();
    }

    protected void sendHeartbeats() {
        if (System.currentTimeMillis() >= this.heartbeatAt) {
            OpenCmwProtocol.MdpMessage heartbeatMsg = new OpenCmwProtocol.MdpMessage(null, OpenCmwProtocol.MdpSubProtocol.PROT_WORKER, OpenCmwProtocol.Command.W_HEARTBEAT, OpenCmwProtocol.EMPTY_FRAME, OpenCmwProtocol.EMPTY_FRAME, OpenCmwProtocol.EMPTY_URI, OpenCmwProtocol.EMPTY_FRAME, "", RBAC);
            for (Worker worker : this.waiting) {
                heartbeatMsg.senderID = worker.address;
                heartbeatMsg.serviceNameBytes = worker.service.nameBytes;
                heartbeatMsg.send(worker.socket);
            }
            this.heartbeatAt = System.currentTimeMillis() + this.heartBeatInterval;
        }
    }

    protected void workerWaiting(Worker worker) {
        this.waiting.addLast(worker);
        worker.service.waiting.push(worker);
        worker.updateExpiryTimeStamp();
        this.dispatch(worker.service);
    }

    private void dispatchMessageToMatchingSubscriber(OpenCmwProtocol.MdpMessage msg) {
        this.activeSubscriptions.keySet().stream().filter(s -> this.subscriptionMatcher.test(msg.topic, (URI)s)).forEach(s -> {
            this.pubSocket.sendMore(s.toString());
            msg.send(this.pubSocket);
        });
        List<byte[]> tClients = this.routerBasedSubscriptions.get(msg.topic);
        if (tClients == null) {
            return;
        }
        for (byte[] clientID : tClients) {
            msg.senderID = clientID;
            msg.send(this.routerSocket);
        }
    }

    private boolean handleSubscriptionMsg(ZMsg subMsg) {
        if (subMsg == null || subMsg.isEmpty()) {
            return false;
        }
        byte[] topicBytes = subMsg.getFirst().getData();
        if (topicBytes.length == 0) {
            return false;
        }
        URI subscriptionTopic = URI.create(new String(topicBytes, 1, topicBytes.length - 1, StandardCharsets.UTF_8));
        LOGGER.atDebug().addArgument((Object)topicBytes[0]).addArgument((Object)subscriptionTopic).log("received subscription request: {} to '{}'");
        switch (topicBytes[0]) {
            case 0: {
                if (this.activeSubscriptions.computeIfAbsent(subscriptionTopic, s -> new AtomicInteger()).decrementAndGet() <= 0) {
                    this.subSocket.unsubscribe(subscriptionTopic.toString());
                }
                return true;
            }
            case 1: {
                if (this.activeSubscriptions.computeIfAbsent(subscriptionTopic, s -> new AtomicInteger()).incrementAndGet() == 1) {
                    this.subSocket.subscribe(subscriptionTopic.toString());
                }
                return true;
            }
        }
        throw new IllegalStateException("received invalid subscription ID " + subMsg);
    }

    Service getBestMatchingService(String serviceName) {
        List sortedList = this.services.keySet().stream().filter(serviceName::startsWith).sorted(Comparator.comparingInt(String::length)).collect(Collectors.toList());
        if (!sortedList.isEmpty()) {
            return this.services.get(sortedList.get(0));
        }
        String longServiceName = this.brokerName + "/" + serviceName;
        List sortedList2nd = this.services.keySet().stream().filter(longServiceName::startsWith).sorted(Comparator.comparingInt(String::length)).collect(Collectors.toList());
        return sortedList2nd.isEmpty() ? null : this.services.get(sortedList2nd.get(0));
    }

    public class DnsServiceItem {
        protected final byte[] address;
        protected final String serviceName;
        protected final List<URI> uri = new NoDuplicatesList();
        protected long expiry;

        private DnsServiceItem(byte[] address, String serviceName) {
            this.address = address;
            this.serviceName = serviceName;
            this.updateExpiryTimeStamp();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof DnsServiceItem)) {
                return false;
            }
            DnsServiceItem that = (DnsServiceItem)o;
            return this.serviceName.equals(that.serviceName);
        }

        public String getDnsEntry() {
            return "[" + this.serviceName + ": " + this.uri.stream().map(URI::toString).collect(Collectors.joining(",")) + "]";
        }

        public String getDnsEntryHtml() {
            Optional<URI> webHandler = this.uri.stream().filter(u -> "https".equalsIgnoreCase(u.getScheme())).findFirst();
            if (webHandler.isEmpty()) {
                webHandler = this.uri.stream().filter(u -> "http".equalsIgnoreCase(u.getScheme())).findFirst();
            }
            String wrappedService = webHandler.isEmpty() ? this.serviceName : MmiServiceHelper.wrapInAnchor(this.serviceName, webHandler.get());
            return "[" + wrappedService + ": " + this.uri.stream().map(u -> MmiServiceHelper.wrapInAnchor(u.toString(), u)).collect(Collectors.joining(", ")) + "]";
        }

        public List<URI> getUri() {
            return this.uri;
        }

        public int hashCode() {
            return this.serviceName.hashCode();
        }

        public String toString() {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", Locale.UK);
            return "DnsServiceItem{address=" + ZData.toString((byte[])this.address) + ", serviceName='" + this.serviceName + "', uri= '" + this.uri + "',expiry=" + this.expiry + " - " + sdf.format(this.expiry) + "}";
        }

        private void updateExpiryTimeStamp() {
            this.expiry = System.currentTimeMillis() + MajordomoBroker.this.dnsTimeOut * (long)MajordomoBroker.this.heartBeatLiveness;
        }
    }

    protected class Worker {
        protected final ZMQ.Socket socket;
        protected final byte[] address;
        protected final String addressHex;
        protected final byte[] serviceName;
        protected Service service;
        protected long expiry;

        private Worker(ZMQ.Socket socket, byte[] address, String addressHex, byte[] serviceName) {
            this.socket = socket;
            this.address = address;
            this.addressHex = addressHex;
            this.serviceName = serviceName;
            this.updateExpiryTimeStamp();
        }

        private void updateExpiryTimeStamp() {
            this.expiry = System.currentTimeMillis() + MajordomoBroker.this.heartBeatExpiry;
        }
    }

    protected class Client {
        protected final ZMQ.Socket socket;
        protected final String name;
        protected final byte[] nameBytes;
        protected final String nameHex;
        private final Deque<OpenCmwProtocol.MdpMessage> requests = new ArrayDeque<OpenCmwProtocol.MdpMessage>();
        protected long expiry;

        protected Client(ZMQ.Socket socket, String name, byte[] nameBytes) {
            this.expiry = System.currentTimeMillis() + MajordomoBroker.this.clientTimeOut;
            this.socket = socket;
            this.name = name;
            this.nameBytes = nameBytes == null ? name.getBytes(StandardCharsets.UTF_8) : nameBytes;
            this.nameHex = ZData.strhex((byte[])nameBytes);
        }

        protected void offerToQueue(OpenCmwProtocol.MdpMessage msg) {
            this.expiry = System.currentTimeMillis() + MajordomoBroker.this.clientTimeOut;
            this.requests.offer(msg);
        }

        protected OpenCmwProtocol.MdpMessage pop() {
            return this.requests.isEmpty() ? null : this.requests.poll();
        }
    }

    protected class Service {
        protected final String name;
        protected final byte[] nameBytes;
        protected final List<BasicMdpWorker> mdpWorker = new ArrayList<BasicMdpWorker>();
        protected final Map<RbacRole<?>, Queue<OpenCmwProtocol.MdpMessage>> requests = new HashMap();
        protected final Deque<Worker> waiting = new ArrayDeque<Worker>();
        protected final List<Thread> internalWorkers = new ArrayList<Thread>();
        protected byte[] serviceDescription;

        private Service(String name, byte[] nameBytes, BasicMdpWorker mdpWorker) {
            this.name = name;
            byte[] byArray = this.nameBytes = nameBytes == null ? name.getBytes(StandardCharsets.UTF_8) : nameBytes;
            if (mdpWorker != null) {
                this.mdpWorker.add(mdpWorker);
            }
            MajordomoBroker.this.rbacRoles.forEach(role -> this.requests.put((RbacRole<?>)role, new ArrayDeque()));
            this.requests.put((RbacRole<?>)BasicRbacRole.NULL, new ArrayDeque());
        }

        private OpenCmwProtocol.MdpMessage getNextPrioritisedMessage() {
            for (RbacRole rbacRole : MajordomoBroker.this.rbacRoles) {
                Queue<OpenCmwProtocol.MdpMessage> queue = this.requests.get(rbacRole);
                if (queue.isEmpty()) continue;
                return queue.poll();
            }
            Queue<OpenCmwProtocol.MdpMessage> queue = this.requests.get(BasicRbacRole.NULL);
            return queue.isEmpty() ? null : queue.poll();
        }

        private void putPrioritisedMessage(OpenCmwProtocol.MdpMessage queuedMessage) {
            if (queuedMessage.hasRbackToken()) {
                RbacToken rbacToken = RbacToken.from((byte[])queuedMessage.rbacToken);
                Queue<OpenCmwProtocol.MdpMessage> roleBasedQueue = this.requests.get(rbacToken.getRole());
                if (roleBasedQueue != null) {
                    roleBasedQueue.offer(queuedMessage);
                }
            } else {
                this.requests.get(BasicRbacRole.NULL).offer(queuedMessage);
            }
        }

        private boolean requestsPending() {
            return this.requests.entrySet().stream().anyMatch(map -> !((Queue)map.getValue()).isEmpty());
        }

        public String toString() {
            return "Service{name='" + this.name + "'}";
        }
    }
}

