/*
 * Decompiled with CFR 0.152.
 */
package io.lighty.core.cluster.kubernetes;

import akka.actor.AbstractActor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.japi.Creator;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.ApiResponse;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ContainerStatus;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.ClientBuilder;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.ReadTransaction;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.Entity;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
import org.opendaylight.yangtools.yang.binding.Identifier;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Akka actor that subscribes to cluster events and decides what to do with members
 * that are reported as unreachable.
 *
 * <p>For every unreachable member the actor first checks that the local side still sees a
 * strict majority of the cluster. If so, it asks Kubernetes for the Pods matching the
 * configured namespace and label selector and:
 * <ul>
 *   <li>downs the member right away when no Pod owns the member's IP, or when the first
 *       container of that Pod is not ready and terminated;</li>
 *   <li>otherwise schedules a delayed check which deletes the Pod (and then downs the
 *       member) if the member is still unreachable after {@code podRestartTimeout} seconds.</li>
 * </ul>
 *
 * <p>Downing a member also removes its shard replicas through {@link ClusterAdminService}
 * and deletes its entity-ownership candidates from the operational datastore.
 *
 * <p>NOTE(review): the delayed check runs on a scheduler thread, not on the actor's
 * dispatcher, so {@link #downMember(Member)} can be entered from two threads.
 */
public class UnreachableListener extends AbstractActor {
    private static final Logger LOG = LoggerFactory.getLogger(UnreachableListener.class);
    /** Delay in seconds before a Pod restart is requested when none (or zero) is configured. */
    private static final long DEFAULT_UNREACHABLE_RESTART_TIMEOUT = 30L;
    private static final int HTTP_NOT_FOUND = 404;

    private final Cluster cluster = Cluster.get(getContext().getSystem());
    private final ActorSystem actorSystem;
    private final DataBroker dataBroker;
    private final ClusterAdminService clusterAdminRPCService;
    private final long podRestartTimeout;
    /** Members already unreachable at start-up; used to skip their replayed events. */
    private final Set<Member> initialUnreachableSet;
    /** Kubernetes client; {@code null} when the in-cluster client could not be built. */
    private final CoreV1Api kubernetesApi;
    private final String kubernetesPodNamespace;
    private final String kubernetesPodSelector;

    /**
     * Creates the listener and tries to initialize the in-cluster Kubernetes client.
     *
     * @param actorSystem            actor system used to look up cluster state in the delayed check
     * @param dataBroker             broker used to read and delete entity-ownership candidates
     * @param clusterAdminRPCService RPC service used to remove shard replicas of a downed member
     * @param kubernetesPodNamespace namespace in which Pods are listed and deleted
     * @param kubernetesPodSelector  label selector passed to the Pod list request
     * @param podRestartTimeout      seconds to wait before restarting a Pod; {@code null} or
     *                               {@code 0} selects the default of 30 seconds
     */
    public UnreachableListener(ActorSystem actorSystem, DataBroker dataBroker,
            ClusterAdminService clusterAdminRPCService, String kubernetesPodNamespace,
            String kubernetesPodSelector, Long podRestartTimeout) {
        LOG.info("UnreachableListener created");
        this.dataBroker = dataBroker;
        this.clusterAdminRPCService = clusterAdminRPCService;
        this.actorSystem = actorSystem;
        this.kubernetesPodNamespace = kubernetesPodNamespace;
        this.kubernetesPodSelector = kubernetesPodSelector;
        this.initialUnreachableSet = new HashSet<>();
        if (podRestartTimeout == null || podRestartTimeout == 0L) {
            this.podRestartTimeout = DEFAULT_UNREACHABLE_RESTART_TIMEOUT;
            LOG.info("Pod-restart-timeout wasn't loaded from akka-config, using default:{}", this.podRestartTimeout);
        } else {
            this.podRestartTimeout = podRestartTimeout;
            LOG.info("Pod-restart-timeout value was loaded from akka-config:{}", this.podRestartTimeout);
        }
        this.kubernetesApi = createKubernetesApi();
    }

    /**
     * Builds the in-cluster API client, registers it as the default client and returns a
     * {@link CoreV1Api} on top of it.
     *
     * @return the API instance, or {@code null} if the client could not be built
     */
    private static CoreV1Api createKubernetesApi() {
        try {
            ApiClient client = ClientBuilder.cluster().build();
            Configuration.setDefaultApiClient(client);
            return new CoreV1Api();
        } catch (IOException e) {
            LOG.error("IOException while initializing cluster ApiClient", e);
            return null;
        }
    }

    /**
     * Creates {@link Props} for this actor.
     *
     * @param actorSystem            see the constructor
     * @param dataBroker             see the constructor
     * @param clusterAdminRPCService see the constructor
     * @param kubernetesPodNamespace see the constructor
     * @param kubernetesPodSelector  see the constructor
     * @param podRestartTimeout      see the constructor
     * @return props that instantiate an {@code UnreachableListener} with the given arguments
     */
    public static Props props(ActorSystem actorSystem, DataBroker dataBroker,
            ClusterAdminService clusterAdminRPCService, String kubernetesPodNamespace,
            String kubernetesPodSelector, Long podRestartTimeout) {
        return Props.create(UnreachableListener.class, () -> new UnreachableListener(actorSystem, dataBroker,
                clusterAdminRPCService, kubernetesPodNamespace, kubernetesPodSelector, podRestartTimeout));
    }

    /**
     * Subscribes to member and unreachability events and immediately processes every member
     * that is already unreachable in the current cluster state.
     */
    @Override
    public void preStart() {
        cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
                ClusterEvent.MemberEvent.class, ClusterEvent.UnreachableMember.class);
        initialUnreachableSet.addAll(cluster.state().getUnreachable());
        for (Member member : initialUnreachableSet) {
            LOG.info("PreStart: Member detected as unreachable, preparing for downing: {}", member.address());
            processUnreachableMember(member);
        }
    }

    /** Cancels the cluster event subscription. */
    @Override
    public void postStop() {
        cluster.unsubscribe(getSelf());
    }

    /**
     * Handles {@code UnreachableMember} events (skipping, once, members already handled in
     * {@link #preStart()}) and logs {@code MemberRemoved} events.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return receiveBuilder()
                .match(ClusterEvent.UnreachableMember.class, mUnreachable -> {
                    Member member = mUnreachable.member();
                    // remove() doubles as the membership test: the replayed event is skipped only once.
                    if (initialUnreachableSet.remove(member)) {
                        LOG.info("Member {} was already removed during PreStart.", member.address());
                        return;
                    }
                    LOG.info("Member detected as unreachable, processing: {}", member.address());
                    processUnreachableMember(member);
                })
                .match(ClusterEvent.MemberRemoved.class,
                        mRemoved -> LOG.info("Member was Removed: {}", mRemoved.member()))
                .build();
    }

    /**
     * Downs the member if this side sees a majority of the cluster and
     * {@link #safeToDownMember(Member)} allows it.
     */
    private void processUnreachableMember(Member member) {
        ClusterEvent.CurrentClusterState currentState = cluster.state();
        int totalMembers = ((Collection<?>) currentState.getMembers()).size();
        int unreachableMembers = currentState.getUnreachable().size();
        if (!isMajorityReachable(totalMembers, unreachableMembers)) {
            LOG.warn("Majority of cluster seems to be unreachable. This is probably due to network partition in which case the other side will resolve it (since they are the majority). Downing members from this side isn't safe");
            return;
        }
        if (!safeToDownMember(member)) {
            LOG.info("It is not safe to down member {}", member.address());
            return;
        }
        downMember(member);
        LOG.info("Downing complete");
    }

    /**
     * Returns whether the reachable members form a strict majority, i.e. at least
     * {@code totalMembers / 2 + 1} (integer division) members are reachable.
     */
    private static boolean isMajorityReachable(int totalMembers, int unreachableMembers) {
        return totalMembers - unreachableMembers >= totalMembers / 2 + 1;
    }

    /** Returns the member's roles that do not contain the substring {@code "default"}. */
    private static List<String> nonDefaultRoles(Member member) {
        return member.getRoles().stream()
                .filter(role -> !role.contains("default"))
                .collect(Collectors.toList());
    }

    /**
     * Marks the member as down in the cluster, removes its shard replicas via RPC (one call
     * per non-default role) and deletes its entity-ownership candidates from the operational
     * datastore. Failures are logged, not propagated.
     */
    private void downMember(Member member) {
        LOG.info("Downing member {}", member.address());
        List<String> removedMemberRoles = nonDefaultRoles(member);
        cluster.down(member.address());
        WriteTransaction deleteTransaction = dataBroker.newWriteOnlyTransaction();
        for (InstanceIdentifier<Candidate> candidateIID : getCandidatesFromDatastore(member)) {
            LOG.debug("Deleting candidate: {}", candidateIID);
            deleteTransaction.delete(LogicalDatastoreType.OPERATIONAL, candidateIID);
        }
        boolean commitIssued = false;
        try {
            for (String removedMemberRole : removedMemberRoles) {
                ListenableFuture<? extends RpcResult<?>> rpcFuture = clusterAdminRPCService.removeAllShardReplicas(
                        new RemoveAllShardReplicasInputBuilder().setMemberName(removedMemberRole).build());
                RpcResult<?> removeAllShardReplicasResult = rpcFuture.get();
                if (removeAllShardReplicasResult.isSuccessful()) {
                    LOG.debug("RPC RemoveAllShards for member {} executed successfully", removedMemberRole);
                } else {
                    LOG.warn("RPC RemoveAllShards for member {} failed: {}", removedMemberRole,
                            removeAllShardReplicasResult.getErrors());
                }
            }
            commitIssued = true;
            deleteTransaction.commit().get();
            LOG.debug("Delete-Candidates transaction was successful");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("Delete-Candidates transaction failed", e);
        } catch (ExecutionException e) {
            LOG.error("Delete-Candidates transaction failed", e);
        } finally {
            if (!commitIssued) {
                // An RPC failed before commit(); do not leave the transaction open.
                deleteTransaction.cancel();
            }
        }
    }

    /**
     * Reads the {@code EntityOwners} container from the operational datastore and collects
     * the identifiers of all candidates whose name equals one of the member's non-default roles.
     *
     * @return identifiers of the candidates to delete; empty if the read fails or no data exists
     */
    private List<InstanceIdentifier<Candidate>> getCandidatesFromDatastore(Member removedMember) {
        List<String> removedMemberRoles = nonDefaultRoles(removedMember);
        LOG.debug("Getting Candidates from model EntityOwners for member's roles: {}", removedMemberRoles);
        Optional<EntityOwners> maybeOwners;
        try (ReadTransaction readOwners = dataBroker.newReadOnlyTransaction()) {
            maybeOwners = readOwners
                    .read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(EntityOwners.class))
                    .get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Couldn't read data from model EntityOwners", e);
            return Collections.emptyList();
        } catch (ExecutionException e) {
            LOG.error("Couldn't read data from model EntityOwners", e);
            return Collections.emptyList();
        }
        if (maybeOwners.isEmpty() || maybeOwners.get().getEntityType() == null) {
            LOG.debug("Model EntityOwners holds no entity types, no candidates to delete");
            return Collections.emptyList();
        }

        List<InstanceIdentifier<Candidate>> candidatesToDelete = new LinkedList<>();
        for (EntityType entityType : maybeOwners.get().getEntityType().values()) {
            if (entityType.getEntity() == null) {
                continue;
            }
            for (Entity entity : entityType.getEntity().values()) {
                if (entity.getCandidate() == null) {
                    continue;
                }
                for (Candidate candidate : entity.getCandidate()) {
                    if (!removedMemberRoles.contains(candidate.getName())) {
                        continue;
                    }
                    LOG.debug("Found candidate in shard: {}", entity.getId());
                    InstanceIdentifier<Candidate> cand = InstanceIdentifier.builder(EntityOwners.class)
                            .child(EntityType.class, entityType.key())
                            .child(Entity.class, entity.key())
                            .child(Candidate.class, candidate.key())
                            .build();
                    candidatesToDelete.add(cand);
                }
            }
        }
        LOG.debug("The removed member is registered as candidate in {}", candidatesToDelete.size());
        LOG.trace("The removed member is registered as: {}", candidatesToDelete);
        return candidatesToDelete;
    }

    /**
     * Schedules a check, {@code podRestartTimeout} seconds from now, that restarts the Pod
     * unless the member has become reachable again in the meantime.
     *
     * @return the scheduled future, or {@code null} if the Pod name is missing or empty
     */
    private ListenableScheduledFuture<?> schedulePodRestart(Member unreachable, String unreachablePodName) {
        if (unreachablePodName == null || unreachablePodName.isEmpty()) {
            LOG.error("Pod name was missing or empty. Can't schedule Pod restart.");
            return null;
        }
        LOG.info("Before restarting wait {}s. If member becomes reachable again, restart will be aborted",
                podRestartTimeout);
        ListeningScheduledExecutorService scheduler =
                MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
        try {
            return scheduler.schedule(() -> restartIfStillUnreachable(unreachable, unreachablePodName),
                    podRestartTimeout, TimeUnit.SECONDS);
        } finally {
            // One executor is created per request. shutdown() lets the already scheduled delayed
            // task run (default ScheduledThreadPoolExecutor policy) and then releases the thread,
            // instead of leaking an idle thread for every unreachable member.
            scheduler.shutdown();
        }
    }

    /**
     * Body of the delayed check: requests a Pod restart when the member is still unreachable
     * or is no longer listed among the cluster members; does nothing when it is reachable again.
     */
    private void restartIfStillUnreachable(Member unreachable, String unreachablePodName) {
        try {
            ClusterEvent.CurrentClusterState state = Cluster.get(actorSystem).state();
            if (!((Collection<?>) state.getMembers()).contains(unreachable)) {
                LOG.warn("Member {} is no longer listed among other cluster members. Trying to restart it.",
                        unreachablePodName);
                sendRestartRequest(unreachable, unreachablePodName);
            } else if (state.getUnreachable().contains(unreachable)) {
                LOG.debug("Requesting Kubernetes to restart the pod {}", unreachablePodName);
                sendRestartRequest(unreachable, unreachablePodName);
            } else {
                LOG.debug("Member {} is reachable again. Aborting POD restart", unreachablePodName);
            }
        } catch (RuntimeException e) {
            // The scheduled future is not observed by the caller, so log before the failure is lost.
            LOG.error("Scheduled restart check for Pod {} failed", unreachablePodName, e);
            throw e;
        }
    }

    /**
     * Asks Kubernetes to delete the Pod and downs the member when the request succeeds (2xx)
     * or the Pod no longer exists (404). Any other outcome is logged and the member is kept.
     */
    private void sendRestartRequest(Member unreachableMember, String unreachablePodName) {
        LOG.info("Member didn't return to reachable state, trying to restart its Pod");
        if (kubernetesApi == null) {
            LOG.error("Kubernetes API client is not initialized. Can't restart Pod {}", unreachablePodName);
            return;
        }
        try {
            ApiResponse<?> response = kubernetesApi.deleteNamespacedPodWithHttpInfo(unreachablePodName, this.kubernetesPodNamespace, null, null, null, null, null, null);
            int responseStatusCode = response.getStatusCode();
            if (responseStatusCode >= 200 && responseStatusCode < 300) {
                LOG.info("Request successful. Kubernetes will restart Pod with name: {}", unreachablePodName);
                downMember(unreachableMember);
            } else if (responseStatusCode == HTTP_NOT_FOUND) {
                LOG.info("Request to delete Pod {} failed because the pod no longer exists. Safe to down member.",
                        unreachablePodName);
                downMember(unreachableMember);
            } else {
                LOG.error("Request to delete Pod {} failed. Not safe to down member. Response from Kubernetes: {}",
                        unreachablePodName, response);
            }
        } catch (ApiException e) {
            LOG.debug("ApiException on api.deleteNamespacedPodWithHttpInfo", e);
            if (e.getCode() == HTTP_NOT_FOUND) {
                LOG.info("Request to delete Pod {} failed because the pod no longer exists. Safe to down member.",
                        unreachablePodName);
                downMember(unreachableMember);
                // Handled: do not fall through to the "unhandled response" error below.
                return;
            }
            LOG.error("Unhandled response from API on api.deleteNamedSpacedPod with response code {} . Not safe to down member. ", e.getCode());
        }
    }

    /**
     * Decides whether the unreachable member can be downed immediately.
     *
     * <p>Side effect: when a Pod with the member's IP exists and is not in a terminated state,
     * a delayed Pod restart is scheduled and {@code false} is returned.
     *
     * @param unreachableMember the member reported as unreachable
     * @return {@code true} if no listed Pod has the member's IP, or that Pod's first container
     *         is not ready and terminated; {@code false} otherwise, including when the Pod list
     *         or the member's host address is unavailable
     */
    public boolean safeToDownMember(Member unreachableMember) {
        Optional<V1PodList> podListOptional = getAllLightyPods();
        if (podListOptional.isEmpty()) {
            LOG.error("List of Pods wasn't received. Can't decide whether it's safe to Down the unreachable member {}",
                    unreachableMember.address());
            return false;
        }
        if (!unreachableMember.address().host().isDefined()) {
            LOG.error("Unreachable member {} has no host in its address. Can't decide whether it's safe to Down it",
                    unreachableMember.address());
            return false;
        }
        String unreachableMemberHostIP = unreachableMember.address().host().get();
        LOG.debug("Address of unreachable member is: {}", unreachableMemberHostIP);

        V1Pod conflictingPod = null;
        for (V1Pod pod : podListOptional.get().getItems()) {
            // Pods that have not been assigned an IP yet report no status/podIP.
            String podIP = pod.getStatus() == null ? null : pod.getStatus().getPodIP();
            LOG.debug("Pod: {} has PodIP: {}", podName(pod), podIP);
            if (unreachableMemberHostIP.equals(podIP)) {
                conflictingPod = pod;
                break;
            }
        }
        if (conflictingPod == null) {
            LOG.debug("IP of unreachable was not found in Pods List.. it is safe to delete it");
            return true;
        }
        LOG.debug("IP of unreachable was found in Pods List. Checking container state");
        return analyzePodState(unreachableMember, conflictingPod);
    }

    /** Returns the Pod's name, or {@code null} when the Pod carries no metadata. */
    private static String podName(V1Pod pod) {
        return pod.getMetadata() == null ? null : pod.getMetadata().getName();
    }

    /**
     * Lists the Pods in the configured namespace that match the configured label selector.
     *
     * @return the Pod list, or empty when the client is missing, the request fails, or the
     *         response carries no data
     */
    private Optional<V1PodList> getAllLightyPods() {
        LOG.debug("Getting Lighty Pods from Kubernetes");
        if (kubernetesApi == null) {
            LOG.error("Kubernetes API client is not initialized. Can't retrieve Pods List");
            return Optional.empty();
        }
        try {
            ApiResponse<V1PodList> apiResponse = kubernetesApi.listNamespacedPodWithHttpInfo(this.kubernetesPodNamespace, null, null, null, null, this.kubernetesPodSelector, null, null, null, null);
            int responseStatusCode = apiResponse.getStatusCode();
            if (responseStatusCode >= 200 && responseStatusCode < 300) {
                LOG.info("Successfully retrieved Pods List");
                return Optional.ofNullable(apiResponse.getData());
            }
            LOG.warn("Error retrieving Pods List , Http status code = {}", responseStatusCode);
        } catch (ApiException e) {
            LOG.debug("ApiException on api.listNamespacedPodWithHttpInfo", e);
            LOG.warn("Error retrieving Pods List , Http status code = {}", e.getCode());
        }
        return Optional.empty();
    }

    /**
     * Inspects the first container status of the Pod. Returns {@code true} when that container
     * is not ready and has a terminated state; in every other case schedules a delayed Pod
     * restart and returns {@code false}.
     */
    private boolean analyzePodState(Member unreachableMember, V1Pod pod) {
        List<V1ContainerStatus> containerStatuses =
                pod.getStatus() == null ? null : pod.getStatus().getContainerStatuses();
        if (containerStatuses == null || containerStatuses.isEmpty()) {
            LOG.warn("ContainerStatuses list missing or empty");
            LOG.debug("ContainerStatuses detail: {}", containerStatuses);
        } else {
            // Only the first container is considered.
            V1ContainerStatus firstContainer = containerStatuses.get(0);
            if (Boolean.TRUE.equals(firstContainer.getReady())) {
                LOG.debug("ContainerStatus is READY");
            } else {
                LOG.debug("State of the container is - {} ", firstContainer.getState());
                if (firstContainer.getState() != null && firstContainer.getState().getTerminated() != null) {
                    LOG.debug("Found state container - Terminated, safe to Down member");
                    return true;
                }
                LOG.debug("State of the container is not terminated");
            }
        }
        schedulePodRestart(unreachableMember, podName(pod));
        return false;
    }
}

