/*
 * Decompiled with CFR 0.152.
 */
package cloud.orbit.actors.cluster;

import cloud.orbit.actors.cluster.ClusterPeer;
import cloud.orbit.actors.cluster.MessageListener;
import cloud.orbit.actors.cluster.NodeAddress;
import cloud.orbit.actors.cluster.NodeAddressImpl;
import cloud.orbit.actors.cluster.RedisClusterConfig;
import cloud.orbit.actors.cluster.ViewListener;
import cloud.orbit.actors.cluster.impl.RedisDB;
import cloud.orbit.actors.cluster.impl.RedisKeyGenerator;
import cloud.orbit.actors.cluster.impl.RedisMsg;
import cloud.orbit.actors.cluster.impl.RedisShardedMap;
import cloud.orbit.concurrent.Task;
import cloud.orbit.tuples.Pair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ClusterPeer} implementation that keeps cluster membership, actor-directory maps and
 * node-to-node messaging in Redis through {@link RedisDB}.
 *
 * <p>Each peer generates a random UUID as its address when it is constructed. {@link #join}
 * creates the Redis connections, subscribes to this node's topic and writes this node's
 * entry; {@link #pulse} re-writes the entry and re-reads the set of node entries.
 *
 * <p>Listeners should be registered before {@link #join} is called; a view change computed
 * while no {@link ViewListener} is registered is not delivered.
 */
public class RedisClusterPeer
implements ClusterPeer {
    private static final Logger logger = LoggerFactory.getLogger(RedisClusterPeer.class);

    private ViewListener viewListener;
    private MessageListener messageListener;
    private final NodeAddress localAddress = new NodeAddressImpl(UUID.randomUUID());
    private String clusterName;
    private final RedisClusterConfig config;
    private RedisDB redisDB;
    /** Maps already handed out by {@link #getCache(String)}, keyed by their generated Redis name. */
    private final ConcurrentMap<String, ConcurrentMap<?, ?>> cacheManager = new ConcurrentHashMap<>();

    /**
     * Creates a peer that is not yet connected; connections are opened by {@link #join}.
     *
     * @param config configuration used for the Redis connections, the executor and the node lifetime
     */
    public RedisClusterPeer(RedisClusterConfig config) {
        this.config = config;
    }

    /**
     * Returns the distributed map registered under {@code name} for the current cluster,
     * creating it on first use.
     *
     * <p>When actor-directory hashing is enabled in the configuration a {@link RedisShardedMap}
     * with the configured number of buckets is created; otherwise the map is obtained directly
     * from the actor-directory client. Repeated calls with the same name return the same instance.
     *
     * @param name logical map name
     * @param <K>  key type
     * @param <V>  value type
     * @return the map for {@code name}
     */
    @SuppressWarnings("unchecked") // the cache stores maps with wildcard types; callers pick K and V
    public <K, V> ConcurrentMap<K, V> getCache(String name) {
        String realName = RedisKeyGenerator.key("shardedMap", Pair.of("cluster", this.clusterName), Pair.of("mapName", name));
        ConcurrentMap<?, ?> cached = this.cacheManager.get(realName);
        if (cached == null) {
            ConcurrentMap<K, V> created;
            if (this.config.getActorDirectoryHashingEnabled()) {
                created = new RedisShardedMap<>(realName, this.redisDB.getActorDirectoryClient(), this.config.getActorDirectoryHashBuckets());
            } else {
                created = this.redisDB.getActorDirectoryClient().getMap(realName);
            }
            // Another thread may have registered a map for the same name first; keep theirs.
            ConcurrentMap<?, ?> existing = this.cacheManager.putIfAbsent(realName, created);
            cached = existing != null ? existing : created;
        }
        return (ConcurrentMap<K, V>) cached;
    }

    /**
     * @return the address generated for this peer at construction time
     */
    public NodeAddress localAddress() {
        return this.localAddress;
    }

    /**
     * Connects to Redis, subscribes to this node's topic, writes this node's entry and
     * reports the current set of nodes to the registered {@link ViewListener}.
     *
     * @param clusterName name of the cluster to join; used as part of every generated key
     * @param nodeName    human-readable node name, used for logging only
     * @return an already-completed task
     */
    public Task<?> join(String clusterName, String nodeName) {
        logger.info("Joining Redis Cluster '{}' as node '{}' [{}]...", clusterName, nodeName, this.localAddress.asUUID());
        this.clusterName = clusterName;
        this.redisDB = new RedisDB(this.config);
        String nodeKey = RedisKeyGenerator.nodeKey(clusterName, this.localAddress.toString());
        this.redisDB.getMessagingClient(nodeKey).getTopic(nodeKey).addListener((chan, msg) -> this.receiveMessage((RedisMsg)msg));
        this.writeMyEntry();
        this.syncNodes();
        return Task.done();
    }

    /**
     * Writes this node's address under its node key, passing the configured node lifetime
     * (in seconds) as the expiry of the entry.
     */
    private void writeMyEntry() {
        String nodeKey = RedisKeyGenerator.nodeKey(this.clusterName, this.localAddress.toString());
        long lifetimeSeconds = this.config.getNodeLifetimeSeconds().intValue();
        this.redisDB.getNodeDirectoryClient().getBucket(nodeKey).set(this.localAddress.toString(), lifetimeSeconds, TimeUnit.SECONDS);
    }

    /**
     * Reads every node entry of the current cluster and hands the resulting addresses to the
     * registered {@link ViewListener}, if there is one.
     */
    private void syncNodes() {
        String nodePattern = RedisKeyGenerator.nodeKey(this.clusterName, "*");
        Collection<NodeAddress> nodeAddresses = new ArrayList<>();
        Collection<String> keys = this.redisDB.getNodeDirectoryClient().getKeys().findKeysByPattern(nodePattern);
        for (String key : keys) {
            String rawAddress = (String)this.redisDB.getNodeDirectoryClient().getBucket(key).get();
            if (rawAddress == null) {
                // The entry vanished between the key scan and this read (presumably it expired
                // or the node left); that node is no longer part of the view.
                continue;
            }
            nodeAddresses.add(new NodeAddressImpl(UUID.fromString(rawAddress)));
        }
        ViewListener listener = this.viewListener;
        if (listener != null) {
            listener.onViewChange(nodeAddresses);
        }
    }

    /**
     * Asynchronously publishes {@code message} on the topic of the target node, on the
     * configured executor. Failures are logged and not propagated to the caller.
     *
     * @param toAddress address of the receiving node
     * @param message   raw message payload
     */
    public void sendMessage(NodeAddress toAddress, byte[] message) {
        Task.runAsync(() -> {
            RedisMsg redisMsg = new RedisMsg();
            redisMsg.setMessageContents(message);
            redisMsg.setSenderAddress(this.localAddress.asUUID());
            String targetNodeKey = RedisKeyGenerator.nodeKey(this.clusterName, toAddress.toString());
            this.redisDB.getMessagingClient(targetNodeKey).getTopic(targetNodeKey).publishAsync(redisMsg).exceptionally(e -> {
                logger.error("Error sending message", e);
                return 0L;
            });
        }, this.config.getExecutorService()).exceptionally(e -> {
            // Covers failures raised before the publish future exists (building the message,
            // resolving the client/topic), which would otherwise be dropped silently.
            logger.error("Error sending message", e);
            return null;
        });
    }

    /**
     * Dispatches a message received from Redis to the registered {@link MessageListener} on
     * the configured executor. Failures are logged and not propagated.
     *
     * @param rawMessage message as received from the node's topic
     */
    public void receiveMessage(RedisMsg rawMessage) {
        Task.runAsync(() -> {
            NodeAddress sender = new NodeAddressImpl(rawMessage.getSenderAddress());
            this.messageListener.receive(sender, rawMessage.getMessageContents());
        }, this.config.getExecutorService()).exceptionally(e -> {
            logger.error("Error receiving message", e);
            return null;
        });
    }

    /**
     * Re-writes this node's entry and refreshes the view of the cluster.
     *
     * @return an already-completed task
     */
    public Task pulse() {
        this.writeMyEntry();
        this.syncNodes();
        return Task.done();
    }

    /**
     * Deletes this node's entry and shuts the Redis connections down. The connections are
     * shut down even if deleting the entry fails. Does nothing if the peer never joined.
     */
    public void leave() {
        RedisDB db = this.redisDB;
        if (db == null) {
            // join() was never called, so there is no entry and no connection to clean up.
            return;
        }
        try {
            String nodeKey = RedisKeyGenerator.nodeKey(this.clusterName, this.localAddress.toString());
            db.getNodeDirectoryClient().getBucket(nodeKey).delete();
        } finally {
            db.shutdownConnections();
        }
    }

    /**
     * Registers the listener that receives incoming messages.
     *
     * @param messageListener listener invoked from {@link #receiveMessage(RedisMsg)}
     */
    public void registerMessageReceiver(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

    /**
     * Registers the listener that is told about the current set of nodes on join and pulse.
     *
     * @param viewListener listener to notify
     */
    public void registerViewListener(ViewListener viewListener) {
        this.viewListener = viewListener;
    }
}

