/*
 * Decompiled with CFR 0.152.
 */
package io.lighty.core.cluster.kubernetes;

import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.lighty.core.cluster.ClusteringHandler;
import io.lighty.core.cluster.config.ClusteringConfigUtils;
import io.lighty.core.cluster.kubernetes.UnreachableListenerService;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ClusteringHandler} for deployments where Akka cluster members are discovered through
 * Akka Management's {@link ClusterBootstrap}.
 *
 * <p>{@link #initClustering()} starts the bootstrap, blocks until the cluster reports a leader and,
 * when this member is that leader, writes a generated {@code module-shards.conf} to a temporary file.
 * {@link #start} registers an {@link UnreachableListenerService} as a cluster singleton and, on
 * non-leader members, asks the cluster-admin RPC to add replicas for all shards.
 */
public class KubernetesClusteringHandlerImpl implements ClusteringHandler {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusteringHandlerImpl.class);
    public static final String K8S_DEFAULT_POD_NAMESPACE = "default";
    /** Location of the module-shards configuration generated by the leader. */
    private static final String TMP_MODULE_SHARDS_CONFIG = "/tmp/module-shards.conf";

    private final Config akkaDeploymentConfig;
    private final ActorSystemProvider actorSystemProvider;
    // Path of the generated module-shards.conf; stays empty when the default configuration is used.
    private Optional<String> moduleShardsConfig;

    /**
     * Creates the handler.
     *
     * @param actorSystemProvider provider of the actor system whose cluster is bootstrapped
     * @param akkaDeploymentConfig akka configuration read for cluster roles and kubernetes settings
     */
    public KubernetesClusteringHandlerImpl(@NonNull ActorSystemProvider actorSystemProvider,
            @NonNull Config akkaDeploymentConfig) {
        this.actorSystemProvider = actorSystemProvider;
        this.akkaDeploymentConfig = akkaDeploymentConfig;
        this.moduleShardsConfig = Optional.empty();
    }

    /**
     * Starts {@link ClusterBootstrap} and blocks until the cluster reports a leader.
     *
     * <p>If this member is the leader, a module-shards configuration is generated from the
     * {@code akka.cluster.roles} list and written to {@value #TMP_MODULE_SHARDS_CONFIG}; the path is
     * then exposed through {@link #getModuleConfig()}. In every other case (non-leader, or the file
     * could not be written) the default configuration is used. If the waiting thread is interrupted,
     * the method returns early with the interrupt flag restored.
     */
    @Override
    @SuppressFBWarnings(value = {"DMI_HARDCODED_ABSOLUTE_FILENAME"})
    public void initClustering() {
        LOG.info("Starting ClusterBootstrap");
        final ActorSystem actorSystem = this.actorSystemProvider.getActorSystem();
        ClusterBootstrap.get(actorSystem).start();

        if (!awaitClusterLeader()) {
            return;
        }

        LOG.info("Cluster is formed, leader= {}", cluster().state().getLeader());
        if (isLeader()) {
            LOG.info("I am leader, generating custom module-shards.conf");
            try {
                final List<String> memberRoles = this.akkaDeploymentConfig.getStringList("akka.cluster.roles");
                final String data = ClusteringConfigUtils.generateModuleShardsForMembers(memberRoles);
                Files.write(Paths.get(TMP_MODULE_SHARDS_CONFIG), data.getBytes(StandardCharsets.UTF_8));
                this.moduleShardsConfig = Optional.of(TMP_MODULE_SHARDS_CONFIG);
                return;
            } catch (IOException e) {
                // Best effort: fall through to the default configuration, but keep the stack trace.
                LOG.warn("Tmp module-shards.conf file was not created - error received {}", e.getMessage(), e);
            }
        }
        LOG.info("Using default module-shards.conf");
    }

    /**
     * Registers the {@link UnreachableListenerService} cluster singleton and requests shard replicas.
     *
     * <p>Pod namespace and pod label selector are taken from the akka configuration; when missing, the
     * namespace defaults to {@link #K8S_DEFAULT_POD_NAMESPACE} and the selector to the actor system
     * name. The optional {@code akka.lighty-kubernetes.pod-restart-timeout} value is passed through
     * as {@code null} when it is not configured.
     *
     * @param clusterSingletonServiceProvider provider used to register the singleton service
     * @param clusterAdminRPCService cluster-admin RPC service used to request shard replicas
     * @param bindingDataBroker data broker handed to the unreachable-listener service
     */
    @Override
    public void start(@NonNull ClusterSingletonServiceProvider clusterSingletonServiceProvider,
            @NonNull ClusterAdminService clusterAdminRPCService, @NonNull DataBroker bindingDataBroker) {
        Long podRestartTimeout = null;
        if (this.akkaDeploymentConfig.hasPath("akka.lighty-kubernetes.pod-restart-timeout")) {
            podRestartTimeout = this.akkaDeploymentConfig.getLong("akka.lighty-kubernetes.pod-restart-timeout");
        }

        final String podNamespace = ClusteringConfigUtils.getPodNamespaceFromConfig(this.akkaDeploymentConfig)
                .orElseGet(() -> {
                    LOG.info("{} wasn't specified in .conf file, using k8s default value: {} ",
                            "akka.discovery.kubernetes-api.pod-namespace", K8S_DEFAULT_POD_NAMESPACE);
                    return K8S_DEFAULT_POD_NAMESPACE;
                });

        final String podSelector = ClusteringConfigUtils.getPodSelectorFromConfig(this.akkaDeploymentConfig)
                .orElseGet(() -> {
                    final String defaultPodSelector = this.actorSystemProvider.getActorSystem().name();
                    LOG.warn("{} wasn't specified in .conf file, using k8s default value (akka actor system name): {} Make sure that the value match the deployment label selector",
                            "akka.discovery.kubernetes-api.pod-label-selector", defaultPodSelector);
                    return defaultPodSelector;
                });

        clusterSingletonServiceProvider.registerClusterSingletonService(
                new UnreachableListenerService(this.actorSystemProvider.getActorSystem(), bindingDataBroker,
                        clusterAdminRPCService, podNamespace, podSelector, podRestartTimeout));
        askForShards(clusterAdminRPCService);
    }

    /**
     * Returns the path of the generated module-shards configuration.
     *
     * @return the path written by {@link #initClustering()}, or empty when no file was generated
     */
    @Override
    public Optional<String> getModuleConfig() {
        return this.moduleShardsConfig;
    }

    /**
     * On non-leader members, synchronously invokes the add-replicas-for-all-shards RPC and logs
     * the outcome. Failures are logged, not propagated.
     */
    private void askForShards(ClusterAdminService clusterAdminRPCService) {
        if (isLeader()) {
            return;
        }
        LOG.debug("RPC call - Asking for Shard Snapshots");
        try {
            final RpcResult<?> rpcResult =
                    clusterAdminRPCService.addReplicasForAllShards(new AddReplicasForAllShardsInputBuilder().build())
                            .get();
            LOG.debug("RPC call - Asking for Shard Snapshots result: {}", rpcResult.getResult());
        } catch (InterruptedException e) {
            LOG.error("RPC call - Asking for Shard Snapshots failed", e);
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            LOG.error("RPC call - Asking for Shard Snapshots failed", e);
        }
    }

    /**
     * Blocks until the cluster state reports a non-null leader, polling once per second.
     *
     * <p>The polling executor is always shut down before returning, so neither the periodic task
     * nor its worker thread outlives this call.
     *
     * @return {@code true} when a leader was observed, {@code false} when the wait was interrupted
     */
    private boolean awaitClusterLeader() {
        final CountDownLatch latch = new CountDownLatch(1);
        final ListeningScheduledExecutorService poller =
                MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
        try {
            LOG.info("Waiting for cluster to form");
            poller.scheduleAtFixedRate(() -> {
                if (cluster().state().getLeader() != null) {
                    latch.countDown();
                }
            }, 1L, 1L, TimeUnit.SECONDS);
            latch.await();
            return true;
        } catch (InterruptedException e) {
            LOG.error("Error occurred while waiting for the Cluster to form", e);
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Cancels the periodic poll and releases the worker thread on every exit path.
            poller.shutdownNow();
        }
    }

    /** Returns the {@link Cluster} extension of the provided actor system. */
    private Cluster cluster() {
        final ActorSystem actorSystem = this.actorSystemProvider.getActorSystem();
        return Cluster.get(actorSystem);
    }

    /** Returns whether this member's address equals the leader currently reported by the cluster state. */
    private boolean isLeader() {
        final Cluster cluster = cluster();
        return cluster.selfAddress().equals(cluster.state().getLeader());
    }
}

