/*
 * Decompiled with CFR 0.152.
 */
package io.parallec.core.actor;

import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.google.common.base.Strings;
import com.ning.http.client.AsyncHttpClient;
import io.parallec.core.ParallecResponseHandler;
import io.parallec.core.ParallelTask;
import io.parallec.core.RequestProtocol;
import io.parallec.core.ResponseOnSingleTask;
import io.parallec.core.actor.AssistantExecutionManager;
import io.parallec.core.actor.OperationWorker;
import io.parallec.core.actor.message.CancelTaskOnHostRequest;
import io.parallec.core.actor.message.InitialRequestToManager;
import io.parallec.core.actor.message.NodeReqResponse;
import io.parallec.core.actor.message.RequestToBatchSenderAsstManager;
import io.parallec.core.actor.message.ResponseCountToBatchSenderAsstManager;
import io.parallec.core.actor.message.ResponseFromManager;
import io.parallec.core.actor.message.type.ExecutionManagerMsgType;
import io.parallec.core.actor.message.type.OperationWorkerMsgType;
import io.parallec.core.bean.HttpMeta;
import io.parallec.core.bean.SingleTargetTaskStatus;
import io.parallec.core.bean.TargetHostMeta;
import io.parallec.core.bean.TaskRequest;
import io.parallec.core.bean.ping.PingMeta;
import io.parallec.core.bean.ssh.SshMeta;
import io.parallec.core.bean.tcp.TcpMeta;
import io.parallec.core.commander.workflow.InternalDataProvider;
import io.parallec.core.config.HandlerExecutionLocation;
import io.parallec.core.config.ParallecGlobalConfig;
import io.parallec.core.exception.ExecutionManagerExecutionException;
import io.parallec.core.resources.HttpMethod;
import io.parallec.core.task.ParallelTaskState;
import io.parallec.core.task.TaskErrorMeta;
import io.parallec.core.util.PcConstants;
import io.parallec.core.util.PcDateUtils;
import io.parallec.core.util.PcHttpUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class ExecutionManager
extends UntypedActor {
    private static Logger logger = LoggerFactory.getLogger(ExecutionManager.class);
    protected int responseCount = 0;
    protected int requestCount = 0;
    protected long startTime = System.currentTimeMillis();
    protected boolean wasIssuedCancel = false;
    protected long endTime = -1L;
    protected ActorRef director = null;
    protected final Map<String, ActorRef> workers = new LinkedHashMap<String, ActorRef>();
    protected ActorRef batchSenderAsstManager = null;
    protected final Map<String, ResponseOnSingleTask> responseMap = new HashMap<String, ResponseOnSingleTask>();
    protected String taskId = null;
    protected String taskIdTrim = null;
    protected Cancellable timeoutMessageCancellable = null;
    protected AsyncHttpClient asyncHttpClientGlobal = null;
    protected ParallelTask task = null;
    protected HttpMeta httpMeta = null;
    protected TargetHostMeta targetHostMeta = null;
    public static final int REDUCE_LEN = 12;

    public ExecutionManager(ParallelTask task) {
        this.task = task;
    }

    public void onReceive(Object message) {
        block29: {
            try {
                if (message instanceof InitialRequestToManager) {
                    this.director = this.getSender();
                    logger.info("parallec task state : " + ParallelTaskState.IN_PROGRESS.toString());
                    this.task.setState(ParallelTaskState.IN_PROGRESS);
                    this.task.setExecuteStartTime(this.startTime);
                    this.taskId = this.task.getTaskId();
                    this.taskIdTrim = this.taskId.length() <= 12 ? this.taskId : this.taskId.substring(this.taskId.length() - 12, this.taskId.length());
                    this.httpMeta = this.task.getHttpMeta();
                    this.targetHostMeta = this.task.getTargetHostMeta();
                    RequestProtocol requestProtocol = this.task.getRequestProtocol();
                    String requestUrlPrefixOrig = this.httpMeta.getRequestUrlPostfix();
                    HttpMethod httpMethod = this.httpMeta.getHttpMethod();
                    int requestPort = Integer.parseInt(this.httpMeta.getRequestPort());
                    boolean pollable = this.httpMeta.isPollable();
                    int maxConcurrency = this.task.getConcurrency();
                    Map<String, NodeReqResponse> nodeDataMapValid = this.task.getParallelTaskResult();
                    logger.info("Before Safety Check: total entry count: " + nodeDataMapValid.size());
                    HashMap<String, NodeReqResponse> nodeDataMapValidSafe = new HashMap<String, NodeReqResponse>();
                    InternalDataProvider.getInstance().filterUnsafeOrUnnecessaryRequest(nodeDataMapValid, nodeDataMapValidSafe);
                    logger.info("After Safety Check: total entry count in nodeDataMapValidSafe: {}", (Object)nodeDataMapValidSafe.size());
                    logger.debug("maxConcurrency : {}", (Object)maxConcurrency);
                    this.requestCount = nodeDataMapValidSafe.size();
                    this.task.setRequestNumActual(this.requestCount);
                    logger.info("!Obtain command request for target host meta id " + this.targetHostMeta.getTargetHostId() + "  with count: " + this.requestCount);
                    if (this.requestCount <= 0) {
                        this.getSender().tell((Object)new ResponseFromManager(this.requestCount), this.getSelf());
                        logger.info("req count <=0. return");
                        this.reply(ParallelTaskState.COMPLETED_WITHOUT_ERROR, new RuntimeException("ReqCount after trim is 0. Return."));
                        return;
                    }
                    int sentRequestCounter = 0;
                    AsyncHttpClient asyncHttpClient = this.asyncHttpClientGlobal = this.task.getAsyncHttpClient();
                    for (Map.Entry entry : nodeDataMapValidSafe.entrySet()) {
                        String targetHostNew;
                        String targetHost = (String)entry.getKey();
                        NodeReqResponse nodeReqResponse = (NodeReqResponse)entry.getValue();
                        String requestContentOrig = nodeReqResponse.getRequestParameters().get("PARALLEC_EMPTY_REQUEST_BODY");
                        String requestContent = NodeReqResponse.replaceStrByMap(nodeReqResponse.getRequestParameters(), requestContentOrig);
                        String resourcePath = NodeReqResponse.replaceStrByMap(nodeReqResponse.getRequestParameters(), requestUrlPrefixOrig);
                        ParallecResponseHandler handler = this.task.getConfig().getHandlerExecutionLocation() == HandlerExecutionLocation.MANAGER ? null : this.task.getHandler();
                        Map<String, Object> responseContext = this.task.getConfig().getHandlerExecutionLocation() == HandlerExecutionLocation.MANAGER ? null : this.task.getResponseContext();
                        HashMap<String, String> httpHeaderMapLocal = new HashMap<String, String>();
                        httpHeaderMapLocal.putAll(this.httpMeta.getHeaderMetadata().getHeaderMap());
                        PcHttpUtils.replaceHttpHeaderMapNodeSpecific(httpHeaderMapLocal, nodeReqResponse.getRequestParameters());
                        if (this.task.getConfig().isPrintHttpTrueHeaderMap()) {
                            for (Map.Entry headerEntry : httpHeaderMapLocal.entrySet()) {
                                String headerKey = (String)headerEntry.getKey();
                                String headerValue = (String)headerEntry.getValue();
                                nodeReqResponse.getRequestParameters().put(PcConstants.REQUEST_PARAMETER_HTTP_HEADER_PREFIX + headerKey, headerValue);
                            }
                        }
                        if (this.task.getConfig().isPrintPoller()) {
                            nodeReqResponse.getRequestParameters().put("NEED_POLLER", Boolean.toString(pollable));
                        }
                        if ((targetHostNew = nodeReqResponse.getRequestParameters().get("REPLACE-VAR_PARALLEC_UNIFORM_TARGET_HOST")) != null) {
                            nodeReqResponse.getRequestParameters().put("TRUE_TARGET_NODE", targetHostNew);
                        }
                        String hostUniform = targetHostNew == null ? null : targetHostNew;
                        nodeReqResponse.getRequestParameters().put("STATUS", SingleTargetTaskStatus.IN_PROGRESS.toString());
                        nodeReqResponse.getRequestParameters().put("TRUE_CONTENT", requestContent);
                        nodeReqResponse.getRequestParameters().put("TRUE_URL", resourcePath);
                        nodeReqResponse.getRequestParameters().put("TRUE_PORT", Integer.toString(requestPort));
                        nodeReqResponse.getRequestParameters().put("HEADER_META", this.httpMeta.getHeaderMetadata().getHeaderStr());
                        long prepareRequestTime = System.currentTimeMillis();
                        String prepareRequestTimeStr = PcDateUtils.getDateTimeStrStandard(new Date(prepareRequestTime));
                        nodeReqResponse.getRequestParameters().put("PREPARE_TIME", prepareRequestTimeStr);
                        SshMeta sshMeta = this.task.getSshMeta();
                        TcpMeta tcpMeta = this.task.getTcpMeta();
                        PingMeta pingMeta = this.task.getPingMeta();
                        logger.debug("REQUEST GENERATED: " + (sentRequestCounter + 1) + " / " + this.requestCount + " after " + new Double((double)(prepareRequestTime - this.startTime) / 1000.0).toString() + " secs" + ":  (NOT SEND YET) " + targetHost + " at " + prepareRequestTimeStr);
                        ActorRef worker = this.getContext().system().actorOf(Props.create(OperationWorker.class, (Object[])new Object[]{new TaskRequest(this.task.getConfig().getActorMaxOperationTimeoutSec(), requestProtocol, targetHost, hostUniform, requestPort, resourcePath, requestContent, httpMethod, pollable, httpHeaderMapLocal, handler, responseContext, sshMeta, tcpMeta, pingMeta), asyncHttpClient, this.task.getHttpMeta().getHttpPollerProcessor()}));
                        this.workers.put(targetHost, worker);
                        ++sentRequestCounter;
                    }
                    RequestToBatchSenderAsstManager requestToBatchSenderAsstManager = new RequestToBatchSenderAsstManager(this.taskId, this.task.getConfig().getAsstManagerRetryIntervalMillis(), new ArrayList<ActorRef>(this.workers.values()), this.getSelf(), maxConcurrency);
                    this.batchSenderAsstManager = this.getContext().system().actorOf(Props.create(AssistantExecutionManager.class, (Object[])new Object[0]), "RequestToBatchSenderAsstManager-" + UUID.randomUUID().toString());
                    this.batchSenderAsstManager.tell((Object)requestToBatchSenderAsstManager, this.getSelf());
                    FiniteDuration timeOutDuration = Duration.create((long)this.task.getConfig().getTimeoutInManagerSec(), (TimeUnit)TimeUnit.SECONDS);
                    this.timeoutMessageCancellable = this.getContext().system().scheduler().scheduleOnce(timeOutDuration, this.getSelf(), (Object)ExecutionManagerMsgType.OPERATION_TIMEOUT, (ExecutionContext)this.getContext().system().dispatcher(), this.getSelf());
                    logger.debug("Scheduled TIMEOUT_IN_MANAGER_SCONDS OPERATION_TIMEOUT after SEC {} ", (Object)this.task.getConfig().getTimeoutInManagerSec());
                    break block29;
                }
                if (message instanceof ResponseOnSingleTask) {
                    ResponseOnSingleTask taskResponse = (ResponseOnSingleTask)message;
                    ++this.responseCount;
                    this.task.setResponsedNum(this.responseCount);
                    ResponseCountToBatchSenderAsstManager responseCountToBatchSenderAsstManager = new ResponseCountToBatchSenderAsstManager(this.responseCount);
                    this.batchSenderAsstManager.tell((Object)responseCountToBatchSenderAsstManager, this.getSelf());
                    logger.debug("Send batchSenderAsstManager to responseCountToBatchSenderAsstManager: " + this.responseCount);
                    String hostName = taskResponse.getRequest().getHost();
                    if (this.responseMap.containsKey(hostName)) {
                        logger.error("ERROR: dupliated response received: " + hostName + PcDateUtils.getNowDateTimeStr());
                    }
                    this.responseMap.put(hostName, taskResponse);
                    String responseSummary = taskResponse.isError() ? "FAIL_GET_RESPONSE: " + taskResponse.getErrorMessage() : taskResponse.getStatusCode();
                    Map<String, LinkedHashSet<String>> resultMap = this.task.getAggregateResultMap();
                    if (resultMap.containsKey(responseSummary)) {
                        resultMap.get(responseSummary).add(hostName);
                    } else {
                        LinkedHashSet<String> set = new LinkedHashSet<String>();
                        set.add(hostName);
                        resultMap.put(responseSummary, set);
                    }
                    NodeReqResponse nrr = this.task.getParallelTaskResult().get(hostName);
                    nrr.setSingleTaskResponse(taskResponse);
                    String responseTrim = taskResponse.getResponseContent() == null ? null : taskResponse.getResponseContent().trim();
                    String displayResponse = Strings.isNullOrEmpty((String)responseTrim) ? "EMPTY" : responseTrim.substring(0, Math.min(PcConstants.AGNET_RESPONSE_MAX_RESPONSE_DISPLAY_BYTE, responseTrim.length()));
                    long responseReceiveTime = System.currentTimeMillis();
                    double progressPercent = (double)this.responseCount / (double)this.requestCount * 100.0;
                    String responseReceiveTimeStr = PcDateUtils.getDateTimeStrStandard(new Date(responseReceiveTime));
                    String secondElapsedStr = new Double((double)(responseReceiveTime - this.startTime) / 1000.0).toString();
                    if (this.requestCount < ParallecGlobalConfig.logAllResponseIfTotalLessThan || this.responseCount <= ParallecGlobalConfig.logAllResponseBeforeInitCount || progressPercent < ParallecGlobalConfig.logAllResponseBeforePercent || progressPercent > ParallecGlobalConfig.logAllResponseAfterPercent || this.responseCount % ParallecGlobalConfig.logResponseInterval == 0) {
                        logger.info(String.format("\n[%d]__RESP_RECV_IN_MGR %d (+%d) / %d (%.5g%%)  AFT %s S @ %s @ %s , TaskID : %s , CODE: %s, RESP_BRIEF: %s %s", this.responseCount, this.responseCount, this.requestCount - this.responseCount, this.requestCount, progressPercent, secondElapsedStr, hostName, responseReceiveTimeStr, this.taskIdTrim, taskResponse.getStatusCode(), displayResponse, taskResponse.getErrorMessage() == null ? "" : ", ERR: " + taskResponse.getErrorMessage()));
                    }
                    nrr.getRequestParameters().put("STATUS", SingleTargetTaskStatus.COMPLETED.toString());
                    if (this.task.getConfig().getHandlerExecutionLocation() == HandlerExecutionLocation.MANAGER && this.task != null && this.task.getHandler() != null) {
                        try {
                            this.task.getHandler().onCompleted(taskResponse, this.task.getResponseContext());
                        }
                        catch (Exception t) {
                            logger.error("Error handling onCompleted in manager for response: {} Error {}", (Object)taskResponse.toString(), (Object)t.getLocalizedMessage());
                        }
                    }
                    if (!this.task.getConfig().isSaveResponseToTask()) {
                        taskResponse.setResponseContent("NOT_SAVED");
                        logger.debug("Set single task response content as Not Saved to save space.");
                    }
                    if (this.responseCount == this.requestCount) {
                        if (this.wasIssuedCancel) {
                            ExecutionManagerExecutionException exCanceled = new ExecutionManagerExecutionException("ExecutionManager: task was canceled by user", ExecutionManagerExecutionException.ManagerExceptionType.CANCEL);
                            this.reply(ParallelTaskState.COMPLETED_WITH_ERROR, exCanceled);
                        } else {
                            this.reply(ParallelTaskState.COMPLETED_WITHOUT_ERROR, null);
                        }
                    }
                    break block29;
                }
                if (message instanceof CancelTaskOnHostRequest) {
                    CancelTaskOnHostRequest msg = (CancelTaskOnHostRequest)message;
                    this.cancelRequestAndWorkerOnHost(msg.getTargetHosts());
                } else if (message instanceof ExecutionManagerMsgType) {
                    switch ((ExecutionManagerMsgType)((Object)message)) {
                        case OPERATION_TIMEOUT: {
                            this.cancelRequestAndWorkers();
                            ExecutionManagerExecutionException ex = new ExecutionManagerExecutionException("Manager timeout after", ExecutionManagerExecutionException.ManagerExceptionType.TIMEOUT);
                            this.reply(ParallelTaskState.COMPLETED_WITH_ERROR, ex);
                            break;
                        }
                        case CANCEL: {
                            this.cancelRequestAndWorkers();
                            this.wasIssuedCancel = true;
                            break;
                        }
                    }
                } else {
                    logger.error("Unhandled: " + message);
                    this.unhandled(message);
                }
            }
            catch (Exception t) {
                logger.error("Command Manager error: " + t + " trace: ", (Throwable)t);
                this.task.getTaskErrorMetas().add(new TaskErrorMeta(TaskErrorMeta.TaskErrorType.COMMAND_MANAGER_ERROR, t == null ? "NA" : t.getLocalizedMessage()));
                this.reply(ParallelTaskState.COMPLETED_WITH_ERROR, t);
            }
        }
    }

    private void reply(ParallelTaskState state, Exception t) {
        this.task.setState(state);
        logger.info("task.state : " + this.task.getState().toString());
        logger.info("task.totalJobNumActual : " + this.task.getRequestNumActual() + " InitCount: " + this.task.getRequestNum());
        logger.info("task.response received Num {} ", (Object)this.task.getResponsedNum());
        if (state == ParallelTaskState.COMPLETED_WITH_ERROR) {
            this.task.getTaskErrorMetas().add(new TaskErrorMeta(TaskErrorMeta.TaskErrorType.COMMAND_MANAGER_ERROR, t == null ? "NA" : t.getLocalizedMessage()));
            logger.info("COMPLETED_WITH_ERROR.  " + this.requestCount + " at time: " + PcDateUtils.getNowDateTimeStrStandard());
        } else {
            logger.info("SUCCESSFUL GOT ON ALL RESPONSES: Received all the expected messages. Count matches: " + this.requestCount + " at time: " + PcDateUtils.getNowDateTimeStrStandard());
        }
        ResponseFromManager batchResponseFromManager = new ResponseFromManager(this.responseMap.size());
        this.responseMap.clear();
        this.director.tell((Object)batchResponseFromManager, this.getSelf());
        this.endTime = System.currentTimeMillis();
        this.task.setExecutionEndTime(this.endTime);
        double durationSec = (double)(this.endTime - this.startTime) / 1000.0;
        this.task.setDurationSec(durationSec);
        logger.info("\nTime taken to get all responses back : " + durationSec + " secs");
        this.task.setExecutionEndTime(this.endTime);
        for (ActorRef worker : this.workers.values()) {
            this.getContext().stop(worker);
        }
        this.workers.clear();
        if (this.batchSenderAsstManager != null && !this.batchSenderAsstManager.isTerminated()) {
            this.getContext().stop(this.batchSenderAsstManager);
        }
        if (this.timeoutMessageCancellable != null) {
            this.timeoutMessageCancellable.cancel();
        }
        if (this.getSelf() != null && !this.getSelf().isTerminated()) {
            this.getContext().stop(this.getSelf());
        }
    }

    private void cancelRequestAndWorkers() {
        for (ActorRef worker : this.workers.values()) {
            if (worker == null || worker.isTerminated()) continue;
            worker.tell((Object)OperationWorkerMsgType.CANCEL, this.getSelf());
        }
        logger.info("ExecutionManager sending cancelPendingRequest at time: " + PcDateUtils.getNowDateTimeStr());
    }

    private void cancelRequestAndWorkerOnHost(List<String> targetHosts) {
        ArrayList<String> validTargetHosts = new ArrayList<String>(this.workers.keySet());
        validTargetHosts.retainAll(targetHosts);
        logger.info("targetHosts for cancel: Total: {} Valid in current manager with worker threads: {}", (Object)targetHosts.size(), (Object)validTargetHosts.size());
        for (String targetHost : validTargetHosts) {
            ActorRef worker = this.workers.get(targetHost);
            if (worker != null && !worker.isTerminated()) {
                worker.tell((Object)OperationWorkerMsgType.CANCEL, this.getSelf());
                logger.info("Submitted CANCEL request on Host {}", (Object)targetHost);
                continue;
            }
            logger.info("Dit NOT Submitted CANCEL request on Host {} as worker on this host is null or already killed", (Object)targetHost);
        }
    }
}

