/*
 * Decompiled with CFR 0.152.
 */
package com.mware.core.model.yarn;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.mware.core.model.yarn.ClientBase;
import com.mware.core.util.BcLogger;
import com.mware.core.util.BcLoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

public abstract class ApplicationMasterBase
implements AMRMClientAsync.CallbackHandler {
    private static BcLogger LOGGER;
    @Parameter(names={"-memory", "-mem"}, description="Memory for each process in MB.")
    private int memory = 512;
    @Parameter(names={"-cores"}, description="Number of virtual cores each process uses.")
    private int virtualCores = 1;
    @Parameter(names={"-instances", "-i"}, description="Number of instances to start.")
    private int instances = 1;
    @Parameter(names={"-appname"}, description="App name.")
    private String appName = null;
    @Parameter(names={"-remotepath"}, description="Path to the remote files.")
    private String remotePath = null;
    private NMClient nmClient;
    private FileSystem fs;
    private List<Path> resources;
    private String classPathEnv;
    private int numContainersToWaitFor;
    private Priority priority;
    private Resource capability;
    private AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;

    protected void run(String[] args) throws Exception {
        LOGGER = BcLoggerFactory.getLogger(ApplicationMasterBase.class);
        LOGGER.info("BEGIN " + this.getClass().getName(), new Object[0]);
        new JCommander((Object)this, args);
        LOGGER.info("memory: " + this.memory, new Object[0]);
        LOGGER.info("virtualCores: " + this.virtualCores, new Object[0]);
        LOGGER.info("instances: " + this.instances, new Object[0]);
        LOGGER.info("appName: " + this.appName, new Object[0]);
        LOGGER.info("remotePath: " + this.remotePath, new Object[0]);
        if (this.remotePath == null) {
            throw new Exception("remotePath is required");
        }
        ClientBase.printEnv();
        ClientBase.printSystemProperties();
        String myClasspath = System.getProperty("java.class.path");
        YarnConfiguration conf = new YarnConfiguration();
        conf.setBoolean("mapreduce.job.user.classpath.first", true);
        this.fs = FileSystem.get((Configuration)conf);
        this.resources = this.getResourceList(this.fs, new Path(this.remotePath));
        StringBuilder classPathEnvBuilder = new StringBuilder(myClasspath);
        for (Path p : this.resources) {
            classPathEnvBuilder.append(':');
            classPathEnvBuilder.append(p.getName());
        }
        LOGGER.info("Classpath: " + classPathEnvBuilder, new Object[0]);
        this.classPathEnv = classPathEnvBuilder.toString();
        this.nmClient = this.createNodeManagerClient(conf);
        this.rmClient = this.createResourceManagerClient(conf);
        this.rmClient.registerApplicationMaster("", 0, "");
        this.makeContainerRequests();
        LOGGER.info("[AM] waiting for containers to finish", new Object[0]);
        while (!this.doneWithContainers()) {
            Thread.sleep(100L);
        }
        this.rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
    }

    private boolean doneWithContainers() {
        return this.numContainersToWaitFor == 0;
    }

    private List<Path> getResourceList(FileSystem fs, Path remotePath) throws IOException {
        ArrayList<Path> resources = new ArrayList<Path>();
        RemoteIterator files = fs.listFiles(remotePath, false);
        while (files.hasNext()) {
            LocatedFileStatus file = (LocatedFileStatus)files.next();
            LOGGER.info("Adding local resource: " + file.getPath().toString(), new Object[0]);
            resources.add(file.getPath());
        }
        return resources;
    }

    private Map<String, LocalResource> createLocalResources(FileSystem fs, List<Path> resources) throws IOException {
        HashMap<String, LocalResource> localResources = new HashMap<String, LocalResource>();
        for (Path p : resources) {
            FileStatus fileStatus = fs.getFileStatus(p);
            LocalResource rsc = LocalResource.newInstance((URL)ConverterUtils.getYarnUrlFromURI((URI)p.toUri()), (LocalResourceType)LocalResourceType.FILE, (LocalResourceVisibility)LocalResourceVisibility.APPLICATION, (long)fileStatus.getLen(), (long)fileStatus.getModificationTime());
            localResources.put(p.getName(), rsc);
        }
        return localResources;
    }

    private void makeContainerRequests() {
        this.numContainersToWaitFor = this.instances;
        for (int i = 0; i < this.instances; ++i) {
            LOGGER.info("Making res-req " + i, new Object[0]);
            this.makeContainerRequest();
        }
    }

    private void makeContainerRequest() {
        this.ensureCreatePriorityRecord();
        this.ensureCreateResourceRecord();
        AMRMClient.ContainerRequest containerAsk = new AMRMClient.ContainerRequest(this.capability, null, null, this.priority);
        this.rmClient.addContainerRequest(containerAsk);
    }

    private void ensureCreateResourceRecord() {
        if (this.capability == null) {
            this.capability = (Resource)Records.newRecord(Resource.class);
            this.capability.setMemory(this.memory);
            this.capability.setVirtualCores(this.virtualCores);
        }
    }

    private void ensureCreatePriorityRecord() {
        if (this.priority == null) {
            this.priority = (Priority)Records.newRecord(Priority.class);
            this.priority.setPriority(0);
        }
    }

    private NMClient createNodeManagerClient(YarnConfiguration conf) {
        NMClient nmClient = NMClient.createNMClient();
        nmClient.init((Configuration)conf);
        nmClient.start();
        return nmClient;
    }

    private AMRMClientAsync<AMRMClient.ContainerRequest> createResourceManagerClient(YarnConfiguration conf) throws IOException, YarnException {
        AMRMClientAsync rmClient = AMRMClientAsync.createAMRMClientAsync((int)100, (AMRMClientAsync.CallbackHandler)this);
        rmClient.init((Configuration)conf);
        rmClient.start();
        return rmClient;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onContainersCompleted(List<ContainerStatus> statuses) {
        for (ContainerStatus status : statuses) {
            int exitStatus = status.getExitStatus();
            LOGGER.info("[AM] Completed container " + status.getContainerId() + " (return code: " + exitStatus + ")", new Object[0]);
            if (exitStatus != 0) {
                LOGGER.info("[AM] Restarting failed process (return code: " + exitStatus + ")", new Object[0]);
                LOGGER.info("Diagnostics for process " + status.getContainerId() + ": " + status.getDiagnostics() + ", state: " + status.getState(), new Object[0]);
                this.makeContainerRequest();
                continue;
            }
            ApplicationMasterBase applicationMasterBase = this;
            synchronized (applicationMasterBase) {
                --this.numContainersToWaitFor;
            }
        }
    }

    public void onContainersAllocated(List<Container> containers) {
        for (Container container : containers) {
            try {
                this.launchContainer(container);
            }
            catch (Exception ex) {
                System.err.println("[AM] Error launching container " + container.getId() + " " + ex);
            }
        }
    }

    private void launchContainer(Container container) throws YarnException, IOException {
        ContainerLaunchContext ctx = (ContainerLaunchContext)Records.newRecord(ContainerLaunchContext.class);
        Map<String, LocalResource> localResources = this.createLocalResources(this.fs, this.resources);
        ctx.setLocalResources(localResources);
        String command = this.createCommand();
        LOGGER.info("Running: " + command, new Object[0]);
        ctx.setCommands(Collections.singletonList(command));
        ctx.getEnvironment().putAll(System.getenv());
        String message = String.format("Launching container %s (nodeId: %s nodeHttpAddress: %s)", container.getId(), container.getNodeId(), container.getNodeHttpAddress());
        LOGGER.info(message, new Object[0]);
        this.nmClient.startContainer(container, ctx);
    }

    protected String createCommand() {
        return "${JAVA_HOME}/bin/java -Xmx" + this.memory + "M -Djava.net.preferIPv4Stack=true -cp " + this.classPathEnv + " " + this.getTaskClass().getName() + " 1>" + "<LOG_DIR>" + "/stdout 2>" + "<LOG_DIR>" + "/stderr";
    }

    public int getMemory() {
        return this.memory;
    }

    public String getClassPathEnv() {
        return this.classPathEnv;
    }

    protected abstract Class getTaskClass();

    public void onShutdownRequest() {
    }

    public void onNodesUpdated(List<NodeReport> updatedNodes) {
    }

    public float getProgress() {
        return 0.0f;
    }

    public void onError(Throwable e) {
        LOGGER.error("[AM] error ", e);
    }
}

