/*
 * Decompiled with CFR 0.152.
 */
package io.opencmw.client;

import io.opencmw.OpenCmwConstants;
import io.opencmw.OpenCmwProtocol;
import io.opencmw.client.DataSource;
import io.opencmw.client.DnsResolver;
import io.opencmw.filter.SubscriptionMatcher;
import io.opencmw.serialiser.IoSerialiser;
import io.opencmw.serialiser.spi.BinarySerialiser;
import io.opencmw.utils.NoDuplicatesList;
import io.opencmw.utils.SystemProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMonitor;
import org.zeromq.ZMsg;

/**
 * {@link DataSource} implementation for the OpenCMW majordomo-style schemes {@code mdp}
 * (DEALER socket) and {@code mds} (SUB socket). The {@code mdr} scheme is advertised by the
 * factory but is rejected with an {@link UnsupportedOperationException} on construction.
 *
 * <p>The source owns one ZeroMQ socket plus a {@link ZMonitor} watching it. Connection state is
 * tracked from the monitor events in {@link #housekeeping()}, which also schedules reconnect
 * attempts on the supplied {@link ExecutorService} with an increasing back-off.
 *
 * <p>NOTE(review): reconnect tasks run on the executor and touch the socket and
 * {@code connectedAddress}; thread-confinement of the socket is not enforced here - confirm
 * against the event loop that drives this class.
 */
public class OpenCmwDataSource extends DataSource implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(OpenCmwDataSource.class);
    private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();
    private static final String MDP = "mdp";
    private static final String MDS = "mds";
    private static final String MDR = "mdr";
    private static final List<String> APPLICABLE_SCHEMES = List.of(MDP, MDS, MDR);
    private static final List<DnsResolver> RESOLVERS = Collections.synchronizedList(new NoDuplicatesList());
    /** Factory registered with {@link DataSource} in the static initialiser at the end of this class. */
    public static final DataSource.Factory FACTORY = new DataSource.Factory() {

        @Override
        public List<String> getApplicableSchemes() {
            return APPLICABLE_SCHEMES;
        }

        @Override
        public Class<? extends IoSerialiser> getMatchingSerialiserType(@NotNull URI endpoint) {
            return BinarySerialiser.class;
        }

        @Override
        public DataSource newInstance(ZContext context, @NotNull URI endpoint, @NotNull Duration timeout, @NotNull ExecutorService executorService, @NotNull String clientId) {
            return new OpenCmwDataSource(context, endpoint, timeout, executorService, clientId);
        }

        @Override
        public List<DnsResolver> getRegisteredDnsResolver() {
            return RESOLVERS;
        }
    };

    private final AtomicReference<ZMonitor.Event> connectionState = new AtomicReference<>(ZMonitor.Event.CLOSED);
    private final AtomicInteger reconnectAttempt = new AtomicInteger(0);
    private final String sourceName;
    private final Duration timeout;
    private final ExecutorService executorService;
    private final String clientId;
    private final URI endpoint;
    private final ZContext context;
    private final ZMQ.Socket socket;
    private final ZMonitor socketMonitor;
    /** Active subscriptions keyed by request id. */
    private final Map<String, URI> subscriptions = new HashMap<>();
    /** Endpoint reduced to scheme + authority + "/" - used to relativise topics when matching notifications. */
    private final URI serverUri;
    private final BiPredicate<URI, URI> subscriptionMatcher = new SubscriptionMatcher(new Class[0]);
    /** Interval in milliseconds returned by {@link #housekeeping()} while connected. */
    private final long heartbeatInterval;
    private Future<URI> dnsWorkerResult;
    private long nextReconnectAttemptTimeStamp;
    private URI connectedAddress;

    /**
     * Creates the socket matching the endpoint scheme, starts the socket monitor and performs a
     * first connection attempt. A failed first attempt is only logged; later attempts are
     * triggered from {@link #housekeeping()}.
     *
     * @param context ZeroMQ context used to create the socket and its monitor
     * @param endpoint endpoint URI; its scheme selects the socket type
     * @param timeout base timeout, used for the heartbeat interval and the reconnect back-off
     * @param executorService executor on which reconnect attempts are run
     * @param clientId client identifier sent with unsubscribe requests
     * @throws IllegalArgumentException if no server URI can be derived from {@code endpoint}
     * @throws UnsupportedOperationException for the {@code mdr} scheme or any unknown scheme
     */
    public OpenCmwDataSource(@NotNull ZContext context, @NotNull URI endpoint, @NotNull Duration timeout, @NotNull ExecutorService executorService, String clientId) {
        super(endpoint);
        this.context = context;
        this.endpoint = Objects.requireNonNull(endpoint, "endpoint is null");
        this.timeout = timeout;
        this.executorService = executorService;
        this.clientId = clientId;
        this.sourceName = OpenCmwDataSource.class.getSimpleName() + "(ID: " + INSTANCE_COUNT.getAndIncrement() + ", endpoint: " + endpoint + ", clientId: " + clientId + ")";
        try {
            this.serverUri = new URI(endpoint.getScheme(), endpoint.getAuthority(), "/", null, null);
        } catch (URISyntaxException e) {
            LOGGER.atError().addArgument(endpoint).setCause(e).log("cannot create serverURI from endpoint: {}");
            throw new IllegalArgumentException("Invalid endpoint", e);
        }
        this.socket = createSocketForScheme(context, endpoint);
        OpenCmwConstants.setDefaultSocketParameters(this.socket);
        this.socketMonitor = new ZMonitor(context, this.socket);
        this.socketMonitor.add(ZMonitor.Event.CLOSED, ZMonitor.Event.CONNECTED, ZMonitor.Event.DISCONNECTED);
        this.socketMonitor.start();

        final long basicHeartBeat = SystemProperties.getValueIgnoreCase("OpenCMW.heartBeat", 1000L);
        final long clientTimeOut = SystemProperties.getValueIgnoreCase("OpenCMW.clientTimeOut", 0L); // '0' disables the client time-out
        // Smallest of heartbeat, local timeout and (if enabled) client time-out - all in milliseconds.
        // The client time-out is converted from seconds; it was previously converted to
        // microseconds, which made it 1000x too large to ever win the comparison.
        long interval = Math.min(basicHeartBeat, timeout.toMillis());
        if (clientTimeOut != 0L) {
            interval = Math.min(interval, TimeUnit.SECONDS.toMillis(clientTimeOut));
        }
        this.heartbeatInterval = interval;
        this.nextReconnectAttemptTimeStamp = System.currentTimeMillis() + timeout.toMillis();

        final URI reply = this.connect();
        if (reply == OpenCmwProtocol.EMPTY_URI) {
            LOGGER.atWarn().addArgument(endpoint).addArgument(this.sourceName).log("could not connect URI {} immediately - source {}");
        }
    }

    /** Creates the ZeroMQ socket that corresponds to the scheme of {@code endpoint}. */
    private static ZMQ.Socket createSocketForScheme(ZContext context, URI endpoint) {
        switch (endpoint.getScheme().toLowerCase(Locale.ROOT)) {
            case MDP:
                return context.createSocket(SocketType.DEALER);
            case MDS:
                return context.createSocket(SocketType.SUB);
            case MDR:
                throw new UnsupportedOperationException("RADIO-DISH pattern is not yet implemented");
            default:
                throw new UnsupportedOperationException("Unsupported protocol type " + endpoint.getScheme());
        }
    }

    /**
     * Connects the socket to the endpoint. If the endpoint has no authority, the first registered
     * {@link DnsResolver} is asked to resolve the device name and its first candidate is used.
     *
     * @return the {@code tcp} address the socket was connected to, or
     *         {@link OpenCmwProtocol#EMPTY_URI} if the context is closed, the name could not be
     *         resolved or the socket refused the connection
     * @throws UnsupportedOperationException for the {@code mdr} scheme or any unknown scheme
     */
    public final URI connect() {
        if (this.context.isClosed()) {
            LOGGER.atDebug().addArgument(this.sourceName).log("ZContext closed for '{}'");
            return OpenCmwProtocol.EMPTY_URI;
        }
        this.connectionState.set(ZMonitor.Event.CONNECT_RETRIED);
        URI address = this.endpoint;
        if (address.getAuthority() == null) {
            final Optional<DnsResolver> resolver = this.getFactory().getRegisteredDnsResolver().stream().findFirst();
            if (resolver.isEmpty()) {
                LOGGER.atWarn().addArgument(this.endpoint).log("cannot resolve {} without a registered DNS resolver");
                return OpenCmwProtocol.EMPTY_URI;
            }
            try {
                address = new URI(address.getScheme(), null, "/" + OpenCmwConstants.getDeviceName(address), null, null);
                final Map<URI, List<URI>> candidates = resolver.get().resolveNames(List.of(address));
                final List<URI> matches = Objects.requireNonNull(candidates.get(address), "candidates did not contain '" + address + "':" + candidates);
                if (matches.isEmpty()) {
                    throw new UnknownHostException("DNS resolver could not resolve " + this.endpoint + " - unknown service - candidates" + candidates + " - " + address);
                }
                address = matches.get(0);
            } catch (URISyntaxException | UnknownHostException e) {
                LOGGER.atWarn().addArgument(address).addArgument(e.getMessage()).log("cannot resolve {} - error message: {}");
                return OpenCmwProtocol.EMPTY_URI;
            }
        }
        switch (this.endpoint.getScheme().toLowerCase(Locale.ROOT)) {
            case MDP:
            case MDS:
                address = OpenCmwConstants.replaceSchemeKeepOnlyAuthority(address, "tcp");
                if (!this.socket.connect(address.toString())) {
                    LOGGER.atError().addArgument(address.toString()).log("could not connect to '{}'");
                    this.connectedAddress = OpenCmwProtocol.EMPTY_URI;
                    return OpenCmwProtocol.EMPTY_URI;
                }
                this.connectedAddress = address;
                this.connectionState.set(ZMonitor.Event.CONNECTED);
                return this.connectedAddress;
            case MDR:
                throw new UnsupportedOperationException("RADIO-DISH pattern is not yet implemented");
            default:
                throw new UnsupportedOperationException("Unsupported protocol type " + this.endpoint.getScheme());
        }
    }

    @Override
    public ZMQ.Socket getSocket() {
        return this.socket;
    }

    /**
     * Disconnects from the previously connected address (if there is one) and connects again.
     *
     * @return the result of {@link #connect()}
     */
    public URI reconnect() {
        LOGGER.atDebug().addArgument(this.endpoint).addArgument(this.sourceName).log("need to reconnect for URI {} - source {} ");
        // A failed connect stores EMPTY_URI; there is nothing to disconnect from in that case.
        if (this.connectedAddress != null && !OpenCmwProtocol.EMPTY_URI.equals(this.connectedAddress)) {
            this.socket.disconnect(this.connectedAddress.toString());
        }
        final URI result = this.connect();
        if (result == OpenCmwProtocol.EMPTY_URI) {
            LOGGER.atDebug().addArgument(this.endpoint).addArgument(this.sourceName).log("could not reconnect for URI '{}' - source {} ");
        }
        return result;
    }

    /**
     * Cancels a pending reconnect task, then closes the socket monitor and the socket.
     *
     * @throws IOException if closing the socket monitor fails; the socket is closed regardless
     */
    @Override
    public void close() throws IOException {
        if (this.dnsWorkerResult != null) {
            // a reconnect running after close would operate on a closed socket
            this.dnsWorkerResult.cancel(true);
        }
        try {
            this.socketMonitor.close();
        } finally {
            this.socket.close();
        }
    }

    /**
     * Receives one message from the socket without blocking.
     *
     * @return {@code null} if no message was available, the translated internal message for
     *         client-protocol messages, or an empty {@link ZMsg} for anything else
     */
    @Override
    public ZMsg getMessage() {
        final OpenCmwProtocol.MdpMessage msg = OpenCmwProtocol.MdpMessage.receive(this.socket, false);
        if (msg == null) {
            return null;
        }
        if (msg.protocol == OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT) {
            return this.handleRequest(msg);
        }
        LOGGER.atDebug().addArgument(msg).log("Ignoring unexpected message: {}");
        return new ZMsg();
    }

    /**
     * Drains pending socket-monitor events, updates the connection state and, when not
     * connected, schedules reconnect attempts with an increasing back-off.
     *
     * @return time stamp (milliseconds since the epoch) at which this method should be called again
     */
    @Override
    public long housekeeping() {
        final long now = System.currentTimeMillis();
        ZMonitor.ZEvent event;
        while ((event = this.socketMonitor.nextEvent(false)) != null) {
            switch (event.type) {
                case DISCONNECTED:
                case CLOSED:
                    this.connectionState.set(ZMonitor.Event.CLOSED);
                    break;
                case CONNECTED:
                    this.connectionState.set(ZMonitor.Event.CONNECTED);
                    break;
                default:
                    LOGGER.atDebug().addArgument(event).log("received unknown event {}");
                    break;
            }
        }

        switch (this.connectionState.get()) {
            case CONNECTED:
                this.reconnectAttempt.set(0);
                return now + this.heartbeatInterval;
            case CONNECT_RETRIED: {
                if (now < this.nextReconnectAttemptTimeStamp) {
                    return this.nextReconnectAttemptTimeStamp; // previous attempt still within its time-out
                }
                if (this.dnsWorkerResult != null) {
                    this.dnsWorkerResult.cancel(true);
                }
                this.dnsWorkerResult = this.executorService.submit(this::reconnect);
                // Read-and-increment exactly once per attempt: incrementing again for the second
                // threshold comparison made the counter advance by two and cut the 10x phase short.
                final int attempt = this.reconnectAttempt.getAndIncrement();
                final long backOffFactor;
                if (attempt < SystemProperties.getValueIgnoreCase("OpenCMW.reconnectThreshold1", 3)) {
                    backOffFactor = 1L;
                } else if (attempt < SystemProperties.getValueIgnoreCase("OpenCMW.reconnectThreshold2", 6)) {
                    backOffFactor = 10L;
                } else {
                    backOffFactor = 100L;
                }
                this.nextReconnectAttemptTimeStamp = now + backOffFactor * this.timeout.toMillis();
                return this.nextReconnectAttemptTimeStamp;
            }
            case CLOSED:
                this.connectionState.compareAndSet(ZMonitor.Event.CLOSED, ZMonitor.Event.CONNECT_RETRIED);
                this.dnsWorkerResult = this.executorService.submit(this::reconnect);
                this.nextReconnectAttemptTimeStamp = now + this.timeout.toMillis();
                return this.nextReconnectAttemptTimeStamp;
            default:
                return now + this.heartbeatInterval;
        }
    }

    /**
     * Records the subscription and either sends a SUBSCRIBE request (DEALER socket) or installs
     * a topic filter on the socket (any other socket type).
     *
     * @param reqId request id under which the subscription is stored
     * @param endpoint subscription endpoint; its path without the leading '/' is the service name
     * @param rbacToken token forwarded with the SUBSCRIBE request
     */
    @Override
    public void subscribe(String reqId, URI endpoint, byte[] rbacToken) {
        this.subscriptions.put(reqId, endpoint);
        if (this.socket.getSocketType() == SocketType.DEALER) {
            final byte[] serviceId = serviceName(endpoint).getBytes(StandardCharsets.UTF_8);
            final OpenCmwProtocol.MdpMessage msg = new OpenCmwProtocol.MdpMessage(null, OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT, OpenCmwProtocol.Command.SUBSCRIBE, serviceId, reqId.getBytes(StandardCharsets.UTF_8), endpoint, OpenCmwProtocol.EMPTY_FRAME, "", rbacToken);
            if (!msg.send(this.socket)) {
                LOGGER.atError().addArgument(reqId).addArgument(endpoint).log("subscription error (reqId: {}) for endpoint: {}");
            }
        } else {
            this.socket.subscribe(subscriptionTopic(endpoint));
        }
    }

    /**
     * Removes the subscription stored under {@code reqId} and either sends an UNSUBSCRIBE
     * request (DEALER socket) or removes the topic filter that {@code subscribe} installed.
     * Unknown request ids are logged and otherwise ignored.
     *
     * @param reqId request id the subscription was registered with
     */
    @Override
    public void unsubscribe(String reqId) {
        final URI subscriptionEndpoint = this.subscriptions.remove(reqId);
        if (subscriptionEndpoint == null) {
            LOGGER.atWarn().addArgument(reqId).addArgument(this.sourceName).log("no subscription registered for reqId: {} - source {}");
            return;
        }
        if (this.socket.getSocketType() == SocketType.DEALER) {
            final byte[] serviceId = serviceName(subscriptionEndpoint).getBytes(StandardCharsets.UTF_8);
            // NOTE(review): the topic sent here is the data-source endpoint, whereas subscribe sends
            // the subscription endpoint - confirm which one the broker expects.
            final OpenCmwProtocol.MdpMessage msg = new OpenCmwProtocol.MdpMessage(this.clientId.getBytes(StandardCharsets.UTF_8), OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT, OpenCmwProtocol.Command.UNSUBSCRIBE, serviceId, reqId.getBytes(StandardCharsets.UTF_8), this.endpoint, OpenCmwProtocol.EMPTY_FRAME, "", null);
            if (!msg.send(this.socket)) {
                LOGGER.atError().addArgument(reqId).addArgument(this.endpoint).log("unsubscribe error (reqId: {}) for endpoint: {}");
            }
        } else {
            // must be the very same filter that subscribe installed, otherwise it is not removed
            this.socket.unsubscribe(subscriptionTopic(subscriptionEndpoint));
        }
    }

    /**
     * Sends a GET request for the service named by the endpoint path.
     * The {@code data} argument is not transmitted; the request carries an empty frame.
     */
    @Override
    public void get(String requestId, URI endpoint, byte[] data, byte[] rbacToken) {
        final byte[] serviceId = serviceName(endpoint).getBytes(StandardCharsets.UTF_8);
        final OpenCmwProtocol.MdpMessage msg = new OpenCmwProtocol.MdpMessage(null, OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT, OpenCmwProtocol.Command.GET_REQUEST, serviceId, requestId.getBytes(StandardCharsets.UTF_8), endpoint, OpenCmwProtocol.EMPTY_FRAME, "", rbacToken);
        if (!msg.send(this.socket)) {
            LOGGER.atError().addArgument(requestId).addArgument(endpoint).log("get error (reqId: {}) for endpoint: {}");
        }
    }

    /** Sends a SET request carrying {@code data} for the service named by the endpoint path. */
    @Override
    public void set(String requestId, URI endpoint, byte[] data, byte[] rbacToken) {
        final byte[] serviceId = serviceName(endpoint).getBytes(StandardCharsets.UTF_8);
        final OpenCmwProtocol.MdpMessage msg = new OpenCmwProtocol.MdpMessage(null, OpenCmwProtocol.MdpSubProtocol.PROT_CLIENT, OpenCmwProtocol.Command.SET_REQUEST, serviceId, requestId.getBytes(StandardCharsets.UTF_8), endpoint, data, "", rbacToken);
        if (!msg.send(this.socket)) {
            LOGGER.atError().addArgument(requestId).addArgument(endpoint).log("set error (reqId: {}) for endpoint: {}");
        }
    }

    @Override
    public String toString() {
        return this.sourceName;
    }

    @Override
    protected DataSource.Factory getFactory() {
        return FACTORY;
    }

    /**
     * Translates a client-protocol reply or notification into the internal message layout.
     * Messages without a client request id are attributed to the first stored subscription
     * whose endpoint matches the message topic.
     *
     * @return the internal message, or an empty {@link ZMsg} if the command is not handled or no
     *         matching subscription exists
     */
    private ZMsg handleRequest(OpenCmwProtocol.MdpMessage msg) {
        switch (msg.command) {
            case PARTIAL:
            case FINAL:
            case W_NOTIFY: {
                if (msg.clientRequestID != null && msg.clientRequestID.length > 0) {
                    return createInternalMsg(msg.clientRequestID, msg.topic, new ZFrame(msg.data), msg.errors);
                }
                final URI relativeTopic = this.serverUri.relativize(msg.topic);
                final Optional<String> reqId = this.subscriptions.entrySet().stream()
                        .filter(entry -> this.subscriptionMatcher.test(relativeTopic, this.serverUri.relativize(entry.getValue())))
                        .map(Map.Entry::getKey)
                        .findFirst();
                if (reqId.isPresent()) {
                    // explicit charset, consistent with how request ids are encoded when sent
                    return createInternalMsg(reqId.get().getBytes(StandardCharsets.UTF_8), msg.topic, new ZFrame(msg.data), msg.errors);
                }
                LOGGER.atWarn().addArgument(msg.topic).log("Could not find subscription for notified request with endpoint: {}");
                return new ZMsg();
            }
            default:
                LOGGER.atDebug().addArgument(msg).log("Ignoring unexpected message: {}");
                return new ZMsg();
        }
    }

    /**
     * Builds the four-frame internal message: request id, endpoint, body, exception text.
     *
     * @param reqId request id frame content
     * @param endpoint endpoint, added in its string form
     * @param body payload frame; {@code null} is replaced by an empty frame
     * @param exception error text; {@code null} is replaced by an empty frame
     * @return the assembled message
     */
    public static ZMsg createInternalMsg(byte[] reqId, URI endpoint, ZFrame body, String exception) {
        final ZMsg result = new ZMsg();
        result.add(reqId);
        result.add(endpoint.toString());
        result.add(body == null ? new ZFrame(new byte[0]) : body);
        result.add(exception == null ? new ZFrame(new byte[0]) : new ZFrame(exception));
        return result;
    }

    /** Returns the service name, i.e. the endpoint path without its leading '/'. */
    private static String serviceName(URI endpoint) {
        return endpoint.getPath().substring(1);
    }

    /**
     * Returns the SUB-socket topic filter for an endpoint: {@code "<service>?<query>"}.
     * NOTE(review): an endpoint without query yields the literal suffix "?null" - kept as-is to
     * preserve the existing subscribe format; confirm against the publisher's topic layout.
     */
    private static byte[] subscriptionTopic(URI endpoint) {
        return (serviceName(endpoint) + "?" + endpoint.getQuery()).getBytes(StandardCharsets.UTF_8);
    }

    static {
        DataSource.register(FACTORY);
    }
}

