package org.apache.airavata.gfac.server;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.AiravataStartupException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.gfac.impl.Factory;
import org.apache.airavata.gfac.impl.GFacWorker;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessIdentifier;
import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
import org.apache.airavata.model.messaging.event.ProcessTerminateEvent;
import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.utils.ZKPaths;
import org.apache.thrift.TException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/airavata/gfac/server/GfacServerHandler.class */
public class GfacServerHandler implements GfacService.Iface {
    /** Consumer for process launch/cancel messages; assigned in {@link #initAMQPClient()}. */
    private RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer;
    /** Never assigned in this class; only passed through to {@code startStatusUpdators}. */
    private ExperimentCatalog experimentCatalog;
    /** Not referenced anywhere else in this class. */
    private AppCatalog appCatalog;
    /** Not referenced anywhere else in this class. */
    private String airavataUserName;
    /** Curator (ZooKeeper) client; obtained and started in {@link #startCuratorClient()}. */
    private CuratorFramework curatorClient;
    /** Publisher used by {@code publishProcessStatus}; assigned in {@link #initAMQPClient()}. */
    private Publisher statusPublisher;
    /** {@code host:port} of this GFac server, stored as the data of its ZooKeeper server node. */
    private String airavataServerHostPort;
    /** Not referenced anywhere else in this class. */
    private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
    /** Fixed-size pool that runs the {@code GFacWorker} created for each submitted process. */
    private ExecutorService executorService;
    private static final Logger log = LoggerFactory.getLogger(GfacServerHandler.class);
    /** Number of {@code submitProcess} calls so far; incremented without synchronization. */
    private static int requestCount = 0;
    /** Not referenced anywhere else in this class. */
    private static List<AbstractActivityListener> activityListeners = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/airavata/gfac/server/GfacServerHandler$ProcessLaunchMessageHandler.class */
    public class ProcessLaunchMessageHandler implements MessageHandler {
        // ZooKeeper path literal for experiments; not read by any method of this handler.
        private String experimentNode = "/experiments";
        // Name of this GFac server, read once from the server settings when the handler is created.
        // It is passed to createProcessZKNode/updateDeliveryTag for every launch message.
        private String gfacServerName = ServerSettings.getGFacServerName();

        /**
         * Creates the handler; the only work done is running the field initializers above.
         *
         * @throws ApplicationSettingsException presumably raised by the {@code gfacServerName}
         *         initializer when the server name cannot be read from settings (TODO confirm)
         */
        public ProcessLaunchMessageHandler() throws ApplicationSettingsException {
        }

        /**
         * Builds the subscription properties for this handler.
         *
         * @return a map with {@code "routingKey"} set to a list holding the launch and cancel
         *         queue names, and {@code "queue"} set to the launch queue name
         */
        public Map<String, Object> getProperties() {
            List<String> routingKeys = new ArrayList<>();
            routingKeys.add(ServerSettings.getLaunchQueueName());
            routingKeys.add(ServerSettings.getCancelQueueName());

            Map<String, Object> properties = new HashMap<>();
            properties.put("routingKey", routingKeys);
            properties.put("queue", ServerSettings.getLaunchQueueName());
            return properties;
        }

        /**
         * Entry point for messages delivered by the launch consumer. Launch messages are
         * registered and handed to the worker pool, terminate messages are recorded as a
         * cancel request; messages of any other type are ignored.
         *
         * @param messageContext the delivered message (type, payload, delivery tag)
         */
        public void onMessage(MessageContext messageContext) {
            GfacServerHandler.log.info(" Message Received with message id '{}' and with message type '{}'",
                    messageContext.getMessageId(), messageContext.getType());
            // Constant-first equals keeps the dispatch safe if the message carries no type.
            if (MessageType.LAUNCHPROCESS.equals(messageContext.getType())) {
                handleLaunchRequest(messageContext);
            } else if (MessageType.TERMINATEPROCESS.equals(messageContext.getType())) {
                handleTerminateRequest(messageContext);
            }
        }

        /**
         * Stores a cancel request for the process named in the terminate event and then
         * acknowledges the message exactly once, whether or not storing succeeded.
         */
        private void handleTerminateRequest(MessageContext messageContext) {
            ProcessTerminateEvent terminateEvent = new ProcessTerminateEvent();
            try {
                ThriftUtils.createThriftFromBytes(ThriftUtils.serializeThriftObject(messageContext.getEvent()), terminateEvent);
                boolean saved = GFacUtils.setExperimentCancelRequest(terminateEvent.getProcessId(),
                        GfacServerHandler.this.curatorClient, messageContext.getDeliveryTag());
                if (saved) {
                    GfacServerHandler.log.info("processId:{} - Process cancel request save successfully", terminateEvent.getProcessId());
                }
            } catch (Exception e) {
                GfacServerHandler.log.error("processId:" + terminateEvent.getProcessId() + " - Process cancel reqeust failed", e);
            } finally {
                // Single acknowledgement point: acking the same delivery tag twice must be avoided.
                acknowledgeCancelRequest(messageContext, terminateEvent);
            }
        }

        /** Acknowledges a cancel message, reconnecting the consumer first if it is not open. */
        private void acknowledgeCancelRequest(MessageContext messageContext, ProcessTerminateEvent terminateEvent) {
            try {
                if (!GfacServerHandler.this.rabbitMQProcessLaunchConsumer.isOpen()) {
                    GfacServerHandler.this.rabbitMQProcessLaunchConsumer.reconnect();
                }
                GfacServerHandler.this.rabbitMQProcessLaunchConsumer.sendAck(messageContext.getDeliveryTag());
            } catch (AiravataException e) {
                GfacServerHandler.log.error("processId: " + terminateEvent.getProcessId() + " - Failed to send acknowledgement back to cancel request.", e);
            }
        }

        /**
         * Handles a launch message: records and publishes the process status, creates the
         * process ZooKeeper nodes, honours an already-pending cancel request, and otherwise
         * submits the process to the worker pool.
         */
        private void handleLaunchRequest(MessageContext messageContext) {
            ProcessStatus status = new ProcessStatus();
            status.setState(ProcessState.STARTED);
            try {
                ProcessSubmitEvent submitEvent = new ProcessSubmitEvent();
                ThriftUtils.createThriftFromBytes(ThriftUtils.serializeThriftObject(messageContext.getEvent()), submitEvent);

                if (messageContext.isRedeliver()) {
                    if (Factory.getGfacContext().getProcess(submitEvent.getProcessId()) != null) {
                        // The process is already known to this instance: only refresh the stored delivery tag.
                        try {
                            GfacServerHandler.this.updateDeliveryTag(GfacServerHandler.this.curatorClient, this.gfacServerName, submitEvent, messageContext);
                            return;
                        } catch (Exception e) {
                            GfacServerHandler.log.error("Error while updating delivery tag for redelivery message , messageId : " + messageContext.getMessageId(), e);
                            GfacServerHandler.this.rabbitMQProcessLaunchConsumer.sendAck(messageContext.getDeliveryTag());
                            // NOTE(review): after this ack, processing continues below as for a fresh
                            // launch (existing behaviour, kept as-is) - confirm the fall-through is intended.
                        }
                    } else {
                        // Not active here: continue from the state recorded in the experiment catalog.
                        ProcessStatus stored = (ProcessStatus) Factory.getDefaultExpCatalog()
                                .get(ExperimentCatalogModelType.PROCESS_STATUS, submitEvent.getProcessId());
                        status.setState(stored.getState());
                    }
                }

                status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
                Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, submitEvent.getProcessId());
                GfacServerHandler.this.publishProcessStatus(submitEvent, status);

                try {
                    GfacServerHandler.this.createProcessZKNode(GfacServerHandler.this.curatorClient, this.gfacServerName, submitEvent, messageContext);
                    boolean cancelRequested = GfacServerHandler.this.setCancelWatcher(
                            GfacServerHandler.this.curatorClient, submitEvent.getExperimentId(), submitEvent.getProcessId());
                    if (cancelRequested) {
                        if (status.getState() == ProcessState.STARTED) {
                            // Cancel arrived before any work started: walk the status through
                            // CANCELLING -> CANCELED, acknowledge, and do not submit.
                            status.setState(ProcessState.CANCELLING);
                            status.setReason("Process Cancel is triggered");
                            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                            Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, submitEvent.getProcessId());
                            GfacServerHandler.this.publishProcessStatus(submitEvent, status);

                            status.setState(ProcessState.CANCELED);
                            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                            Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, submitEvent.getProcessId());
                            GfacServerHandler.this.publishProcessStatus(submitEvent, status);

                            GfacServerHandler.this.rabbitMQProcessLaunchConsumer.sendAck(messageContext.getDeliveryTag());
                            return;
                        }
                        // Process was past STARTED: flag the cancel on its own node and still submit.
                        GfacServerHandler.this.setCancelData(submitEvent.getExperimentId(), submitEvent.getProcessId());
                    }
                    GfacServerHandler.this.submitProcess(submitEvent.getProcessId(), submitEvent.getGatewayId(), submitEvent.getTokenId());
                } catch (Exception e) {
                    GfacServerHandler.log.error(e.getMessage(), e);
                    GfacServerHandler.this.rabbitMQProcessLaunchConsumer.sendAck(messageContext.getDeliveryTag());
                }
            } catch (TException e) {
                GfacServerHandler.log.error(e.getMessage(), e);
            } catch (AiravataException e) {
                GfacServerHandler.log.error("Error while publishing process status", e);
            } catch (RegistryException e) {
                GfacServerHandler.log.error("Error while updating experiment status", e);
            }
        }
    }

    public GfacServerHandler() throws AiravataStartupException {
        try {
            Factory.loadConfiguration();
            startCuratorClient();
            initZkDataStructure();
            initAMQPClient();
            this.executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize());
            startStatusUpdators(this.experimentCatalog, this.curatorClient, this.statusPublisher, this.rabbitMQProcessLaunchConsumer);
        } catch (Exception e) {
            throw new AiravataStartupException("Gfac Server Initialization error ", e);
        }
    }

    /**
     * Obtains the process launch consumer, registers a {@link ProcessLaunchMessageHandler}
     * on it, and then creates the status publisher.
     *
     * @throws AiravataException if the consumer, handler or publisher cannot be set up
     */
    private void initAMQPClient() throws AiravataException {
        final RabbitMQProcessLaunchConsumer launchConsumer = Factory.getProcessLaunchConsumer();
        // Publish the consumer on the field before listening: the handler reads it to send acks.
        rabbitMQProcessLaunchConsumer = launchConsumer;
        final ProcessLaunchMessageHandler launchHandler = new ProcessLaunchMessageHandler();
        launchConsumer.listen(launchHandler);
        statusPublisher = new RabbitMQStatusPublisher();
    }

    /**
     * Fetches the Curator client from the {@code Factory}, stores it on this handler and starts it.
     *
     * @throws ApplicationSettingsException if the client cannot be obtained from the factory
     */
    private void startCuratorClient() throws ApplicationSettingsException {
        final CuratorFramework client = Factory.getCuratorClient();
        curatorClient = client;
        client.start();
    }

    /**
     * Prepares the ZooKeeper layout this server relies on: ensures the GFac servers parent
     * path and {@code /experiments} exist, creates this server's ephemeral node when it is
     * missing, and stores this server's {@code host:port} as that node's data.
     *
     * @throws Exception if any settings lookup or ZooKeeper operation fails
     */
    private void initZkDataStructure() throws Exception {
        this.airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort();
        ZKPaths.mkdirs(this.curatorClient.getZookeeperClient().getZooKeeper(), GFacUtils.getZKGfacServersParentPath());
        ZKPaths.mkdirs(this.curatorClient.getZookeeperClient().getZooKeeper(), "/experiments");

        String serverNodePath = ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath(), ServerSettings.getGFacServerName());
        if (this.curatorClient.checkExists().forPath(serverNodePath) == null) {
            // Ephemeral: the node is removed by ZooKeeper when this server's session ends.
            this.curatorClient.create()
                    .withMode(CreateMode.EPHEMERAL)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(serverNodePath);
        }
        // Version -1 matches any node version. An explicit charset keeps the stored bytes
        // independent of the JVM's platform default.
        this.curatorClient.setData()
                .withVersion(-1)
                .forPath(serverNodePath, this.airavataServerHostPort.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Reports the version of this GFac service.
     *
     * @return the hard-coded version string {@code "0.16.0"}
     * @throws TException declared by the signature; this implementation does not throw it
     */
    public String getGFACServiceVersion() throws TException {
        return "0.16.0";
    }

    /**
     * Queues a process for execution by creating a {@code GFacWorker} for it and handing the
     * worker to the executor service.
     *
     * @param str  the process id
     * @param str2 the gateway id
     * @param str3 the credential token id
     * @return {@code true} if the worker was handed to the executor; {@code false} if the
     *         worker could not be created or queued
     * @throws TException declared by the service interface; not thrown by this implementation
     */
    public boolean submitProcess(String str, String str2, String str3) throws TException {
        requestCount++;
        log.info("-----------------------------------" + requestCount + "-----------------------------------------");
        // Format string first, arguments after: previously the process id was passed as the format.
        log.info("GFac Received submit job request for the Process: {} process: {}", str, str);
        try {
            this.executorService.execute(new GFacWorker(str, str2, str3));
            return true;
        } catch (GFacException e) {
            // The more specific exception must be caught before the generic one below.
            log.error("Failed to submit process", e);
            return false;
        } catch (Exception e) {
            // Anything else (for example a rejected task) also means the process was not queued;
            // keep the cause in the log and report the failure instead of claiming success.
            log.error("processId: " + str + " - Unexpected error while submitting process", e);
            return false;
        }
    }

    /**
     * Cancel entry point; currently a stub that performs no work and ignores its arguments.
     *
     * @param str  unused (presumably the process id, mirroring {@code submitProcess} - TODO confirm)
     * @param str2 unused (presumably the gateway id - TODO confirm)
     * @param str3 unused (presumably the token id - TODO confirm)
     * @return always {@code false}
     * @throws TException declared by the signature; this implementation does not throw it
     */
    public boolean cancelProcess(String str, String str2, String str3) throws TException {
        return false;
    }

    /**
     * Placeholder invoked at the end of construction; the body is empty, so every argument
     * is ignored and the call has no effect.
     *
     * @param experimentCatalog             unused
     * @param curatorFramework              unused
     * @param publisher                     unused
     * @param rabbitMQProcessLaunchConsumer unused
     */
    public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorFramework, Publisher publisher, RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer) {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes the {@code CANCEL_REQUEST} marker to the process-level cancel listener node,
     * {@code /experiments/<experimentId>/<processId>/cancelListener}.
     *
     * @param str  the experiment id
     * @param str2 the process id
     * @throws Exception if the ZooKeeper write fails (for example when the node does not exist)
     */
    public void setCancelData(String str, String str2) throws Exception {
        String processNodePath = ZKPaths.makePath(ZKPaths.makePath("/experiments", str), str2);
        String cancelNodePath = ZKPaths.makePath(processNodePath, "/cancelListener");
        log.info("expId: {}, processId: {}, set process cancel data to zookeeper node {}", str, str2, cancelNodePath);
        // Version -1 matches any node version; explicit charset avoids platform-default encoding.
        this.curatorClient.setData()
                .withVersion(-1)
                .forPath(cancelNodePath, "CANCEL_REQUEST".getBytes(StandardCharsets.UTF_8));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks the experiment-level cancel listener node for a pending cancel request and,
     * if none is pending, re-reads the node with a cancel-request watcher attached.
     *
     * @param curatorFramework the ZooKeeper client to use
     * @param str              the experiment id
     * @param str2             the process id (used only to build the watcher)
     * @return {@code true} if the node holds {@code CANCEL_REQUEST} (case-insensitive) on
     *         either read; {@code false} otherwise
     * @throws Exception if reading the node fails
     */
    public boolean setCancelWatcher(CuratorFramework curatorFramework, String str, String str2) throws Exception {
        String cancelNodePath = ZKPaths.makePath(GFacUtils.getExperimentNodePath(str), "/cancelListener");
        // First read without a watcher: if a cancel is already pending, no watcher is registered.
        if (isCancelRequest(curatorFramework.getData().forPath(cancelNodePath))) {
            return true;
        }
        // Register the watcher and re-check the data returned by that same read, so a request
        // written between the two reads is still reported.
        byte[] watchedData = curatorFramework.getData()
                .usingWatcher(Factory.getCancelRequestWatcher(str, str2))
                .forPath(cancelNodePath);
        return isCancelRequest(watchedData);
    }

    /** Returns whether the given node data is the (case-insensitive) cancel request marker. */
    private static boolean isCancelRequest(byte[] data) {
        return data != null && "CANCEL_REQUEST".equalsIgnoreCase(new String(data, StandardCharsets.UTF_8));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Publishes a process status change event built from the given status and the ids
     * carried by the submit event.
     *
     * @param processSubmitEvent supplies the process, experiment and gateway ids
     * @param processStatus      supplies the state to announce
     * @throws AiravataException if the status publisher fails to publish the message
     */
    public void publishProcessStatus(ProcessSubmitEvent processSubmitEvent, ProcessStatus processStatus) throws AiravataException {
        final ProcessState announcedState = processStatus.getState();
        final ProcessIdentifier identifier = new ProcessIdentifier(
                processSubmitEvent.getProcessId(),
                processSubmitEvent.getExperimentId(),
                processSubmitEvent.getGatewayId());
        final ProcessStatusChangeEvent changeEvent = new ProcessStatusChangeEvent(announcedState, identifier);

        final String messageId = AiravataUtils.getId(MessageType.PROCESS.name());
        final MessageContext statusMessage =
                new MessageContext(changeEvent, MessageType.PROCESS, messageId, processSubmitEvent.getGatewayId());
        statusMessage.setUpdatedTime(AiravataUtils.getCurrentTimestamp());

        statusPublisher.publish(statusMessage);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the ZooKeeper node tree for a process under its experiment node:
     * the process node (data: owning server name, with a redelivery-request watcher),
     * and its {@code cancelListener}, {@code deliveryTag} and {@code token} children.
     *
     * @param curatorFramework   the ZooKeeper client to use
     * @param str                the GFac server name stored on the process node
     * @param processSubmitEvent supplies the process, experiment and token ids
     * @param messageContext     supplies the delivery tag stored under {@code deliveryTag}
     * @throws Exception if any ZooKeeper operation fails
     */
    public void createProcessZKNode(CuratorFramework curatorFramework, String str, ProcessSubmitEvent processSubmitEvent, MessageContext messageContext) throws Exception {
        String processId = processSubmitEvent.getProcessId();
        String tokenId = processSubmitEvent.getTokenId();
        String experimentId = processSubmitEvent.getExperimentId();
        long deliveryTag = messageContext.getDeliveryTag();

        String processNodePath = ZKPaths.makePath(GFacUtils.getExperimentNodePath(experimentId), processId);
        // Explicit charset keeps the stored bytes independent of the JVM's platform default.
        writeZkNode(curatorFramework, processNodePath, str.getBytes(StandardCharsets.UTF_8));
        // Reading with a watcher registers the redelivery-request watcher on the process node.
        curatorFramework.getData()
                .usingWatcher(Factory.getRedeliveryReqeustWatcher(experimentId, processId))
                .forPath(processNodePath);

        // The cancel listener node is created empty; its data is written by setCancelData.
        ZKPaths.mkdirs(curatorFramework.getZookeeperClient().getZooKeeper(), ZKPaths.makePath(processNodePath, "/cancelListener"));

        writeZkNode(curatorFramework, ZKPaths.makePath(processNodePath, "/deliveryTag"), GFacUtils.longToBytes(deliveryTag));
        writeZkNode(curatorFramework, ZKPaths.makePath(processNodePath, "/token"), tokenId.getBytes(StandardCharsets.UTF_8));
    }

    /** Ensures the node at {@code path} exists and overwrites its data regardless of version. */
    private static void writeZkNode(CuratorFramework curatorFramework, String path, byte[] data) throws Exception {
        ZKPaths.mkdirs(curatorFramework.getZookeeperClient().getZooKeeper(), path);
        curatorFramework.setData().withVersion(-1).forPath(path, data);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Overwrites the stored delivery tag of a process with the tag of the given message.
     * Nothing is written when the process node does not exist.
     *
     * @param curatorFramework   the ZooKeeper client to use
     * @param str                the GFac server name (not used by this method)
     * @param processSubmitEvent supplies the experiment and process ids
     * @param messageContext     supplies the new delivery tag
     * @throws Exception if a ZooKeeper operation fails
     */
    public void updateDeliveryTag(CuratorFramework curatorFramework, String str, ProcessSubmitEvent processSubmitEvent, MessageContext messageContext) throws Exception {
        final String expId = processSubmitEvent.getExperimentId();
        final String procId = processSubmitEvent.getProcessId();
        final long newTag = messageContext.getDeliveryTag();

        final String processNodePath = ZKPaths.makePath(GFacUtils.getExperimentNodePath(expId), procId);
        if (curatorFramework.checkExists().forPath(processNodePath) == null) {
            return;
        }

        final String tagNodePath = ZKPaths.makePath(processNodePath, "/deliveryTag");
        curatorFramework.setData().withVersion(-1).forPath(tagNodePath, GFacUtils.longToBytes(newTag));
    }
}
