/*
 * Decompiled with CFR 0.152.
 */
package cloud.orbit.actors.cluster.impl;

import cloud.orbit.actors.cluster.RedisClusterConfig;
import cloud.orbit.actors.cluster.impl.RedisMsg;
import cloud.orbit.actors.cluster.impl.RedisOrbitClient;
import cloud.orbit.actors.cluster.impl.RedisPipelineCodec;
import cloud.orbit.exception.UncheckedException;
import com.github.ssedano.hash.JumpConsistentHash;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.SerializationCodec;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisConnectionManager {
    private RedisClusterConfig redisClusterConfig = null;
    private List<RedisOrbitClient> nodeDirectoryClients = new ArrayList<RedisOrbitClient>();
    private List<RedisOrbitClient> actorDirectoryClients = new ArrayList<RedisOrbitClient>();
    private List<RedisOrbitClient> messagingClients = new ArrayList<RedisOrbitClient>();
    private EventLoopGroup eventLoopGroup = null;
    private static Logger logger = LoggerFactory.getLogger(RedisConnectionManager.class);

    public RedisConnectionManager(RedisClusterConfig redisClusterConfig) {
        this.redisClusterConfig = redisClusterConfig;
        if (redisClusterConfig.getShareEventLoop().booleanValue()) {
            this.eventLoopGroup = new NioEventLoopGroup();
        }
        List<String> nodeDirectoryMasters = redisClusterConfig.getNodeDirectoryUris();
        for (String string : nodeDirectoryMasters) {
            logger.info("Connecting to Redis Node Directory node at '{}'...", (Object)string);
            this.nodeDirectoryClients.add(this.createClient(string, true));
        }
        List<String> actorDirectoryMasters = redisClusterConfig.getActorDirectoryUris();
        for (String uri : actorDirectoryMasters) {
            logger.info("Connecting to Redis Actor Directory node at '{}'...", (Object)uri);
            this.actorDirectoryClients.add(this.createClient(uri, true));
        }
        List<String> list = redisClusterConfig.getMessagingUris();
        for (String uri : list) {
            logger.info("Connecting to Redis messaging node at '{}'...", (Object)uri);
            this.messagingClients.add(this.createClient(uri, false));
        }
    }

    public List<RedisOrbitClient> getNodeDirectoryClients() {
        return Collections.unmodifiableList(this.nodeDirectoryClients);
    }

    public List<RedisOrbitClient> getActorDirectoryClients() {
        return Collections.unmodifiableList(this.actorDirectoryClients);
    }

    public List<RedisOrbitClient> getMessagingClients() {
        return Collections.unmodifiableList(this.messagingClients);
    }

    public RedisOrbitClient getShardedNodeDirectoryClient(String shardId) {
        int jumpConsistentHash = JumpConsistentHash.jumpConsistentHash((Object)shardId, (int)this.nodeDirectoryClients.size());
        return this.nodeDirectoryClients.get(jumpConsistentHash);
    }

    public RedisOrbitClient getShardedActorDirectoryClient(String shardId) {
        int jumpConsistentHash = JumpConsistentHash.jumpConsistentHash((Object)shardId, (int)this.actorDirectoryClients.size());
        return this.actorDirectoryClients.get(jumpConsistentHash);
    }

    public void subscribeToChannel(String channelId, MessageListener<RedisMsg> statusListener) {
        for (RedisOrbitClient messagingClient : this.messagingClients) {
            messagingClient.subscribe(channelId, statusListener);
        }
    }

    public void sendMessageToChannel(String channelId, Object msg) {
        List localMessagingClients = this.messagingClients.stream().filter(e -> e.isConnectied()).collect(Collectors.toCollection(ArrayList::new));
        this.sendMessageToChannel(channelId, msg, localMessagingClients, 1);
    }

    private void sendMessageToChannel(String channelId, Object msg, List<RedisOrbitClient> localMessagingClients, int attempt) {
        int activeClientCount = localMessagingClients.size();
        if (activeClientCount == 0) {
            logger.error("Failed to send message to channel '{}', no redis messaging instances were available after {} attempts.", (Object)channelId, (Object)attempt);
            return;
        }
        int randomId = ThreadLocalRandom.current().nextInt(activeClientCount);
        RedissonClient client = localMessagingClients.remove(randomId).getRedissonClient();
        client.getTopic(channelId).publishAsync(msg).whenComplete((numClientsReceived, exception) -> {
            if (exception != null) {
                logger.error("Failed to send message to channel '{}'", (Object)channelId, exception);
            } else if (numClientsReceived == 0L) {
                if (attempt >= this.redisClusterConfig.getMessageSendAttempts()) {
                    logger.error("Failed to send message to channel '{}' after {} attempts.", (Object)channelId, (Object)attempt);
                } else {
                    logger.warn("Failed to send message to channel '{}' on attempt {}. Retrying...", (Object)channelId, (Object)attempt);
                    this.sendMessageToChannel(channelId, msg, localMessagingClients, attempt + 1);
                }
            }
        });
    }

    public void shutdownConnections() {
        this.nodeDirectoryClients.forEach(RedisOrbitClient::shutdown);
        this.actorDirectoryClients.forEach(RedisOrbitClient::shutdown);
        this.messagingClients.forEach(RedisOrbitClient::shutdown);
    }

    private RedisOrbitClient createClient(String uri, Boolean useJavaSerializer) {
        Integer port;
        URI realUri = URI.create(uri);
        if (!realUri.getScheme().equalsIgnoreCase("redis")) {
            throw new UncheckedException("Invalid Redis URI.");
        }
        String host = realUri.getHost();
        if (host == null) {
            host = "localhost";
        }
        if ((port = Integer.valueOf(realUri.getPort())) == -1) {
            port = 6379;
        }
        String resolvedUri = "redis://" + host + ":" + port;
        Config redissonConfig = new Config();
        Object currentCodec = useJavaSerializer != false ? new SerializationCodec() : new JsonJacksonCodec();
        currentCodec = new RedisPipelineCodec(this.redisClusterConfig.getPipelineSteps(), (Codec)currentCodec);
        redissonConfig.setCodec((Codec)currentCodec);
        if (this.eventLoopGroup != null) {
            redissonConfig.setEventLoopGroup(this.eventLoopGroup);
        }
        redissonConfig.setThreads(this.redisClusterConfig.getRedissonThreads().intValue());
        redissonConfig.setNettyThreads(this.redisClusterConfig.getNettyThreads().intValue());
        if (this.redisClusterConfig.getRedissonExecutorService() != null) {
            redissonConfig.setExecutor(this.redisClusterConfig.getRedissonExecutorService());
        }
        ((SingleServerConfig)((SingleServerConfig)((SingleServerConfig)((SingleServerConfig)((SingleServerConfig)((SingleServerConfig)((SingleServerConfig)redissonConfig.useSingleServer().setDnsMonitoringInterval((long)this.redisClusterConfig.getDnsMonitoringInverval().intValue()).setAddress(resolvedUri).setConnectionMinimumIdleSize(this.redisClusterConfig.getMinRedisConnections().intValue()).setConnectionPoolSize(this.redisClusterConfig.getMaxRedisConnections().intValue()).setConnectTimeout(this.redisClusterConfig.getConnectionTimeout().intValue())).setTimeout(this.redisClusterConfig.getGeneralTimeout().intValue())).setIdleConnectionTimeout(this.redisClusterConfig.getIdleTimeout().intValue())).setReconnectionTimeout(this.redisClusterConfig.getReconnectionTimeout().intValue())).setPingTimeout(this.redisClusterConfig.getPingTimeout().intValue())).setFailedAttempts(this.redisClusterConfig.getFailedAttempts().intValue())).setRetryAttempts(this.redisClusterConfig.getRetryAttempts().intValue())).setRetryInterval(this.redisClusterConfig.getRetryInterval().intValue());
        return new RedisOrbitClient(Redisson.create((Config)redissonConfig), this.redisClusterConfig.getMessagingHealthcheckInterval());
    }
}

