/*
 * Decompiled with CFR 0.152.
 */
package io.opencmw.client.cmwlight;

import io.opencmw.OpenCmwConstants;
import io.opencmw.OpenCmwProtocol;
import io.opencmw.QueryParameterParser;
import io.opencmw.client.DataSource;
import io.opencmw.client.DnsResolver;
import io.opencmw.client.OpenCmwDataSource;
import io.opencmw.client.cmwlight.CmwLightMessage;
import io.opencmw.client.cmwlight.CmwLightProtocol;
import io.opencmw.serialiser.IoSerialiser;
import io.opencmw.serialiser.spi.CmwLightSerialiser;
import io.opencmw.utils.NoDuplicatesList;
import io.opencmw.utils.SystemProperties;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import org.zeromq.ZMonitor;
import org.zeromq.ZMsg;

public class CmwLightDataSource
extends DataSource {
    private static final String RDA_3_PROTOCOL = "rda3";
    private static final List<String> APPLICABLE_SCHEMES = List.of("rda3");
    private static final List<DnsResolver> RESOLVERS = Collections.synchronizedList(new NoDuplicatesList());
    public static final DataSource.Factory FACTORY = new DataSource.Factory(){

        @Override
        public List<String> getApplicableSchemes() {
            return APPLICABLE_SCHEMES;
        }

        @Override
        public Class<? extends IoSerialiser> getMatchingSerialiserType(@NotNull URI endpoint) {
            return CmwLightSerialiser.class;
        }

        @Override
        public List<DnsResolver> getRegisteredDnsResolver() {
            return RESOLVERS;
        }

        @Override
        public DataSource newInstance(ZContext context, @NotNull URI endpoint, @NotNull Duration timeout, @NotNull ExecutorService executorService, @NotNull String clientId) {
            return new CmwLightDataSource(context, endpoint, executorService, clientId);
        }
    };
    private static final Logger LOGGER = LoggerFactory.getLogger(CmwLightDataSource.class);
    private static final AtomicLong CONNECTION_ID_GENERATOR = new AtomicLong(0L);
    private static final AtomicInteger REQUEST_ID_GENERATOR = new AtomicInteger(0);
    protected final AtomicInteger channelId = new AtomicInteger(0);
    protected final ZContext context;
    protected final ZMQ.Socket socket;
    protected final AtomicReference<ZMonitor.Event> connectionState = new AtomicReference<ZMonitor.Event>(ZMonitor.Event.CLOSED);
    protected final String sessionId;
    protected final long heartbeatInterval = SystemProperties.getValueIgnoreCase((String)"OpenCMW.heartBeat", (long)1000L);
    protected final int heartbeatAllowedMisses = SystemProperties.getValueIgnoreCase((String)"OpenCMW.heartBeatLiveness", (int)3);
    protected final long subscriptionTimeout = SystemProperties.getValueIgnoreCase((String)"OpenCMW.subscriptionTimeOut", (long)1000L);
    protected final Map<Long, Subscription> subscriptions = new HashMap<Long, Subscription>();
    protected final Map<String, Subscription> subscriptionsByReqId = new HashMap<String, Subscription>();
    protected final Map<Long, Subscription> replyIdMap = new HashMap<Long, Subscription>();
    protected final URI endpoint;
    private final AtomicInteger reconnectAttempt = new AtomicInteger(0);
    private final ZMonitor socketMonitor;
    private final Queue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
    private final Map<Long, Request> pendingRequests = new HashMap<Long, Request>();
    private final ExecutorService executorService;
    protected long connectionId;
    protected long lastHeartbeatReceived = -1L;
    protected long lastHeartbeatSent = -1L;
    protected int backOff = 20;
    private URI connectedAddress = OpenCmwProtocol.EMPTY_URI;

    public CmwLightDataSource(@NotNull ZContext context, @NotNull URI endpoint, @NotNull ExecutorService executorService, String clientId) {
        super(endpoint);
        LOGGER.atTrace().addArgument((Object)endpoint).log("connecting to: {}");
        this.context = context;
        this.executorService = executorService;
        this.socket = context.createSocket(SocketType.DEALER);
        OpenCmwConstants.setDefaultSocketParameters((ZMQ.Socket)this.socket);
        this.sessionId = this.getSessionId(clientId);
        this.endpoint = endpoint;
        this.socketMonitor = new ZMonitor(context, this.socket);
        this.socketMonitor.add(new ZMonitor.Event[]{ZMonitor.Event.CLOSED, ZMonitor.Event.CONNECTED, ZMonitor.Event.DISCONNECTED});
        this.socketMonitor.start();
        this.connect();
    }

    @Override
    public void close() throws IOException {
        this.socketMonitor.close();
        this.socket.close();
    }

    public void connect() {
        if (this.connectionState.getAndSet(ZMonitor.Event.CONNECT_RETRIED) != ZMonitor.Event.CLOSED) {
            return;
        }
        URI resolveAddress = this.endpoint;
        if (resolveAddress.getAuthority() == null || resolveAddress.getPort() == -1) {
            Optional resolver = this.getFactory().getRegisteredDnsResolver().stream().findFirst();
            if (resolver.isEmpty()) {
                LOGGER.atWarn().addArgument((Object)this.endpoint).log("cannot resolve {} without a registered DNS resolver");
                this.backOff *= 2;
                this.connectionState.set(ZMonitor.Event.CLOSED);
                return;
            }
            try {
                resolveAddress = new URI(resolveAddress.getScheme(), null, "/" + OpenCmwConstants.getDeviceName((URI)this.endpoint), null, null);
                Map<URI, List<URI>> candidates = ((DnsResolver)resolver.get()).resolveNames(List.of(resolveAddress));
                if (Objects.requireNonNull(candidates.get(resolveAddress), "candidates did not contain '" + resolveAddress + "':" + candidates).isEmpty()) {
                    throw new UnknownHostException("DNS resolver could not resolve " + this.endpoint + " - unknown service - candidates" + candidates + " - " + resolveAddress);
                }
                resolveAddress = candidates.get(resolveAddress).get(0);
            }
            catch (URISyntaxException | UnknownHostException e) {
                LOGGER.atError().setCause((Throwable)e).addArgument((Object)e.getMessage()).log("Error resolving device from nameserver, using address from endpoint. Error was: {}");
                this.backOff *= 2;
                this.connectionState.set(ZMonitor.Event.CLOSED);
                return;
            }
        }
        this.lastHeartbeatSent = System.currentTimeMillis();
        if (resolveAddress.getPort() == -1) {
            LOGGER.atError().addArgument((Object)this.endpoint).log("could not resolve host service address: '{}'");
        }
        try {
            String identity = this.getIdentity();
            resolveAddress = OpenCmwConstants.replaceSchemeKeepOnlyAuthority((URI)resolveAddress, (String)"tcp");
            this.socket.setIdentity(identity.getBytes());
            LOGGER.atDebug().addArgument((Object)resolveAddress).addArgument((Object)resolveAddress).addArgument((Object)identity).log("connecting to: '{}'->'{}' with identity {}");
            if (!this.socket.connect(resolveAddress.toString())) {
                LOGGER.atError().addArgument((Object)this.endpoint).addArgument((Object)resolveAddress.toString()).log("could not connect requested URI '{}' to '{}'");
                this.connectedAddress = OpenCmwProtocol.EMPTY_URI;
            }
            this.connectedAddress = resolveAddress;
            CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.connect("1.0.0"));
        }
        catch (CmwLightProtocol.RdaLightException | NumberFormatException | ZMQException e) {
            LOGGER.atError().setCause(e).addArgument((Object)this.connectedAddress).addArgument((Object)this.endpoint).log("failed to connect to '{}' source host address: '{}'");
            this.backOff *= 2;
            this.connectionState.set(ZMonitor.Event.CLOSED);
        }
    }

    @Override
    public void get(String requestId, URI endpoint, byte[] data, byte[] rbacToken) {
        Request request = new Request(CmwLightProtocol.RequestType.GET, requestId, endpoint, data, rbacToken);
        this.queuedRequests.add(request);
    }

    public ZMonitor.Event getConnectionState() {
        return this.connectionState.get();
    }

    public ZContext getContext() {
        return this.context;
    }

    @Override
    public ZMsg getMessage() {
        CmwLightMessage reply = this.receiveData();
        if (reply == null) {
            return null;
        }
        long currentTime = System.currentTimeMillis();
        switch (reply.messageType) {
            case SERVER_CONNECT_ACK: {
                if (this.connectionState.compareAndSet(ZMonitor.Event.CONNECT_RETRIED, ZMonitor.Event.CONNECTED)) {
                    LOGGER.atTrace().addArgument((Object)this.connectedAddress).log("Connected to server: {}");
                    this.lastHeartbeatReceived = currentTime;
                    this.backOff = 20;
                } else {
                    LOGGER.atWarn().addArgument((Object)reply).log("ignoring unsolicited connection acknowledgement: {}");
                }
                return new ZMsg();
            }
            case SERVER_HB: {
                if (this.connectionState.get() != ZMonitor.Event.CONNECTED) {
                    LOGGER.atWarn().addArgument((Object)reply).log("ignoring heartbeat received before connection established: {}");
                    return new ZMsg();
                }
                this.lastHeartbeatReceived = currentTime;
                return new ZMsg();
            }
            case SERVER_REP: {
                if (this.connectionState.get() != ZMonitor.Event.CONNECTED) {
                    LOGGER.atWarn().addArgument((Object)reply).log("ignoring data received before connection established: {}");
                    return new ZMsg();
                }
                this.lastHeartbeatReceived = currentTime;
                return this.handleServerReply(reply, currentTime);
            }
        }
        LOGGER.atWarn().addArgument((Object)reply).log("ignoring client message from server: {}");
        return new ZMsg();
    }

    @Override
    public ZMQ.Socket getSocket() {
        return this.socket;
    }

    @Override
    public long housekeeping() {
        ZMonitor.ZEvent event;
        long currentTime = System.currentTimeMillis();
        block9: while ((event = this.socketMonitor.nextEvent(false)) != null) {
            switch (event.type) {
                case DISCONNECTED: 
                case CLOSED: {
                    this.connectionState.set(ZMonitor.Event.CLOSED);
                    continue block9;
                }
                case CONNECTED: {
                    continue block9;
                }
            }
            LOGGER.atDebug().addArgument((Object)event).log("unknown socket event: {}");
        }
        switch (this.connectionState.get()) {
            case DISCONNECTED: 
            case CLOSED: {
                if (currentTime > this.lastHeartbeatSent + (long)this.backOff) {
                    this.executorService.submit(this::reconnect);
                }
                return this.lastHeartbeatSent + (long)this.backOff;
            }
            case CONNECT_RETRIED: {
                if (currentTime > this.lastHeartbeatSent + this.heartbeatInterval * (long)this.heartbeatAllowedMisses) {
                    this.backOff *= 2;
                    this.lastHeartbeatSent = currentTime;
                    LOGGER.atTrace().addArgument((Object)this.connectedAddress).addArgument((Object)this.backOff).log("Connection timed out for {}, retrying in {} ms");
                    this.disconnect();
                }
                return this.lastHeartbeatSent + this.heartbeatInterval * (long)this.heartbeatAllowedMisses;
            }
            case CONNECTED: {
                Request request;
                this.reconnectAttempt.set(0);
                while ((request = this.queuedRequests.poll()) != null) {
                    this.pendingRequests.put(request.id, request);
                    if (this.sendRequest(request)) continue;
                    LOGGER.atWarn().addArgument((Object)this.endpoint).log("could not send request for host {}");
                }
                if (currentTime > this.lastHeartbeatSent + this.heartbeatInterval) {
                    this.sendHeartBeat();
                    this.lastHeartbeatSent = currentTime;
                    if (this.lastHeartbeatReceived + this.heartbeatInterval * (long)this.heartbeatAllowedMisses < currentTime) {
                        LOGGER.atDebug().addArgument((Object)this.backOff).log("Connection timed out, reconnecting in {} ms");
                        this.disconnect();
                        return this.heartbeatInterval;
                    }
                    for (Subscription sub : this.subscriptions.values()) {
                        this.updateSubscription(currentTime, sub);
                    }
                }
                return this.lastHeartbeatSent + this.heartbeatInterval;
            }
        }
        throw new IllegalStateException("unexpected connection state: " + this.connectionState.get());
    }

    public void reconnect() {
        LOGGER.atDebug().addArgument((Object)this.endpoint).log("need to reconnect for URI {}");
        this.disconnect();
        this.connect();
    }

    public void registerDnsResolver(@NotNull DnsResolver resolver) {
        this.getFactory().registerDnsResolver(resolver);
    }

    public void sendHeartBeat() {
        try {
            CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.CLIENT_HB);
        }
        catch (CmwLightProtocol.RdaLightException e) {
            throw new IllegalStateException("error sending heartbeat", e);
        }
    }

    @Override
    public void set(String requestId, URI endpoint, byte[] data, byte[] rbacToken) {
        Request request = new Request(CmwLightProtocol.RequestType.SET, requestId, endpoint, data, rbacToken);
        this.queuedRequests.add(request);
    }

    @Override
    public void subscribe(String reqId, URI endpoint, byte[] rbacToken) {
        try {
            ParsedEndpoint ep = new ParsedEndpoint(endpoint);
            Subscription sub = new Subscription(endpoint, ep.device, ep.property, ep.ctx, ep.filters);
            sub.idString = reqId;
            this.subscriptions.put(sub.id, sub);
            this.subscriptionsByReqId.put(reqId, sub);
        }
        catch (CmwLightProtocol.RdaLightException e) {
            throw new IllegalArgumentException("invalid endpoint: '" + endpoint + "'", e);
        }
    }

    @Override
    public void unsubscribe(String reqId) {
        this.subscriptionsByReqId.get((Object)reqId).subscriptionState = SubscriptionState.CANCELED;
    }

    @Override
    protected DataSource.Factory getFactory() {
        return FACTORY;
    }

    private void disconnect() {
        LOGGER.atDebug().addArgument((Object)this.connectedAddress).log("disconnecting {}");
        this.connectionState.set(ZMonitor.Event.CLOSED);
        if (this.connectedAddress != OpenCmwProtocol.EMPTY_URI) {
            try {
                this.socket.disconnect(this.connectedAddress.toString());
            }
            catch (ZMQException e) {
                LOGGER.atError().setCause((Throwable)e).log("Failed to disconnect socket");
            }
        }
        for (Subscription sub : this.subscriptions.values()) {
            sub.subscriptionState = SubscriptionState.UNSUBSCRIBED;
        }
    }

    private String getIdentity() {
        String hostname;
        try {
            hostname = InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            hostname = "localhost";
        }
        long processId = ProcessHandle.current().pid();
        this.connectionId = CONNECTION_ID_GENERATOR.incrementAndGet();
        int chId = this.channelId.incrementAndGet();
        return hostname + "/" + processId + "/" + this.connectionId + "/" + chId;
    }

    private String getSessionId(String clientId) {
        return "CmwLightClient{pid=" + ProcessHandle.current().pid() + ", conn=" + this.connectionId + ", clientId=" + clientId + "}";
    }

    private ZMsg handleServerReply(CmwLightMessage reply, long currentTime) {
        switch (reply.requestType) {
            case REPLY: {
                Request requestForReply = this.pendingRequests.remove(reply.id);
                try {
                    return OpenCmwDataSource.createInternalMsg(requestForReply.requestId.getBytes(StandardCharsets.UTF_8), new ParsedEndpoint(requestForReply.endpoint, reply.dataContext.cycleName).toURI(), reply.bodyData, null);
                }
                catch (CmwLightProtocol.RdaLightException | URISyntaxException e) {
                    LOGGER.atWarn().addArgument((Object)requestForReply.endpoint).addArgument((Object)reply.dataContext.cycleName).log("Adding reply context to URI results in illegal url {} + {}");
                    return new ZMsg();
                }
            }
            case EXCEPTION: {
                Request requestForException = this.pendingRequests.remove(reply.id);
                return OpenCmwDataSource.createInternalMsg(requestForException.requestId.getBytes(StandardCharsets.UTF_8), requestForException.endpoint, null, reply.exceptionMessage.message);
            }
            case SUBSCRIBE: {
                long id = reply.id;
                Subscription sub = this.subscriptions.get(id);
                sub.updateId = (Long)reply.options.get(CmwLightProtocol.FieldName.SOURCE_ID_TAG.value());
                this.replyIdMap.put(sub.updateId, sub);
                sub.subscriptionState = SubscriptionState.SUBSCRIBED;
                LOGGER.atDebug().addArgument((Object)sub.device).addArgument((Object)sub.property).log("subscription successful: {}/{}");
                sub.backOff = 20;
                return new ZMsg();
            }
            case UNSUBSCRIBE: {
                Subscription subscriptionForUnsub = this.replyIdMap.remove(reply.id);
                this.subscriptionsByReqId.remove(subscriptionForUnsub.idString);
                this.subscriptions.remove(subscriptionForUnsub.id);
                return new ZMsg();
            }
            case NOTIFICATION_DATA: {
                URI endpointForNotificationContext;
                Subscription subscriptionForNotification = this.replyIdMap.get(reply.id);
                if (subscriptionForNotification == null) {
                    LOGGER.atInfo().addArgument((Object)reply.id).addArgument((Object)reply.toString()).log("received unsolicited subscription data for id '{}': {}");
                    return new ZMsg();
                }
                try {
                    endpointForNotificationContext = new ParsedEndpoint(subscriptionForNotification.endpoint, reply.dataContext.cycleName).toURI();
                }
                catch (CmwLightProtocol.RdaLightException | URISyntaxException e) {
                    LOGGER.atWarn().setCause((Throwable)e).log("Error generating reply context URI");
                    return new ZMsg();
                }
                return OpenCmwDataSource.createInternalMsg(subscriptionForNotification.idString.getBytes(StandardCharsets.UTF_8), endpointForNotificationContext, reply.bodyData, null);
            }
            case NOTIFICATION_EXC: {
                Subscription subscriptionForNotifyExc = this.replyIdMap.get(reply.id);
                if (subscriptionForNotifyExc == null) {
                    LOGGER.atInfo().addArgument((Object)reply.toString()).log("received unsolicited subscription notification error: {}");
                    return new ZMsg();
                }
                return OpenCmwDataSource.createInternalMsg(subscriptionForNotifyExc.idString.getBytes(StandardCharsets.UTF_8), subscriptionForNotifyExc.endpoint, null, reply.exceptionMessage.message);
            }
            case SUBSCRIBE_EXCEPTION: {
                Subscription subForSubExc = this.subscriptions.get(reply.id);
                subForSubExc.subscriptionState = SubscriptionState.UNSUBSCRIBED;
                subForSubExc.timeoutValue = currentTime + (long)subForSubExc.backOff;
                subForSubExc.backOff *= 2;
                LOGGER.atDebug().addArgument((Object)subForSubExc.device).addArgument((Object)subForSubExc.property).log("exception during subscription, retrying: {}/{}");
                return OpenCmwDataSource.createInternalMsg(subForSubExc.idString.getBytes(StandardCharsets.UTF_8), subForSubExc.endpoint, null, reply.exceptionMessage.message);
            }
        }
        return new ZMsg();
    }

    private CmwLightMessage receiveData() {
        try {
            ZMsg data = ZMsg.recvMsg((ZMQ.Socket)this.socket, (int)1);
            if (data == null) {
                return null;
            }
            return CmwLightProtocol.parseMsg(data);
        }
        catch (CmwLightProtocol.RdaLightException e) {
            LOGGER.atDebug().setCause((Throwable)e).log("error parsing cmw light reply: ");
            return null;
        }
    }

    private boolean sendRequest(Request request) {
        try {
            ParsedEndpoint parsedEndpoint = new ParsedEndpoint(request.endpoint);
            switch (request.requestType) {
                case GET: {
                    return CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.getRequest(this.sessionId, request.id, parsedEndpoint.device, parsedEndpoint.property, new CmwLightMessage.RequestContext(parsedEndpoint.ctx, parsedEndpoint.filters, null)));
                }
                case SET: {
                    Objects.requireNonNull(request.data, "Data for set cannot be null");
                    return CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.setRequest(this.sessionId, request.id, parsedEndpoint.device, parsedEndpoint.property, new ZFrame(request.data), new CmwLightMessage.RequestContext(parsedEndpoint.ctx, parsedEndpoint.filters, null)));
                }
            }
            throw new CmwLightProtocol.RdaLightException("Message of unknown type");
        }
        catch (CmwLightProtocol.RdaLightException e) {
            LOGGER.atDebug().setCause((Throwable)e).log("Error sending get request:");
            return false;
        }
    }

    private void sendSubscribe(Subscription sub) {
        if (!sub.subscriptionState.equals((Object)SubscriptionState.UNSUBSCRIBED)) {
            return;
        }
        try {
            CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.subscribeRequest(this.sessionId, sub.id, sub.device, sub.property, Map.of(CmwLightProtocol.FieldName.SESSION_BODY_TAG.value(), Collections.emptyMap()), new CmwLightMessage.RequestContext(sub.selector, sub.filters, null), CmwLightProtocol.UpdateType.IMMEDIATE_UPDATE));
            sub.subscriptionState = SubscriptionState.SUBSCRIBING;
            sub.timeoutValue = System.currentTimeMillis() + this.subscriptionTimeout;
        }
        catch (CmwLightProtocol.RdaLightException e) {
            LOGGER.atDebug().setCause((Throwable)e).log("Error subscribing to property:");
            sub.timeoutValue = System.currentTimeMillis() + (long)sub.backOff;
            sub.backOff *= 2;
        }
    }

    private void sendUnsubscribe(Subscription sub) {
        try {
            CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.unsubscribeRequest(this.sessionId, sub.updateId, sub.device, sub.property, Map.of(CmwLightProtocol.FieldName.SESSION_BODY_TAG.value(), Collections.emptyMap()), CmwLightProtocol.UpdateType.IMMEDIATE_UPDATE));
            sub.subscriptionState = SubscriptionState.UNSUBSCRIBE_SENT;
        }
        catch (CmwLightProtocol.RdaLightException e) {
            throw new IllegalStateException("failed to unsubscribe from: '" + sub.property + "'", e);
        }
    }

    private void updateSubscription(long currentTime, Subscription sub) {
        switch (sub.subscriptionState) {
            case SUBSCRIBING: {
                if (currentTime <= sub.timeoutValue) break;
                sub.subscriptionState = SubscriptionState.UNSUBSCRIBED;
                sub.timeoutValue = currentTime + (long)sub.backOff;
                sub.backOff *= 2;
                LOGGER.atDebug().addArgument((Object)sub.device).addArgument((Object)sub.property).log("subscription timed out, retrying: {}/{}");
                break;
            }
            case UNSUBSCRIBED: {
                if (currentTime <= sub.timeoutValue) break;
                LOGGER.atDebug().addArgument((Object)sub.device).addArgument((Object)sub.property).log("subscribing {}/{}");
                this.sendSubscribe(sub);
                break;
            }
            case SUBSCRIBED: 
            case UNSUBSCRIBE_SENT: {
                break;
            }
            case CANCELED: {
                this.sendUnsubscribe(sub);
                break;
            }
            default: {
                throw new IllegalStateException("unexpected subscription state: " + sub.subscriptionState);
            }
        }
    }

    static {
        DataSource.register(FACTORY);
    }

    public static class ParsedEndpoint {
        public static final String DEFAULT_SELECTOR = "";
        public static final String FILTER_TYPE_LONG = "long:";
        public static final String FILTER_TYPE_INT = "int:";
        public static final String FILTER_TYPE_BOOL = "bool:";
        public static final String FILTER_TYPE_DOUBLE = "double:";
        public static final String FILTER_TYPE_FLOAT = "float:";
        public final Map<String, Object> filters;
        public final String ctx;
        public final String device;
        public final String property;
        public final String authority;

        public ParsedEndpoint(URI endpoint) throws CmwLightProtocol.RdaLightException {
            this(endpoint, null);
        }

        public ParsedEndpoint(URI endpoint, String ctx) throws CmwLightProtocol.RdaLightException {
            this.authority = Objects.requireNonNullElse(endpoint.getAuthority(), DEFAULT_SELECTOR).contains(":") ? endpoint.getAuthority() : null;
            this.device = OpenCmwConstants.getDeviceName((URI)endpoint);
            this.property = OpenCmwConstants.getPropertyName((URI)endpoint);
            if (this.property == null || this.property.isBlank() || this.property.contains("/")) {
                throw new CmwLightProtocol.RdaLightException("URI not compatible with rda3://<host>:<port>/<device>/<property> path scheme: " + endpoint + " detected property: " + this.property);
            }
            Map parsedQuery = QueryParameterParser.getFlatMap((String)endpoint.getQuery());
            if (ctx == null) {
                this.ctx = parsedQuery.containsKey("ctx") ? (String)parsedQuery.remove("ctx") : DEFAULT_SELECTOR;
            } else {
                this.ctx = ctx;
                parsedQuery.remove("ctx");
            }
            this.filters = parsedQuery.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
                String val = (String)e.getValue();
                if (val.startsWith(FILTER_TYPE_INT)) {
                    return Integer.valueOf(val.substring(FILTER_TYPE_INT.length()));
                }
                if (val.startsWith(FILTER_TYPE_LONG)) {
                    return Long.valueOf(val.substring(FILTER_TYPE_LONG.length()));
                }
                if (val.startsWith(FILTER_TYPE_BOOL)) {
                    return Boolean.valueOf(val.substring(FILTER_TYPE_BOOL.length()));
                }
                if (val.startsWith(FILTER_TYPE_DOUBLE)) {
                    return Double.valueOf(val.substring(FILTER_TYPE_DOUBLE.length()));
                }
                if (val.startsWith(FILTER_TYPE_FLOAT)) {
                    return Float.valueOf(val.substring(FILTER_TYPE_FLOAT.length()));
                }
                return val;
            }));
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ParsedEndpoint)) {
                return false;
            }
            ParsedEndpoint that = (ParsedEndpoint)o;
            return this.filters.equals(that.filters) && this.ctx.equals(that.ctx) && this.device.equals(that.device) && this.property.equals(that.property);
        }

        public int hashCode() {
            return Objects.hash(this.filters, this.ctx, this.device, this.property);
        }

        public URI toURI() throws URISyntaxException {
            String filterString = this.filters.entrySet().stream().map(e -> {
                Object val;
                Object value = e.getValue();
                if (value instanceof String) {
                    val = (String)value;
                } else if (value instanceof Integer) {
                    val = FILTER_TYPE_INT + value;
                } else if (value instanceof Long) {
                    val = FILTER_TYPE_LONG + value;
                } else if (value instanceof Boolean) {
                    val = FILTER_TYPE_BOOL + value;
                } else if (value instanceof Double) {
                    val = FILTER_TYPE_DOUBLE + value;
                } else if (value instanceof Float) {
                    val = FILTER_TYPE_FLOAT + value;
                } else {
                    throw new IllegalArgumentException("data type not supported in endpoint filters");
                }
                return (String)e.getKey() + "=" + (String)val;
            }).collect(Collectors.joining("&"));
            return new URI(CmwLightDataSource.RDA_3_PROTOCOL, this.authority, "/" + this.device + "/" + this.property, "ctx=" + this.ctx + "&" + filterString, null);
        }
    }

    public static class Request {
        public final byte[] data;
        public final long id;
        public final CmwLightProtocol.RequestType requestType;
        private final String requestId;
        private final URI endpoint;
        private final byte[] rbacToken;

        public Request(CmwLightProtocol.RequestType requestType, String requestId, URI endpoint, byte[] data, byte[] rbacToken) {
            this.requestType = requestType;
            this.id = REQUEST_ID_GENERATOR.incrementAndGet();
            this.requestId = requestId;
            this.endpoint = endpoint;
            this.data = data;
            this.rbacToken = rbacToken;
        }
    }

    public static class Subscription {
        public final String property;
        public final String device;
        public final String selector;
        public final Map<String, Object> filters;
        public final URI endpoint;
        private final long id = REQUEST_ID_GENERATOR.incrementAndGet();
        public SubscriptionState subscriptionState = SubscriptionState.UNSUBSCRIBED;
        public int backOff = 20;
        public long updateId = -1L;
        public long timeoutValue = -1L;
        public String idString = "";

        public Subscription(URI endpoint, String device, String property, String selector, Map<String, Object> filters) {
            this.endpoint = endpoint;
            this.property = property;
            this.device = device;
            this.selector = selector;
            this.filters = filters;
        }

        public String toString() {
            return "Subscription{property='" + this.property + "', device='" + this.device + "', selector='" + this.selector + "', filters=" + this.filters + ", subscriptionState=" + this.subscriptionState + ", backOff=" + this.backOff + ", id=" + this.id + ", updateId=" + this.updateId + ", timeoutValue=" + this.timeoutValue + "}";
        }
    }

    public static enum SubscriptionState {
        UNSUBSCRIBED,
        SUBSCRIBING,
        SUBSCRIBED,
        CANCELED,
        UNSUBSCRIBE_SENT;

    }
}

