/*
 * Decompiled with CFR 0.152.
 */
package io.lighty.core.cluster.kubernetes;

import akka.actor.AbstractActor;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.japi.Creator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.ssl.HostnameVerifier;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.ReadTransaction;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.Entity;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
import org.opendaylight.yangtools.yang.binding.Identifier;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnreachableListener
extends AbstractActor {
    private static final Logger LOG = LoggerFactory.getLogger(UnreachableListener.class);
    private static final String K8S_SCHEME = "https";
    private static final String K8S_HOST = "kubernetes";
    private static final String K8S_GET_PODS_PATH = "/api/v1/namespaces/default/pods";
    private static final String K8S_LIGHTY_SELECTOR = "lighty-k8s-cluster";
    private static final long DEFAULT_UNREACHABLE_RESTART_TIMEOUT = 30L;
    private final Cluster cluster = Cluster.get((ActorSystem)this.getContext().getSystem());
    private final ActorSystem actorSystem;
    private final DataBroker dataBroker;
    private final ClusterAdminService clusterAdminRPCService;
    private final Long podRestartTimeout;
    private final Set<Member> initialUnreachableSet;

    public UnreachableListener(ActorSystem actorSystem, DataBroker dataBroker, ClusterAdminService clusterAdminRPCService, Long podRestartTimeout) {
        LOG.info("UnreachableListener created");
        this.dataBroker = dataBroker;
        this.clusterAdminRPCService = clusterAdminRPCService;
        this.actorSystem = actorSystem;
        this.initialUnreachableSet = new HashSet<Member>();
        if (podRestartTimeout == null || podRestartTimeout == 0L) {
            this.podRestartTimeout = 30L;
            LOG.info("Pod-restart-timeout wasn't loaded from akka-config, using default:{}", (Object)this.podRestartTimeout);
        } else {
            this.podRestartTimeout = podRestartTimeout;
            LOG.info("Pod-restart-timeout value was loaded from akka-config:{}", (Object)this.podRestartTimeout);
        }
    }

    public static Props props(ActorSystem actorSystem, DataBroker dataBroker, ClusterAdminService clusterAdminRPCService, Long podRestartTimeout) {
        return Props.create(UnreachableListener.class, (Creator & Serializable)() -> new UnreachableListener(actorSystem, dataBroker, clusterAdminRPCService, podRestartTimeout));
    }

    public void preStart() {
        this.cluster.subscribe(this.getSelf(), (ClusterEvent.SubscriptionInitialStateMode)ClusterEvent.initialStateAsEvents(), new Class[]{ClusterEvent.MemberEvent.class, ClusterEvent.UnreachableMember.class});
        this.initialUnreachableSet.addAll(this.cluster.state().getUnreachable());
        if (!this.initialUnreachableSet.isEmpty()) {
            for (Member member : this.initialUnreachableSet) {
                LOG.info("PreStart: Member detected as unreachable, preparing for downing: {}", (Object)member.address());
                this.processUnreachableMember(member);
            }
        }
    }

    public void postStop() {
        this.cluster.unsubscribe(this.getSelf());
    }

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(ClusterEvent.UnreachableMember.class, mUnreachable -> {
            if (this.initialUnreachableSet.contains(mUnreachable.member())) {
                this.initialUnreachableSet.remove(mUnreachable.member());
                LOG.info("Member {} was already removed during PreStart.", (Object)mUnreachable.member().address());
                return;
            }
            LOG.info("Member detected as unreachable, processing: {}", (Object)mUnreachable.member().address());
            this.processUnreachableMember(mUnreachable.member());
        }).match(ClusterEvent.MemberRemoved.class, mRemoved -> LOG.info("Member was Removed: {}", (Object)mRemoved.member())).build();
    }

    private void processUnreachableMember(Member member) {
        ClusterEvent.CurrentClusterState currentState = this.cluster.state();
        if (this.isMajorityReachable(((Collection)currentState.getMembers()).size(), currentState.getUnreachable().size())) {
            if (!this.safeToDownMember(member)) {
                LOG.info("It is not safe to down member {}", (Object)member.address());
                return;
            }
            this.downMember(member);
            LOG.info("Downing complete");
        } else {
            LOG.warn("Majority of cluster seems to be unreachable. This is probably due to network partition in which case the other side will resolve it (since they are the majority). Downing members from this side isn't safe");
        }
    }

    private boolean isMajorityReachable(int totalMembers, int unreachableMembers) {
        return (double)(totalMembers - unreachableMembers) >= Math.floor((double)(totalMembers + 1) / 2.0) + (double)((totalMembers + 1) % 2);
    }

    private void downMember(Member member) {
        LOG.info("Downing member {}", (Object)member.address());
        List removedMemberRoles = member.getRoles().stream().filter(role -> !role.contains("default")).collect(Collectors.toList());
        this.cluster.down(member.address());
        WriteTransaction deleteTransaction = this.dataBroker.newWriteOnlyTransaction();
        for (InstanceIdentifier<Candidate> candidateIID : this.getCandidatesFromDatastore(member)) {
            LOG.debug("Deleting candidate: {}", candidateIID);
            deleteTransaction.delete(LogicalDatastoreType.OPERATIONAL, candidateIID);
        }
        try {
            for (String removedMemberRole : removedMemberRoles) {
                ListenableFuture rpcResultListenableFuture = this.clusterAdminRPCService.removeAllShardReplicas(new RemoveAllShardReplicasInputBuilder().setMemberName(removedMemberRole).build());
                RpcResult removeAllShardReplicasResult = (RpcResult)rpcResultListenableFuture.get();
                if (removeAllShardReplicasResult.isSuccessful()) {
                    LOG.debug("RPC RemoveAllShards for member {} executed successfully", (Object)removedMemberRole);
                    continue;
                }
                LOG.warn("RPC RemoveAllShards for member {} failed: {}", (Object)removedMemberRole, (Object)removeAllShardReplicasResult.getErrors());
            }
            deleteTransaction.commit().get();
            LOG.debug("Delete-Candidates transaction was successful");
        }
        catch (InterruptedException | ExecutionException e) {
            LOG.error("Delete-Candidates transaction failed", (Throwable)e);
        }
    }

    private List<InstanceIdentifier<Candidate>> getCandidatesFromDatastore(Member removedMember) {
        EntityOwners owners;
        List removedMemberRoles = removedMember.getRoles().stream().filter(role -> !role.contains("default")).collect(Collectors.toList());
        LOG.debug("Getting Candidates from model EntityOwners for member's roles: {}", removedMemberRoles);
        LinkedList<InstanceIdentifier<Candidate>> candidatesToDelete = new LinkedList<InstanceIdentifier<Candidate>>();
        try (ReadTransaction readOwners = this.dataBroker.newReadOnlyTransaction();){
            owners = ((Optional)readOwners.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(EntityOwners.class)).get()).orElse(null);
        }
        catch (InterruptedException | ExecutionException e) {
            LOG.error("Couldn't read data from model EntityOwners", (Throwable)e);
            return Collections.emptyList();
        }
        for (EntityType entityType : owners.getEntityType()) {
            for (Entity entity : entityType.getEntity()) {
                for (Candidate candidate : entity.getCandidate()) {
                    if (!removedMemberRoles.contains(candidate.getName())) continue;
                    LOG.debug("Found candidate in shard: {}", (Object)entity.getId());
                    InstanceIdentifier cand = InstanceIdentifier.builder(EntityOwners.class).child(EntityType.class, (Identifier)entityType.key()).child(Entity.class, (Identifier)entity.key()).child(Candidate.class, (Identifier)candidate.key()).build();
                    candidatesToDelete.add((InstanceIdentifier<Candidate>)cand);
                }
            }
        }
        LOG.debug("The removed member is registered as candidate in {}", (Object)candidatesToDelete.size());
        LOG.trace("The removed member is registered as: {}", candidatesToDelete);
        return candidatesToDelete;
    }

    private ListenableScheduledFuture schedulePodRestart(Member unreachable, String unreachablePodName) {
        if (unreachablePodName == null || unreachablePodName.isEmpty()) {
            LOG.error("Pod name was missing or empty. Can't schedule Pod restart.");
            return null;
        }
        LOG.info("Before restarting wait {}s. If member becomes reachable again, restart will be aborted", (Object)this.podRestartTimeout);
        ListeningScheduledExecutorService listeningScheduledExecutorService = MoreExecutors.listeningDecorator((ScheduledExecutorService)Executors.newSingleThreadScheduledExecutor());
        return listeningScheduledExecutorService.schedule(() -> {
            ClusterEvent.CurrentClusterState state = Cluster.get((ActorSystem)this.actorSystem).state();
            if (((Collection)state.getMembers()).contains(unreachable)) {
                if (state.getUnreachable().contains(unreachable)) {
                    LOG.debug("Requesting Kubernetes to restart the pod {}", (Object)unreachablePodName);
                    this.sendRestartRequest(unreachable, unreachablePodName);
                } else {
                    LOG.debug("Member {} is reachable again. Aborting POD restart", (Object)unreachablePodName);
                }
            } else {
                LOG.warn("Member {} is no longer listed among other cluster members. Trying to restart it.", (Object)unreachablePodName);
                this.sendRestartRequest(unreachable, unreachablePodName);
            }
        }, this.podRestartTimeout.longValue(), TimeUnit.SECONDS);
    }

    private void sendRestartRequest(Member unreachableMember, String unreachablePodName) {
        LOG.info("Member didn't return to reachable state, trying to restart its Pod");
        URIBuilder uriBuilder = this.getURIForKubernetesAPICall("/api/v1/namespaces/default/pods/" + unreachablePodName);
        try {
            LOG.debug("Creating REST request for Deleting Pod: {}", (Object)uriBuilder.toString());
            HttpDelete deletePodRequest = new HttpDelete(uriBuilder.build());
            Config k8sClient = new ConfigBuilder().build();
            deletePodRequest.addHeader("Authorization", "Bearer " + k8sClient.getOauthToken());
            deletePodRequest.addHeader("Content-Type", "application/json");
            LOG.debug("Executing REST request for Deleting Pod");
            CloseableHttpClient httpClient = this.getHttpClient();
            CloseableHttpResponse response = httpClient.execute((HttpUriRequest)deletePodRequest);
            LOG.trace("Response from Kubernetes: {}", (Object)response);
            String result = IOUtils.toString((InputStream)response.getEntity().getContent(), (Charset)StandardCharsets.UTF_8);
            LOG.trace("Response Entity from Kubernetes: {}", (Object)result);
            if (response.getStatusLine().getStatusCode() >= 200 && response.getStatusLine().getStatusCode() < 300) {
                LOG.info("Request successful. Kubernetes will restart Pod with name: {}", (Object)unreachablePodName);
                this.downMember(unreachableMember);
            } else if (response.getStatusLine().getStatusCode() == 404) {
                LOG.info("Request to delete Pod {} failed because the pod no longer exists. Safe to down member.", (Object)unreachablePodName);
                this.downMember(unreachableMember);
            } else {
                LOG.error("Request to delete Pod {} failed. Not safe to down member. Response from Kubernetes: {}", (Object)unreachablePodName, (Object)response);
            }
        }
        catch (IOException | URISyntaxException | KeyManagementException | KeyStoreException | NoSuchAlgorithmException e) {
            LOG.error("Request to delete Pod failed", (Throwable)e);
        }
    }

    public boolean safeToDownMember(Member unreachableMember) {
        JSONObject podList = this.getAllLightyPods();
        if (podList == null) {
            LOG.error("List of Pods wasn't received. Can't decide whether it's safe to Down the unreachable member {}", (Object)unreachableMember.address());
            return false;
        }
        JSONArray items = podList.getJSONArray("items");
        HashMap<String, JsonNode> podMap = new HashMap<String, JsonNode>();
        for (int i = 0; i < items.length(); ++i) {
            ObjectMapper mapper = new ObjectMapper();
            try {
                JsonNode podDetail = mapper.readTree(items.getJSONObject(i).toString());
                String podIP = podDetail.at("/status/podIP").asText();
                if (podIP != null && !podIP.isEmpty()) {
                    podMap.put(podIP, podDetail);
                    continue;
                }
                LOG.debug("PodIP wasn't found in Pod info");
                LOG.trace("Pod info: {}", (Object)podDetail);
                continue;
            }
            catch (IOException e) {
                LOG.warn("Couldn't get podIP from Pod info");
            }
        }
        LOG.debug("List of all Pod IPs: {}", podMap.keySet());
        Address unreachableAddress = unreachableMember.address();
        if (unreachableAddress.host().nonEmpty()) {
            LOG.debug("Address of unreachable member is: {}", unreachableAddress.host().get());
            if (podMap.containsKey(unreachableAddress.host().get())) {
                LOG.debug("IP of unreachable was found in Pods List. Checking container state");
                return this.analyzePodState(unreachableMember, (JsonNode)podMap.get(unreachableAddress.host().get()));
            }
            LOG.debug("IP of unreachable was not found in Pods List.. it is safe to delete it");
            return true;
        }
        return false;
    }

    private JSONObject getAllLightyPods() {
        LOG.debug("Getting Lighty Pods from Kubernetes");
        try {
            CloseableHttpClient httpClient = this.getHttpClient();
            HttpGet request = new HttpGet(this.getURIForKubernetesAPICall(K8S_GET_PODS_PATH).setParameter("labelSelector", "app=lighty-k8s-cluster").build());
            Config k8sClient = new ConfigBuilder().build();
            request.addHeader("Authorization", "Bearer " + k8sClient.getOauthToken());
            request.addHeader("Content-Type", "application/json");
            CloseableHttpResponse response = httpClient.execute((HttpUriRequest)request);
            String result = IOUtils.toString((InputStream)response.getEntity().getContent(), (Charset)StandardCharsets.UTF_8);
            LOG.debug("Get Lighty Pods from Kubernetes result: {}", (Object)result);
            return new JSONObject(result);
        }
        catch (IOException | URISyntaxException | KeyManagementException | KeyStoreException | NoSuchAlgorithmException e) {
            LOG.error("Requesting Pods from Kubernetes failed = {}", (Object)e.toString(), (Object)e);
            return null;
        }
    }

    private CloseableHttpClient getHttpClient() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
        SSLContextBuilder builder = new SSLContextBuilder();
        builder.loadTrustMaterial(null, (TrustStrategy)new TrustSelfSignedStrategy());
        SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(builder.build(), (HostnameVerifier)NoopHostnameVerifier.INSTANCE);
        CloseableHttpClient httpClient = HttpClients.custom().setSSLSocketFactory((LayeredConnectionSocketFactory)sslsf).build();
        LOG.debug("SSL factory and HttpClient created");
        return httpClient;
    }

    private URIBuilder getURIForKubernetesAPICall(String path) {
        URIBuilder uri = new URIBuilder();
        uri.setScheme(K8S_SCHEME);
        uri.setHost(K8S_HOST);
        uri.setPath(path);
        return uri;
    }

    private boolean analyzePodState(Member unreachableMember, JsonNode podInfo) {
        JsonNode containerStatusesNode = podInfo.at("/status/containerStatuses");
        if (!containerStatusesNode.isMissingNode() && containerStatusesNode.isArray() && containerStatusesNode.size() > 0) {
            ArrayNode containerStatuses = (ArrayNode)containerStatusesNode;
            if (!containerStatuses.get(0).at("/ready").asBoolean()) {
                if (!containerStatuses.get(0).at("/state/terminated").isMissingNode()) {
                    LOG.debug("Found state container - Terminated, safe to Down member");
                    return true;
                }
                LOG.debug("State container doesn't say Terminated");
            } else {
                LOG.debug("ContainerStatus is READY");
            }
        } else {
            LOG.warn("ContainerStatuses list missing or empty");
            LOG.debug("ContainerStatuses detail: {}", (Object)podInfo);
        }
        String name = podInfo.at("/metadata/name").asText();
        this.schedulePodRestart(unreachableMember, name);
        return false;
    }
}

