/*
 * Decompiled with CFR 0.152.
 */
package io.opencmw.client.rest;

import io.opencmw.MimeType;
import io.opencmw.OpenCmwConstants;
import io.opencmw.OpenCmwProtocol;
import io.opencmw.client.DataSource;
import io.opencmw.client.DnsResolver;
import io.opencmw.serialiser.IoSerialiser;
import io.opencmw.serialiser.spi.JsonSerialiser;
import io.opencmw.utils.NoDuplicatesList;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggingEventBuilder;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/**
 * {@link DataSource} implementation for {@code http}/{@code https} endpoints.
 *
 * <p>Requests are executed asynchronously through a shared {@link OkHttpClient}. Finished requests
 * (response, failure or time-out) are collected in {@link #completedCallbacks} and forwarded by a
 * worker thread (see {@link #run()}) as four-frame {@link ZMsg} messages
 * (hash key, endpoint name, payload, error text) through an in-process PAIR socket pair.
 * Consumers read these messages from {@link #getSocket()} / {@link #getMessage()}.
 *
 * <p>Locking: the {@link #newData} monitor guards the hand-over between the OkHttp callback
 * threads and the worker thread and is the only lock used for callback state changes.
 */
public class RestDataSource extends DataSource implements Runnable {
    private static final List<String> APPLICABLE_SCHEMES = List.of("http", "https");
    private static final List<DnsResolver> RESOLVERS = Collections.synchronizedList(new NoDuplicatesList<>());
    /** Factory creating {@link RestDataSource} instances for the applicable URI schemes. */
    public static final DataSource.Factory FACTORY = new DataSource.Factory() {
        @Override
        public List<String> getApplicableSchemes() {
            return APPLICABLE_SCHEMES;
        }

        @Override
        public Class<? extends IoSerialiser> getMatchingSerialiserType(@NotNull URI endpoint) {
            return JsonSerialiser.class;
        }

        @Override
        public DataSource newInstance(ZContext context, @NotNull URI endpoint, @NotNull Duration timeout, @NotNull ExecutorService executorService, @NotNull String clientId) {
            // note: the executor service is not used by this implementation
            return new RestDataSource(context, endpoint, timeout, clientId);
        }

        @Override
        public List<DnsResolver> getRegisteredDnsResolver() {
            return RESOLVERS;
        }
    };
    private static final Logger LOGGER = LoggerFactory.getLogger(RestDataSource.class);
    /** Maximum wait [ms] of the worker loop when no request time-out is configured. */
    private static final int WAIT_TIMEOUT_MILLIS = 1000;
    private static final AtomicInteger REST_DATA_SOURCE_INSTANCE = new AtomicInteger();
    /** Number of times a failed request is re-issued before the failure is reported. */
    private static final int MAX_RETRIES = 3;
    private static final AtomicLong PUBLICATION_COUNTER = new AtomicLong();
    protected static OkHttpClient okClient;
    protected static EventSource.Factory eventSourceFactory;
    protected final AtomicBoolean run = new AtomicBoolean(true);
    protected final String uniqueID;
    protected final byte[] uniqueIdBytes;
    protected final URI endpoint;
    protected final Duration timeOut;
    protected final String clientID;
    /** Number of upcoming calls that are cancelled right after being enqueued. */
    protected int cancelLastCall;
    protected final ZContext ctxCopy;
    /** Monitor used to wake up the worker thread and to guard callback state transitions. */
    protected final Object newData = new Object();
    protected final Timer timer = new Timer();
    protected final List<RestCallBack> pendingCallbacks = Collections.synchronizedList(new ArrayList<>());
    protected final List<RestCallBack> completedCallbacks = Collections.synchronizedList(new ArrayList<>());
    protected final BlockingQueue<String> requestQueue = new LinkedBlockingDeque<>();
    /** Active server-sent-event sources, keyed by subscription request id. */
    protected final Map<String, EventSource> sseSource = new HashMap<>();
    protected ZMQ.Socket internalSocket;
    protected ZMQ.Socket externalSocket;
    /** Periodic task that wakes up the worker thread so that request time-outs are evaluated. */
    protected final TimerTask wakeupTask = new TimerTask() {
        @Override
        public void run() {
            synchronized (newData) {
                newData.notifyAll();
            }
        }
    };

    /**
     * Creates a data source without request time-out, using this class' name as client id.
     *
     * @param ctx ZeroMQ context, a new one is created if {@code null}
     * @param endpoint REST endpoint to query
     */
    protected RestDataSource(ZContext ctx, URI endpoint) {
        this(ctx, endpoint, Duration.ofMillis(0L), RestDataSource.class.getName());
    }

    /**
     * Creates the data source, sets up the in-process socket pair and starts the worker thread.
     *
     * @param ctx ZeroMQ context, a new one is created if {@code null}
     * @param endpoint REST endpoint to query
     * @param timeOut request time-out; values &lt;= 0 disable the time-out handling
     * @param clientID identifier used as prefix of the unique socket identity and of publication keys
     * @throws IllegalArgumentException if {@code timeOut} is {@code null}
     * @throws IllegalStateException if the internal sockets cannot be set up
     */
    public RestDataSource(ZContext ctx, URI endpoint, Duration timeOut, String clientID) {
        super(endpoint);
        // lazily create the shared HTTP client; the class lock prevents concurrent double initialisation
        synchronized (RestDataSource.class) {
            if (okClient == null) {
                okClient = new OkHttpClient();
            }
            if (eventSourceFactory == null) {
                eventSourceFactory = EventSources.createFactory(okClient);
            }
        }
        if (timeOut == null) {
            throw new IllegalArgumentException("timeOut is null");
        }
        this.ctxCopy = ctx == null ? new ZContext() : ctx;
        this.endpoint = endpoint;
        this.timeOut = timeOut;
        this.clientID = clientID;
        this.uniqueID = clientID + "PID=" + ManagementFactory.getRuntimeMXBean().getName() + "-InstanceID=" + REST_DATA_SOURCE_INSTANCE.getAndIncrement();
        this.uniqueIdBytes = uniqueID.getBytes(ZMQ.CHARSET);
        if (timeOut.toMillis() > 0L) {
            timer.scheduleAtFixedRate(wakeupTask, 0L, timeOut.toMillis());
        }

        final String address = "inproc://" + uniqueID;
        internalSocket = ctxCopy.createSocket(SocketType.PAIR);
        OpenCmwConstants.setDefaultSocketParameters(internalSocket);
        if (!internalSocket.setIdentity(uniqueIdBytes)) {
            throw new IllegalStateException("could not set identity on internalSocket");
        }
        if (!internalSocket.bind(address)) {
            throw new IllegalStateException("could not bind internalSocket to: " + address);
        }
        externalSocket = ctxCopy.createSocket(SocketType.PAIR);
        OpenCmwConstants.setDefaultSocketParameters(externalSocket);
        if (!externalSocket.connect(address)) {
            throw new IllegalStateException("could not connect externalSocket to: " + address);
        }
        LOGGER.atTrace().addArgument(endpoint).log("connecting to REST endpoint: '{}'");
        start();
    }

    /**
     * Queues a GET request for this data source's endpoint.
     * Only {@code requestId} is used; the other arguments are ignored by this implementation.
     */
    @Override
    public void get(String requestId, URI endpoint, byte[] data, byte[] rbacToken) {
        enqueueRequest(requestId);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void set(String requestId, URI endpoint, byte[] data, byte[] rbacToken) {
        throw new UnsupportedOperationException("set not (yet) implemented");
    }

    /**
     * Adds a request to the request queue and wakes up the worker thread.
     *
     * @param hashKey key that is returned as first frame of the reply message
     * @throws IllegalStateException if the key could not be added to the queue
     */
    public void enqueueRequest(String hashKey) {
        if (!requestQueue.offer(hashKey)) {
            throw new IllegalStateException("could not add hashKey " + hashKey + " to request queue of endpoint " + this.endpoint);
        }
        synchronized (newData) {
            newData.notifyAll();
        }
    }

    /**
     * Opens a server-sent-event source for {@code endpoint}. Every received event triggers a GET
     * request on the same endpoint; the event payload itself is not used.
     * A malformed endpoint URL is logged and otherwise ignored.
     *
     * @param reqId subscription id, used to {@link #unsubscribe(String)} later
     * @param endpoint endpoint to subscribe to
     * @param rbacToken ignored by this implementation
     */
    @Override
    public void subscribe(String reqId, final URI endpoint, byte[] rbacToken) {
        try {
            final Request request = new Request.Builder().url(endpoint.toURL()).build();
            final EventSource source = eventSourceFactory.newEventSource(request, new EventSourceListener() {
                @Override
                public void onEvent(@NotNull EventSource eventSource, String id, String type, @NotNull String data) {
                    final String pubKey = clientID + "#" + PUBLICATION_COUNTER.getAndIncrement();
                    getRequest(pubKey, endpoint.toString(), MimeType.TEXT);
                }
            });
            final EventSource previous = sseSource.put(reqId, source);
            if (previous != null) {
                // do not leak the connection of a subscription that is being replaced
                previous.cancel();
            }
        } catch (MalformedURLException e) {
            LOGGER.atWarn().addArgument(endpoint).setCause(e).log("Error building uri for endpoint: {}");
        }
    }

    /**
     * Cancels and removes the event source registered for {@code reqId}, if any.
     */
    @Override
    public void unsubscribe(String reqId) {
        final EventSource source = sseSource.remove(reqId);
        if (source != null) {
            source.cancel();
        }
    }

    /**
     * @return the ZeroMQ context used by this data source
     */
    public ZContext getCtx() {
        return ctxCopy;
    }

    /**
     * @return the external end of the in-process socket pair on which replies are received
     */
    @Override
    public ZMQ.Socket getSocket() {
        return externalSocket;
    }

    /**
     * Stops the worker (see {@link #stop()}) and closes both in-process sockets.
     */
    @Override
    public void close() {
        // stop first, otherwise worker thread, timer thread and SSE sources outlive the sockets
        stop();
        internalSocket.close();
        externalSocket.close();
    }

    @Override
    protected DataSource.Factory getFactory() {
        return FACTORY;
    }

    /**
     * Receives the next reply message from the external socket using a non-waiting receive
     * ({@code wait = false}).
     *
     * @return the result of {@link ZMsg#recvMsg(ZMQ.Socket, boolean)}
     */
    @Override
    public ZMsg getMessage() {
        return ZMsg.recvMsg(externalSocket, false);
    }

    /**
     * Checks all pending requests for time-outs and issues a GET request for every queued hash key.
     *
     * @return current wall-clock time plus the configured time-out, in milliseconds
     *         (presumably the time of the next required housekeeping call - confirm against DataSource)
     */
    @Override
    public long housekeeping() {
        synchronized (newData) {
            // iterate over a snapshot: timed-out callbacks remove themselves from pendingCallbacks
            for (final RestCallBack callBack : new ArrayList<>(pendingCallbacks)) {
                callBack.checkTimeOut();
            }
            // poll() never blocks and is not affected by the thread's interrupt status
            String hash;
            while ((hash = requestQueue.poll()) != null) {
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.atTrace().addArgument(hash).log("external request with hashKey = '{}'");
                }
                getRequest(hash, endpoint.toString(), MimeType.TEXT);
            }
        }
        return System.currentTimeMillis() + timeOut.toMillis();
    }

    /**
     * Worker loop: waits for completed callbacks or queued requests, forwards every completed
     * callback as a message on the internal socket and performs the housekeeping.
     * The loop ends when {@link #stop()} is called, the thread is interrupted or an error occurs;
     * both sockets are closed on exit.
     */
    @Override
    public void run() {
        boolean interrupted = false;
        try {
            while (run.get()) {
                if (Thread.interrupted()) {
                    interrupted = true;
                    break;
                }
                synchronized (newData) {
                    if (completedCallbacks.isEmpty() && requestQueue.isEmpty()) {
                        final long timeOutMillis = timeOut.toMillis();
                        newData.wait(timeOutMillis <= 0L ? WAIT_TIMEOUT_MILLIS : timeOutMillis);
                    }
                    for (final RestCallBack callBack : completedCallbacks) {
                        forwardResult(callBack);
                    }
                    completedCallbacks.clear();
                    housekeeping();
                }
            }
        } catch (InterruptedException e) {
            interrupted = true;
            LOGGER.atError().setCause(e).log("data acquisition loop abnormally terminated");
        } catch (Exception e) {
            LOGGER.atError().setCause(e).log("data acquisition loop abnormally terminated");
        } finally {
            externalSocket.close();
            internalSocket.close();
        }
        LOGGER.atTrace().addArgument(uniqueID).addArgument(run.get()).log("stop poller thread for uniqueID={} - run={}");
        if (interrupted) {
            // restore the flag only after the sockets have been closed with a clean interrupt status
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts the worker loop in a new thread.
     */
    public void start() {
        // set here rather than in run() so that a stop() issued before the thread is scheduled is honoured
        run.set(true);
        new Thread(this).start();
    }

    /**
     * Cancels all subscriptions and the wake-up timer and asks the worker loop to terminate.
     * Calling this method more than once has no further effect.
     */
    public void stop() {
        // iterate over a snapshot: unsubscribe(..) removes entries from sseSource
        for (final String reqId : new ArrayList<>(sseSource.keySet())) {
            unsubscribe(reqId);
        }
        run.set(false);
        timer.cancel(); // otherwise the timer thread is never released
        synchronized (newData) {
            newData.notifyAll(); // let the worker notice the stop request without waiting for its time-out
        }
    }

    /**
     * Issues an asynchronous GET request and registers its callback as pending.
     *
     * @param hashKey key that is returned as first frame of the reply message
     * @param path URL to query
     * @param mimeType value of the {@code Accept} header
     */
    protected void getRequest(String hashKey, String path, MimeType mimeType) {
        final Request request = newGetRequest(path, mimeType);
        if (LOGGER.isTraceEnabled()) {
            LOGGER.atTrace().addArgument(endpoint).addArgument(path).addArgument(request).log("new request for {} - {} : request{}");
        }
        final RestCallBack callBack = new RestCallBack(hashKey, path, mimeType);
        pendingCallbacks.add(callBack);
        enqueueCall(request, callBack);
    }

    /** Builds a GET request for {@code url} with the given MIME type as {@code Accept} header. */
    private static Request newGetRequest(final String url, final MimeType mimeType) {
        return new Request.Builder().url(url).get().addHeader("Accept", mimeType.toString()).build();
    }

    /** Enqueues the request and, while {@link #cancelLastCall} is positive, cancels it right away. */
    private void enqueueCall(final Request request, final Callback callBack) {
        final Call call = okClient.newCall(request);
        call.enqueue(callBack);
        if (cancelLastCall > 0) {
            call.cancel();
            --cancelLastCall;
        }
    }

    /** Sends the outcome of a completed callback as a four-frame message on the internal socket. */
    private void forwardResult(final RestCallBack callBack) {
        byte[] data = OpenCmwProtocol.EMPTY_FRAME;
        byte[] error = callBack.exception == null ? OpenCmwProtocol.EMPTY_FRAME : toErrorFrame(callBack.exception);
        final Response response = callBack.response;
        if (response != null) {
            try {
                data = response.peekBody(Long.MAX_VALUE).bytes();
            } catch (IOException e) {
                // report the failure for this request only instead of terminating the worker loop
                LOGGER.atWarn().setCause(e).addArgument(callBack.endPointName).log("could not read response body from endpoint {}");
                if (callBack.exception == null) {
                    error = toErrorFrame(e);
                }
            } finally {
                response.close();
            }
        }
        final ZMsg msg = new ZMsg();
        msg.add(callBack.hashKey);
        msg.add(callBack.endPointName);
        msg.add(data);
        msg.add(error);
        if (!msg.send(internalSocket)) {
            throw new IllegalStateException("internalSocket could not send message - error code: " + internalSocket.errno());
        }
    }

    /** Converts an exception into a non-empty error frame, also for exceptions without message. */
    private static byte[] toErrorFrame(final Exception exception) {
        final String message = exception.getMessage();
        final String text = message == null || message.isEmpty() ? exception.toString() : message;
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * OkHttp callback tracking a single GET request including retries and time-out.
     * A callback is completed at most once - by a response, by the final failure or by a time-out -
     * and is then moved from the pending to the completed list of the enclosing data source.
     */
    public class RestCallBack implements Callback {
        private final String hashKey;
        private final String endPointName;
        private final MimeType mimeType;
        private final long requestTimeStamp = System.currentTimeMillis();
        /** {@code true} until completed; written under the newData monitor, read without it as fast-path check. */
        private volatile boolean active = true;
        private final AtomicInteger retryCount = new AtomicInteger();
        private Response response;
        private Exception exception;

        /**
         * @param hashKey key that is returned as first frame of the reply message
         * @param endPointName URL of the request, also used when the request is retried
         * @param mimeType value of the {@code Accept} header
         */
        public RestCallBack(String hashKey, String endPointName, MimeType mimeType) {
            this.hashKey = hashKey;
            this.endPointName = endPointName;
            this.mimeType = mimeType;
        }

        @Override
        public String toString() {
            return "RestCallBack{hashKey='" + hashKey + "', endPointName='" + endPointName + "', requestTimeStamp=" + requestTimeStamp + ", active=" + active + ", retryCount=" + retryCount + ", result=" + response + ", exception=" + exception + "}";
        }

        /**
         * Completes this callback with a {@link TimeoutException} if it is still active and older
         * than the configured time-out. Does nothing if the time-out is &lt;= 0.
         */
        public void checkTimeOut() {
            final long timeOutMillis = timeOut.toMillis();
            if (!active || timeOutMillis <= 0L) {
                return;
            }
            final long now = System.currentTimeMillis();
            if (requestTimeStamp + timeOutMillis < now) {
                complete(null, new TimeoutException("ts=" + now + " - time-out of REST request for endpoint: " + endpoint));
            }
        }

        /**
         * Re-issues the request up to {@code MAX_RETRIES} times, afterwards completes this callback
         * with the failure. Failures arriving after completion are ignored.
         */
        @Override
        public void onFailure(@NotNull Call call, @NotNull IOException e) {
            if (!active) {
                return;
            }
            final int attempt = retryCount.incrementAndGet();
            if (attempt <= MAX_RETRIES) {
                synchronized (newData) {
                    if (!active) {
                        return; // timed out in the meantime - keep the time-out exception
                    }
                    // NOTE(review): this exception stays set if a later retry succeeds, so the reply then
                    // carries both payload and error text - confirm whether that is intended
                    exception = e;
                }
                LoggingEventBuilder logger = LOGGER.atWarn();
                if (LOGGER.isTraceEnabled()) {
                    logger = logger.setCause(e);
                }
                logger.addArgument(attempt).addArgument(MAX_RETRIES).addArgument(endpoint).log("retry {} of {}: could not connect/receive from endpoint {}");
                // back-off growing by a factor of four per retry
                // NOTE(review): a millisecond value is passed to parkNanos, i.e. the pause is in the
                // nanosecond range - confirm the intended unit before changing it
                LockSupport.parkNanos(timeOut.toMillis() * (1L << (2 * (attempt - 1))));
                enqueueCall(newGetRequest(endPointName, mimeType), this);
                return;
            }
            LOGGER.atWarn().setCause(e).addArgument(MAX_RETRIES).addArgument(endpoint).log("failed after {} connect/receive retries - abort");
            complete(null, e);
            LOGGER.atWarn().addArgument(e.getLocalizedMessage()).log("RestCallBack-Failure: '{}'");
        }

        /**
         * Completes this callback with the received response. A response arriving after completion
         * (e.g. after a time-out) is closed and dropped.
         */
        @Override
        public void onResponse(@NotNull Call call, @NotNull Response response) {
            if (!complete(response, null)) {
                response.close(); // nobody will consume it - release the connection
                return;
            }
            if (LOGGER.isTraceEnabled()) {
                LOGGER.atTrace().addArgument(response).log("RestCallBack: '{}'");
            }
        }

        /**
         * Marks this callback as completed, moves it from the pending to the completed list and
         * wakes up the worker thread. Only the first call has an effect.
         *
         * @param newResponse response to store, {@code null} leaves the field unchanged
         * @param newException exception to store, {@code null} leaves the field unchanged
         * @return {@code true} if this call completed the callback, {@code false} if it already was completed
         */
        private boolean complete(final Response newResponse, final Exception newException) {
            synchronized (newData) {
                if (!active) {
                    return false;
                }
                active = false;
                if (newResponse != null) {
                    response = newResponse;
                }
                if (newException != null) {
                    exception = newException;
                }
                pendingCallbacks.remove(this);
                completedCallbacks.add(this);
                newData.notifyAll();
                return true;
            }
        }
    }
}

