package org.axonframework.commandhandling.distributed.jgroups;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.distributed.CommandBusConnector;
import org.axonframework.commandhandling.distributed.CommandBusConnectorCommunicationException;
import org.axonframework.commandhandling.distributed.CommandCallbackRepository;
import org.axonframework.commandhandling.distributed.CommandCallbackWrapper;
import org.axonframework.commandhandling.distributed.CommandRouter;
import org.axonframework.commandhandling.distributed.ConsistentHash;
import org.axonframework.commandhandling.distributed.Member;
import org.axonframework.commandhandling.distributed.RoutingStrategy;
import org.axonframework.commandhandling.distributed.ServiceRegistryException;
import org.axonframework.commandhandling.distributed.SimpleMember;
import org.axonframework.commandhandling.distributed.commandfilter.DenyAll;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.serialization.Serializer;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/commandhandling/distributed/jgroups/JGroupsConnector.class */
public class JGroupsConnector implements CommandRouter, Receiver, CommandBusConnector {
    private static final Logger logger = LoggerFactory.getLogger(JGroupsConnector.class);
    private final CommandBus localSegment;
    private final Serializer serializer;
    private final String clusterName;
    private final RoutingStrategy routingStrategy;
    private final JChannel channel;
    private volatile View currentView;
    private final CommandCallbackRepository<Address> callbackRepository = new CommandCallbackRepository<>();
    private final JoinCondition joinedCondition = new JoinCondition();
    private final Map<Address, SimpleMember<Address>> members = new HashMap();
    private volatile int loadFactor = 0;
    private volatile Predicate<CommandMessage<?>> commandFilter = DenyAll.INSTANCE;
    private final AtomicReference<ConsistentHash> consistentHash = new AtomicReference<>(new ConsistentHash());

    /* loaded from: input_file:org/axonframework/commandhandling/distributed/jgroups/JGroupsConnector$JoinCondition.class */
    private static final class JoinCondition {
        private final CountDownLatch joinCountDown;
        private volatile boolean success;

        private JoinCondition() {
            this.joinCountDown = new CountDownLatch(1);
        }

        public void await() throws InterruptedException {
            this.joinCountDown.await();
        }

        public void await(long j, TimeUnit timeUnit) throws InterruptedException {
            this.joinCountDown.await(j, timeUnit);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void markJoined(boolean z) {
            this.success = z;
            this.joinCountDown.countDown();
        }

        public boolean isJoined() {
            return this.success;
        }
    }

    public JGroupsConnector(CommandBus commandBus, JChannel jChannel, String str, Serializer serializer, RoutingStrategy routingStrategy) {
        this.localSegment = commandBus;
        this.serializer = serializer;
        this.channel = jChannel;
        this.clusterName = str;
        this.routingStrategy = routingStrategy;
    }

    @Override // org.axonframework.commandhandling.distributed.CommandRouter
    public void updateMembership(int i, Predicate<CommandMessage<?>> predicate) {
        this.loadFactor = i;
        this.commandFilter = predicate;
        broadCastMembership();
    }

    protected void broadCastMembership() throws ServiceRegistryException {
        try {
            if (this.channel.isConnected()) {
                this.channel.send((Address) null, new JoinMessage(this.channel.getAddress(), this.loadFactor, this.commandFilter));
            }
        } catch (Exception e) {
            throw new ServiceRegistryException("Could not broadcast local membership details to the cluster", e);
        }
    }

    public void connect() throws Exception {
        if (this.channel.getClusterName() != null && !this.clusterName.equals(this.channel.getClusterName())) {
            throw new ConnectionFailedException("Already joined cluster: " + this.channel.getClusterName());
        }
        this.channel.setReceiver(this);
        this.channel.connect(this.clusterName);
        broadCastMembership();
        Address address = this.channel.getAddress();
        SimpleMember<Address> simpleMember = new SimpleMember<>(this.channel.getName(address), address, null);
        this.members.put(address, simpleMember);
        this.consistentHash.updateAndGet(consistentHash -> {
            return consistentHash.with(simpleMember, this.loadFactor, this.commandFilter);
        });
    }

    public void disconnect() {
        this.channel.disconnect();
    }

    public void getState(OutputStream outputStream) throws Exception {
    }

    public void setState(InputStream inputStream) throws Exception {
    }

    public void viewAccepted(View view) {
        if (this.currentView == null) {
            logger.info("Local segment ({}) joined the cluster. Broadcasting configuration.", this.channel.getAddress());
            try {
                broadCastMembership();
                this.joinedCondition.markJoined(true);
            } catch (Exception e) {
                throw new MembershipUpdateFailedException("Failed to broadcast my settings", e);
            }
        } else if (!view.equals(this.currentView)) {
            Address[] addressArr = View.diff(this.currentView, view)[0];
            Address[] addressArr2 = View.diff(this.currentView, view)[1];
            Arrays.asList(addressArr).stream().filter(address -> {
                return !address.equals(this.channel.getAddress());
            }).forEach(address2 -> {
                logger.info("New member detected: [{}]. Sending it my configuration.", address2);
                try {
                    this.channel.send(address2, new JoinMessage(this.channel.getAddress(), this.loadFactor, this.commandFilter));
                } catch (Exception e2) {
                    throw new MembershipUpdateFailedException("Failed to notify my existence to " + address2);
                }
            });
            Arrays.stream(addressArr2).forEach(address3 -> {
                this.consistentHash.updateAndGet(consistentHash -> {
                    SimpleMember<Address> simpleMember = this.members.get(address3);
                    return simpleMember == null ? consistentHash : consistentHash.without(simpleMember);
                });
            });
            Stream stream = Arrays.stream(addressArr2);
            Map<Address, SimpleMember<Address>> map = this.members;
            map.getClass();
            stream.forEach((v1) -> {
                r1.remove(v1);
            });
        }
        this.currentView = view;
    }

    public void suspect(Address address) {
        logger.warn("Member is suspect: {}", address.toString());
    }

    public void block() {
    }

    public void unblock() {
    }

    public void receive(Message message) {
        Object object = message.getObject();
        if (object instanceof JoinMessage) {
            processJoinMessage(message, (JoinMessage) object);
        } else if (object instanceof DispatchMessage) {
            processDispatchMessage(message, (DispatchMessage) object);
        } else if (object instanceof ReplyMessage) {
            processReplyMessage((ReplyMessage) object);
        }
    }

    private void processReplyMessage(ReplyMessage replyMessage) {
        CommandCallbackWrapper<Address, C, R> fetchAndRemove = this.callbackRepository.fetchAndRemove(replyMessage.getCommandIdentifier());
        if (fetchAndRemove == 0) {
            logger.warn("Received a callback for a message that has either already received a callback, or which was not sent through this node. Ignoring.");
        } else if (replyMessage.isSuccess()) {
            fetchAndRemove.success(replyMessage.getReturnValue(this.serializer));
        } else {
            fetchAndRemove.fail(replyMessage.getError(this.serializer));
        }
    }

    private <C, R> void processDispatchMessage(final Message message, final DispatchMessage dispatchMessage) {
        if (!dispatchMessage.isExpectReply()) {
            try {
                this.localSegment.dispatch(dispatchMessage.getCommandMessage(this.serializer));
                return;
            } catch (Exception e) {
                logger.error("Could not dispatch command", e);
                return;
            }
        }
        try {
            this.localSegment.dispatch(dispatchMessage.getCommandMessage(this.serializer), new CommandCallback<C, R>() { // from class: org.axonframework.commandhandling.distributed.jgroups.JGroupsConnector.1
                public void onSuccess(CommandMessage<? extends C> commandMessage, R r) {
                    JGroupsConnector.this.sendReply(message.getSrc(), dispatchMessage.getCommandIdentifier(), r, null);
                }

                public void onFailure(CommandMessage<? extends C> commandMessage, Throwable th) {
                    JGroupsConnector.this.sendReply(message.getSrc(), dispatchMessage.getCommandIdentifier(), null, th);
                }
            });
        } catch (Exception e2) {
            sendReply(message.getSrc(), dispatchMessage.getCommandIdentifier(), null, e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <R> void sendReply(Address address, String str, R r, Throwable th) {
        try {
            this.channel.send(address, new ReplyMessage(str, r, th, this.serializer));
        } catch (Exception e) {
            try {
                this.channel.send(address, new ReplyMessage(str, null, e, this.serializer));
            } catch (Exception e2) {
                logger.error("Could not send reply", e2);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void processJoinMessage(Message message, JoinMessage joinMessage) {
        String name = this.channel.getName(message.getSrc());
        if (name == null) {
            logger.warn("Received join message from '{}', but a connection with the sender has been lost.", message.getSrc().toString());
            return;
        }
        int loadFactor = joinMessage.getLoadFactor();
        Predicate<CommandMessage<?>> messageFilter = joinMessage.messageFilter();
        SimpleMember simpleMember = new SimpleMember(name, message.getSrc(), null);
        this.members.put(simpleMember.endpoint(), simpleMember);
        this.consistentHash.updateAndGet(consistentHash -> {
            return consistentHash.with(simpleMember, loadFactor, messageFilter);
        });
        if (logger.isInfoEnabled() && !message.getSrc().equals(this.channel.getAddress())) {
            logger.info("{} joined with load factor: {}", name, Integer.valueOf(loadFactor));
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Got a network of members: {}", this.members.values());
        }
    }

    public boolean awaitJoined() throws InterruptedException {
        this.joinedCondition.await();
        return this.joinedCondition.isJoined();
    }

    public boolean awaitJoined(long j, TimeUnit timeUnit) throws InterruptedException {
        this.joinedCondition.await(j, timeUnit);
        return this.joinedCondition.isJoined();
    }

    public String getNodeName() {
        return this.channel.getName();
    }

    protected ConsistentHash getConsistentHash() {
        return this.consistentHash.get();
    }

    @Override // org.axonframework.commandhandling.distributed.CommandBusConnector
    public <C> void send(Member member, CommandMessage<? extends C> commandMessage) throws Exception {
        this.channel.send(resolveAddress(member), new DispatchMessage(commandMessage, this.serializer, false));
    }

    @Override // org.axonframework.commandhandling.distributed.CommandBusConnector
    public <C, R> void send(Member member, CommandMessage<C> commandMessage, CommandCallback<? super C, R> commandCallback) throws Exception {
        this.callbackRepository.store(commandMessage.getIdentifier(), new CommandCallbackWrapper<>(member, commandMessage, commandCallback));
        this.channel.send(resolveAddress(member), new DispatchMessage(commandMessage, this.serializer, true));
    }

    @Override // org.axonframework.commandhandling.distributed.CommandBusConnector
    public Registration subscribe(String str, MessageHandler<? super CommandMessage<?>> messageHandler) {
        return this.localSegment.subscribe(str, messageHandler);
    }

    protected Address resolveAddress(Member member) {
        return (Address) member.getConnectionEndpoint(Address.class).orElseThrow(() -> {
            return new CommandBusConnectorCommunicationException("The target member doesn't expose a JGroups endpoint");
        });
    }

    @Override // org.axonframework.commandhandling.distributed.CommandRouter
    public Optional<Member> findDestination(CommandMessage<?> commandMessage) {
        return this.consistentHash.get().getMember(this.routingStrategy.getRoutingKey(commandMessage), commandMessage);
    }
}
