/*
 * Decompiled with CFR 0.152.
 */
package hkube.algo.wrapper;

import hkube.algo.CommandResponseListener;
import hkube.algo.HKubeAPIImpl;
import hkube.algo.ICommandSender;
import hkube.algo.wrapper.DataAdapter;
import hkube.algo.wrapper.IAlgorithm;
import hkube.algo.wrapper.WrapperConfig;
import hkube.api.IHKubeAPI;
import hkube.caching.Cache;
import hkube.communication.DataServer;
import hkube.communication.IRequestListener;
import hkube.communication.IRequestServer;
import hkube.communication.zmq.ZMQServer;
import hkube.encoding.EncodingManager;
import hkube.model.HeaderContentPair;
import hkube.storage.IStorageConfig;
import hkube.storage.StorageFactory;
import hkube.storage.TaskStorage;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.glassfish.tyrus.client.ClientManager;
import org.json.JSONObject;

/**
 * WebSocket client endpoint that connects an {@link IAlgorithm} to the worker.
 *
 * <p>The wrapper opens a WebSocket to the worker, decodes incoming commands
 * ({@code initialize}, {@code start}, {@code stop}, {@code exit}), drives the algorithm
 * accordingly and reports progress, results and errors back through
 * {@link #sendMessage(String, Object, boolean)}.
 *
 * <p>Commands are executed asynchronously via {@link CompletableFuture}, while the WebSocket
 * callbacks are invoked by the WebSocket container.
 */
@ClientEndpoint
public class Wrapper
implements ICommandSender {
    private final WrapperConfig mConfig;
    private static boolean isDebugMode = false;
    // volatile: written by the @OnOpen/@OnClose callbacks, polled by connect() and read by sendMessage().
    volatile Session userSession = null;
    private IAlgorithm mAlgorithm;
    // Arguments received with "initialize"; replaced by an empty map once a "start" run finishes.
    Map mArgs;
    List<CommandResponseListener> listeners = new ArrayList<CommandResponseListener>();
    HKubeAPIImpl hkubeAPI;
    IRequestServer zmqServer;
    DataServer dataServer;
    TaskStorage taskResultStorage;
    DataAdapter dataAdapter;
    EncodingManager workerEncoder;
    private static final Logger logger = LogManager.getLogger();

    /**
     * Builds the wrapper's collaborators from {@code config} and then blocks until the
     * WebSocket connection to the worker is established.
     *
     * @param algorithm the algorithm to drive
     * @param config    communication, storage and encoding configuration
     * @throws RuntimeException if the connection attempt fails unrecoverably or the thread is interrupted
     */
    public Wrapper(IAlgorithm algorithm, WrapperConfig config) {
        Cache.init((Integer)config.commConfig.getMaxCacheSize());
        this.mConfig = config;
        this.dataAdapter = new DataAdapter(this.mConfig);
        this.hkubeAPI = new HKubeAPIImpl(this, this.dataAdapter);
        // In debug mode a no-op request server replaces the real ZMQ server.
        this.zmqServer = !isDebugMode ? new ZMQServer(this.mConfig.commConfig) : this.createMockZMQServer();
        this.dataServer = new DataServer(this.zmqServer, this.mConfig.commConfig);
        this.mAlgorithm = algorithm;
        this.taskResultStorage = new StorageFactory((IStorageConfig)config.storageConfig).getTaskStorage();
        this.workerEncoder = new EncodingManager(this.mConfig.getEncodingType());
        this.connect();
    }

    /**
     * Switches every subsequently created wrapper to debug mode: a mock request server is used
     * and results are sent directly with the {@code done} message instead of being stored.
     */
    public static void setDebugMode() {
        isDebugMode = true;
    }

    /**
     * Registers a listener that is notified of every command received from the worker.
     *
     * @param listener the listener to add
     */
    @Override
    public void addResponseListener(CommandResponseListener listener) {
        this.listeners.add(listener);
    }

    /**
     * Connects to the worker, retrying every 200 ms until {@link #onOpen(Session)} has stored
     * the session.
     *
     * @throws RuntimeException wrapping any setup failure; if the thread is interrupted while
     *                          waiting, the interrupt flag is restored before throwing
     */
    private void connect() {
        String uriString = this.mConfig.getUrl();
        if (uriString == null) {
            uriString = "ws://" + this.mConfig.getHost() + ":" + this.mConfig.getPort() + "/?storage=" + this.mConfig.getStorageVersion() + "&encoding=" + this.mConfig.getEncodingType();
        } else {
            uriString = uriString + "?encoding=" + this.mConfig.getEncodingType();
        }
        try {
            logger.info("connecting to uri: " + uriString);
            URI uri = new URI(uriString);
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            ClientManager clientManager = (ClientManager)container;
            clientManager.getProperties().put("org.glassfish.tyrus.incomingBufferSize", 150000000);
            container.setAsyncSendTimeout(Long.MAX_VALUE);
            container.setDefaultMaxSessionIdleTimeout(Long.MAX_VALUE);
            container.setDefaultMaxBinaryMessageBufferSize(Integer.MAX_VALUE);
            container.setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
            while (this.userSession == null) {
                try {
                    container.connectToServer((Object)this, uri);
                }
                catch (Exception exc) {
                    // Connection failures are expected until the worker is reachable; keep retrying.
                    logger.debug((Object)exc);
                }
                Thread.sleep(200L);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Stores the newly opened session and sets its idle timeout to {@link Long#MAX_VALUE}.
     *
     * @param userSession the session opened by the container
     */
    @OnOpen
    public void onOpen(Session userSession) {
        logger.info("connected to worker");
        this.userSession = userSession;
        this.userSession.setMaxIdleTimeout(Long.MAX_VALUE);
    }

    /**
     * Cleans up the algorithm and terminates the JVM with exit code {@code -1} when the
     * WebSocket is closed. A failing cleanup is logged and does not prevent the exit.
     *
     * @param userSession the session that was closed
     * @param reason      why the session was closed
     */
    @OnClose
    public void onClose(Session userSession, CloseReason reason) {
        logger.info("websocket closed with reason :" + reason);
        this.userSession = null;
        try {
            this.mAlgorithm.Cleanup();
        }
        catch (RuntimeException cleanupFailure) {
            logger.error("algorithm cleanup failed on websocket close", (Throwable)cleanupFailure);
        }
        System.exit(-1);
    }

    /**
     * Logs a WebSocket error together with its stack trace.
     *
     * @param userSession the session on which the error occurred
     * @param error       the reported error
     */
    @OnError
    public void onError(Session userSession, Throwable error) {
        logger.error("closing websocket" + error, error);
    }

    /**
     * Sends a command to the worker. The payload is stored under {@code "error"} when
     * {@code isError} is set and under {@code "data"} otherwise. With the {@code json} encoder
     * the message is sent as text; with any other encoder it is wrapped in a root map under
     * {@code "data"}, encoded and sent as binary.
     *
     * @param command the command name
     * @param data    the payload, may be {@code null}
     * @param isError whether {@code data} describes an error
     * @throws IllegalStateException if no WebSocket session is currently open
     */
    @Override
    public void sendMessage(String command, Object data, boolean isError) {
        logger.info("Sending message to worker: " + command);
        // Read the field once so a concurrent close cannot null it between the check and the send.
        Session session = this.userSession;
        if (session == null) {
            throw new IllegalStateException("cannot send '" + command + "': websocket session is not open");
        }
        HashMap<String, Object> toSend = new HashMap<String, Object>();
        toSend.put("command", command);
        toSend.put(isError ? "error" : "data", data);
        if (this.workerEncoder.getName().equals("json")) {
            // The JSON object is only built on this path, so binary encoders never depend on JSON conversion.
            session.getAsyncRemote().sendText(new JSONObject(toSend).toString());
        } else {
            HashMap<String, HashMap<String, Object>> root = new HashMap<String, HashMap<String, Object>>();
            root.put("data", toSend);
            byte[] bytes = this.workerEncoder.encodeNoHeader(root);
            session.getAsyncRemote().sendBinary(ByteBuffer.wrap(bytes));
        }
    }

    /**
     * Handles a text frame: decodes its UTF-8 bytes with the worker encoder and dispatches the
     * resulting command map.
     *
     * @param message the received text message
     */
    @OnMessage
    public void onMessage(String message) {
        // WebSocket text frames are UTF-8; do not rely on the platform default charset.
        Map msgAsJson = (Map)this.workerEncoder.decodeNoHeader(message.getBytes(StandardCharsets.UTF_8));
        this.onMessage(msgAsJson);
    }

    /**
     * Handles a binary frame: decodes it with the worker encoder and dispatches the command map
     * found under its {@code "data"} key.
     *
     * @param message the received binary message
     */
    @OnMessage
    public void onMessage(byte[] message) {
        Map msgAsJson = (Map)((Map)this.workerEncoder.decodeNoHeader(message)).get("data");
        this.onMessage(msgAsJson);
    }

    /**
     * Notifies all registered listeners of the received command and then executes the command
     * asynchronously. Any failure while dispatching is reported to the worker.
     *
     * @param msgAsMap decoded message holding {@code "command"} and {@code "data"} entries
     */
    private void onMessage(Map msgAsMap) {
        try {
            String command = (String)msgAsMap.get("command");
            Map data = (Map)msgAsMap.get("data");
            this.listeners.forEach(listener -> {
                logger.debug("got command " + command);
                listener.onCommand(command, data, isDebugMode);
            });
            logger.info("got message from worker:" + command);
            CompletableFuture.runAsync(() -> this.handleCommand(command, data));
        }
        catch (Exception exc) {
            this.reportFailure(exc);
        }
    }

    /**
     * Executes a single worker command and reports any failure back to the worker.
     *
     * @param command the command name
     * @param data    the command payload, may be {@code null}
     */
    private void handleCommand(String command, Map data) {
        try {
            switch (command) {
                case "initialize": {
                    this.mArgs = data;
                    this.mAlgorithm.Init(this.mArgs);
                    this.sendMessage("initialized", null, false);
                    break;
                }
                case "exit": {
                    this.mAlgorithm.Cleanup();
                    this.sendMessage("exited", null, false);
                    System.exit(0);
                    break;
                }
                case "start": {
                    this.runStart();
                    break;
                }
                case "stop": {
                    this.mAlgorithm.Stop();
                    this.sendMessage("stopped", null, false);
                    break;
                }
                default: {
                    logger.info("got command: " + command);
                }
            }
        }
        catch (Exception exc) {
            this.reportFailure(exc);
        }
    }

    /**
     * Runs the algorithm for the current arguments: resolves the input data, invokes the
     * algorithm, publishes the encoded result to the data server and task storage, and sends
     * the {@code storing}/{@code done} messages. The stored arguments are reset afterwards
     * regardless of the outcome.
     */
    private void runStart() {
        this.sendMessage("started", null, false);
        try {
            Map args = this.mArgs;
            logger.debug("Before fetching input data");
            Collection input = this.dataAdapter.placeData(args);
            args.put("input", input);
            logger.debug("After fetching input data");
            if (logger.isDebugEnabled()) {
                logger.debug("input data after decoding " + input);
            }
            logger.debug("Before running algorithm");
            Object res = this.mAlgorithm.Start(args, (IHKubeAPI)this.hkubeAPI);
            logger.debug("After running algorithm");
            String taskId = (String)args.get("taskId");
            String jobId = (String)args.get("jobId");
            Collection savePaths = (Collection)((Map)args.get("info")).get("savePaths");
            Map metaData = DataAdapter.getMetadata(savePaths, res);
            HeaderContentPair encodedData = this.dataAdapter.encode(res, this.mConfig.commConfig.getEncodingType());
            boolean dataAdded = this.dataServer.addTaskData(taskId, encodedData);
            int resEncodedSize = encodedData.getContent().length;
            Map resultStoringInfo = this.dataAdapter.getStoringInfo(this.mConfig, jobId, taskId, metaData, resEncodedSize);
            if (logger.isDebugEnabled()) {
                logger.debug("result storing data" + resultStoringInfo);
            }
            if (isDebugMode) {
                // Debug mode skips storage and hands the raw result straight back.
                this.sendMessage("done", res, false);
                return;
            }
            if (dataAdded) {
                // The data server holds the result: announce "storing" before persisting it.
                this.sendMessage("storing", resultStoringInfo, false);
                this.taskResultStorage.put(jobId, taskId, encodedData);
            } else {
                // The data server did not take the result: persist first, then announce.
                this.taskResultStorage.put(jobId, taskId, encodedData);
                this.sendMessage("storing", resultStoringInfo, false);
            }
            this.sendMessage("done", new HashMap(), false);
        }
        catch (Exception ex) {
            this.reportFailure(ex);
        }
        finally {
            this.mArgs = new HashMap();
        }
    }

    /**
     * Logs {@code failure} and sends it to the worker as an {@code errorMessage}. A failure to
     * send the report is logged instead of propagated so it cannot mask the original error.
     *
     * @param failure the exception to report
     */
    private void reportFailure(Exception failure) {
        logger.error("unexpected exception", (Throwable)failure);
        HashMap<String, String> details = new HashMap<String, String>();
        details.put("code", "Failed");
        details.put("message", failure.toString());
        try {
            this.sendMessage("errorMessage", details, true);
        }
        catch (RuntimeException sendFailure) {
            logger.error("failed to report error to worker", (Throwable)sendFailure);
        }
    }

    /** Logs that the wrapper is exiting. Not invoked from within this class. */
    private void onExit() {
        logger.warn("exiting");
    }

    /**
     * Creates a request server whose operations all do nothing; used in debug mode instead of
     * the ZMQ server.
     *
     * @return a no-op {@link IRequestServer}
     */
    private IRequestServer createMockZMQServer() {
        return new IRequestServer(){

            public void addRequestsListener(IRequestListener listener) {
            }

            public void reply(List<HeaderContentPair> replies) {
            }

            public void close() {
            }
        };
    }
}

