/*
 * Decompiled with CFR 0.152.
 */
package co.cask.tigon.cli;

import co.cask.tigon.api.flow.FlowSpecification;
import co.cask.tigon.app.program.Program;
import co.cask.tigon.app.program.Programs;
import co.cask.tigon.cli.FlowOperations;
import co.cask.tigon.data.transaction.queue.QueueAdmin;
import co.cask.tigon.flow.DeployClient;
import co.cask.tigon.internal.app.runtime.distributed.DistributedFlowletInstanceUpdater;
import co.cask.tigon.internal.app.runtime.distributed.FlowTwillProgramController;
import co.cask.tigon.internal.app.runtime.flow.FlowUtils;
import co.cask.tigon.io.Locations;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.common.io.OutputSupplier;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FlowOperations} implementation that manages flows through a {@link TwillRunnerService}.
 *
 * <p>Flow jars are copied to a {@code flowjars} location so that they can be reloaded later
 * (see {@link #setInstances}). Most operations are best-effort: failures are logged and the
 * method returns an empty/{@code null} result instead of propagating the exception.
 */
public class DistributedFlowOperations extends AbstractIdleService implements FlowOperations {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedFlowOperations.class);

    /** Pause applied after obtaining a Twill/ZK-backed object, before its contents are read. */
    private static final long ZK_SETTLE_MILLIS = 250L;

    private final Location location;
    private final TwillRunnerService runnerService;
    private final DeployClient deployClient;
    private final File jarUnpackDir;
    private final QueueAdmin queueAdmin;

    /**
     * Creates the operations service.
     *
     * @param locationFactory used to create (and {@code mkdirs}) the {@code flowjars} location
     * @param runnerService Twill runner; started in {@link #startUp()} and stopped in {@link #shutDown()}
     * @param deployClient used to create and start programs from flow jars
     * @param queueAdmin used to clear and configure flow queues
     * @throws IOException if the {@code flowjars} location cannot be created
     */
    @Inject
    public DistributedFlowOperations(LocationFactory locationFactory, TwillRunnerService runnerService, DeployClient deployClient, QueueAdmin queueAdmin) throws IOException {
        this.location = locationFactory.create("flowjars");
        this.location.mkdirs();
        this.runnerService = runnerService;
        this.deployClient = deployClient;
        this.jarUnpackDir = Files.createTempDir();
        this.queueAdmin = queueAdmin;
    }

    /** Starts the underlying Twill runner service and waits for it to be running. */
    @Override
    protected void startUp() throws Exception {
        this.runnerService.startAndWait();
    }

    /**
     * Deploys the flow contained in {@code jarPath} and starts it.
     *
     * <p>If a flow with the same name is already listed by {@link #listAllFlows()}, an error is
     * logged and nothing is started. Otherwise the jar is copied into the {@code flowjars}
     * location under the flow's name before the flow is started. Any failure is logged.
     *
     * @param jarPath jar file containing the flow
     * @param className name of the flow class inside the jar
     * @param userArgs runtime arguments passed to the flow
     */
    @Override
    public void startFlow(File jarPath, String className, Map<String, String> userArgs) {
        try {
            Program program = this.deployClient.createProgram(jarPath, className, this.jarUnpackDir);
            String flowName = program.getSpecification().getName();
            if (this.listAllFlows().contains(flowName)) {
                // Report the conflict directly rather than throwing and catching an exception.
                LOG.error("Flow with the same name is running! Stop or Delete the Flow before starting again");
                return;
            }
            // Replace any stale copy of the jar left behind by a previous deployment.
            Location jarInHDFS = this.location.append(flowName);
            jarInHDFS.delete();
            jarInHDFS.createNew();
            ByteStreams.copy(Locations.newInputSupplier(program.getJarLocation()), Locations.newOutputSupplier(jarInHDFS));
            this.deployClient.startFlow(program, userArgs);
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Returns the state of the first controller found for the given flow.
     *
     * @param flowName name of the flow
     * @return the controller state, or {@code null} if no controller was found or the lookup failed
     */
    @Override
    public Service.State getStatus(String flowName) {
        try {
            Iterable<TwillController> controllers = this.lookupFlow(flowName);
            // Extra settle period on top of the one in lookupFlow, kept from the original
            // implementation. NOTE(review): presumably guards against a slow ZK watch - confirm.
            this.sleepForZK(controllers);
            TwillController controller = firstOrNull(controllers);
            if (controller != null) {
                return controller.state();
            }
        }
        catch (Exception e) {
            LOG.warn(e.getMessage(), e);
        }
        return null;
    }

    /**
     * Lists the names of all live flows.
     *
     * <p>Each name is the live application's name with everything up to and including the first
     * {@code '.'} removed; a name without a dot is returned unchanged.
     *
     * @return the flow names collected so far; possibly incomplete or empty if the lookup failed
     */
    @Override
    public List<String> listAllFlows() {
        List<String> flowNames = Lists.newArrayList();
        try {
            for (TwillRunner.LiveInfo liveInfo : this.lookupService()) {
                String appName = liveInfo.getApplicationName();
                flowNames.add(appName.substring(appName.indexOf('.') + 1));
            }
        }
        catch (Exception e) {
            LOG.warn(e.getMessage(), e);
        }
        return flowNames;
    }

    /**
     * Discovers the socket addresses announced under {@code service} by the given flow.
     *
     * @param flowName name of the flow
     * @param service name of the announced service
     * @return the addresses collected so far; possibly incomplete or empty if discovery failed
     */
    @Override
    public List<InetSocketAddress> discover(String flowName, String service) {
        List<InetSocketAddress> endPoints = Lists.newArrayList();
        try {
            for (TwillController controller : this.lookupFlow(flowName)) {
                ServiceDiscovered discovered = controller.discoverService(service);
                this.sleepForZK(discovered);
                for (Discoverable discoverable : discovered) {
                    endPoints.add(discoverable.getSocketAddress());
                }
            }
        }
        catch (Exception e) {
            LOG.warn(e.getMessage(), e);
        }
        return endPoints;
    }

    /**
     * Stops every controller found for the given flow, waiting for each to stop.
     * Failures are logged and abort the remaining stops.
     *
     * @param flowName name of the flow
     */
    @Override
    public void stopFlow(String flowName) {
        try {
            for (TwillController controller : this.lookupFlow(flowName)) {
                controller.stopAndWait();
            }
        }
        catch (Exception e) {
            LOG.warn(e.getMessage(), e);
        }
    }

    /**
     * Stops the flow, clears its queues and deletes its jar from the {@code flowjars} location.
     * Failures during cleanup are logged.
     *
     * @param flowName name of the flow
     */
    @Override
    public void deleteFlow(String flowName) {
        this.stopFlow(flowName);
        try {
            this.queueAdmin.clearAllForFlow(flowName, flowName);
            this.location.append(flowName).delete();
        }
        catch (Exception e) {
            LOG.warn(e.getMessage(), e);
        }
    }

    /**
     * Lists the services reported by the resource reports of the flow's controllers.
     *
     * @param flowName name of the flow
     * @return the services collected so far; possibly incomplete or empty if the lookup failed
     */
    @Override
    public List<String> getServices(String flowName) {
        List<String> services = Lists.newArrayList();
        try {
            for (TwillController controller : this.lookupFlow(flowName)) {
                ResourceReport report = controller.getResourceReport();
                this.sleepForZK(report);
                services.addAll(report.getServices());
            }
        }
        catch (Exception e) {
            LOG.warn(e.getMessage(), e);
        }
        return services;
    }

    /**
     * Changes the number of instances of a flowlet in a running flow.
     *
     * <p>The current instance count is taken from the controller's resource report, and the
     * program is reloaded from the jar stored under the flow's name. If the resource report has
     * no entry for {@code flowletName}, a warning is logged and nothing is changed. Other
     * failures are logged.
     *
     * @param flowName name of the flow
     * @param flowletName name of the flowlet to resize
     * @param instanceCount requested number of instances
     */
    @Override
    public void setInstances(String flowName, String flowletName, int instanceCount) {
        try {
            for (TwillController controller : this.lookupFlow(flowName)) {
                ResourceReport report = controller.getResourceReport();
                this.sleepForZK(report);
                Collection<?> running = report.getResources().get(flowletName);
                if (running == null) {
                    // Without this check the lookup below would fail with an uninformative NPE.
                    LOG.warn("Flowlet {} not found in resource report of flow {}; instances not changed", flowletName, flowName);
                    return;
                }
                int oldInstances = running.size();
                Program program = Programs.create(this.location.append(flowName));
                DistributedFlowletInstanceUpdater instanceUpdater = new DistributedFlowletInstanceUpdater(
                    program, controller, this.queueAdmin,
                    FlowUtils.configureQueue(program, (FlowSpecification)program.getSpecification(), this.queueAdmin));
                FlowTwillProgramController flowController = new FlowTwillProgramController(program.getName(), controller, instanceUpdater);
                Map<String, String> instanceOptions = Maps.newHashMap();
                instanceOptions.put("flowlet", flowletName);
                instanceOptions.put("newInstances", String.valueOf(instanceCount));
                instanceOptions.put("oldInstances", String.valueOf(oldInstances));
                // Block until the command has completed.
                flowController.command("flowletInstances", instanceOptions).get();
            }
        }
        catch (Exception e) {
            LOG.warn(e.getMessage(), e);
        }
    }

    /**
     * Returns, for each entry in the flow's resource report, the number of resources listed for it.
     *
     * @param flowName name of the flow
     * @return map from resource-report key to its resource count; possibly incomplete or empty
     *         if the lookup failed
     */
    @Override
    public Map<String, Integer> getFlowInfo(String flowName) {
        Map<String, Integer> flowletInfo = Maps.newHashMap();
        try {
            for (TwillController controller : this.lookupFlow(flowName)) {
                ResourceReport report = controller.getResourceReport();
                this.sleepForZK(report);
                for (Map.Entry<String, ? extends Collection<?>> entry : report.getResources().entrySet()) {
                    flowletInfo.put(entry.getKey(), entry.getValue().size());
                }
            }
        }
        catch (Exception e) {
            LOG.warn(e.getMessage(), e);
        }
        return flowletInfo;
    }

    /**
     * Attaches a log handler that prints to {@code out} (auto-flushing) to the first controller
     * found for the given flow. Does nothing if no controller is found; failures are logged.
     *
     * @param flowName name of the flow
     * @param out stream that receives the log output
     */
    @Override
    public void addLogHandler(String flowName, PrintStream out) {
        try {
            TwillController controller = firstOrNull(this.lookupFlow(flowName));
            if (controller != null) {
                controller.addLogHandler(new PrinterLogHandler(new PrintWriter(out, true)));
            }
        }
        catch (Exception e) {
            LOG.warn(e.getMessage(), e);
        }
    }

    /** Looks up all live applications and pauses for the ZK settle period. */
    private Iterable<TwillRunner.LiveInfo> lookupService() {
        Iterable<TwillRunner.LiveInfo> liveInfos = this.runnerService.lookupLive();
        this.sleepForZK(liveInfos);
        return liveInfos;
    }

    /** Looks up the controllers of application {@code flow.<flowName>} and pauses for the ZK settle period. */
    private Iterable<TwillController> lookupFlow(String flowName) {
        Iterable<TwillController> controllers = this.runnerService.lookup(String.format("flow.%s", flowName));
        this.sleepForZK(controllers);
        return controllers;
    }

    /** Returns the first element of {@code items} using a single iteration, or {@code null} if it is empty. */
    private static <T> T firstOrNull(Iterable<T> items) {
        for (T item : items) {
            return item;
        }
        return null;
    }

    /**
     * Pauses for {@link #ZK_SETTLE_MILLIS} after {@code obj} has been obtained.
     *
     * <p>If the thread is interrupted while sleeping, a warning is logged, the interrupt flag is
     * restored and the method returns normally.
     *
     * @param obj the object that was just obtained
     * @throws IllegalStateException if {@code obj} is {@code null}; the reference cannot change
     *         while waiting, so this fails immediately instead of polling
     */
    private void sleepForZK(Object obj) {
        if (obj == null) {
            throw new IllegalStateException("Didn't receive data from ZK");
        }
        try {
            TimeUnit.MILLISECONDS.sleep(ZK_SETTLE_MILLIS);
        }
        catch (InterruptedException e) {
            LOG.warn("Caught interrupted exception", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops the Twill runner service and deletes the temporary jar unpack directory.
     * The directory is deleted even if stopping the runner fails.
     */
    @Override
    protected void shutDown() throws Exception {
        try {
            this.runnerService.stopAndWait();
        }
        finally {
            FileUtils.deleteDirectory(this.jarUnpackDir);
        }
    }
}

