package org.infinispan.conflict.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.logging.Log;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.util.IntSets;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.executors.LimitedExecutor;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.InboundTransferTask;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.topology.CacheTopology;

@Scope(Scopes.NAMED_CACHE)
@Listener
/* loaded from: input_file:BOOT-INF/lib/infinispan-core-13.0.17.Final.jar:org/infinispan/conflict/impl/StateReceiverImpl.class */
public class StateReceiverImpl<K, V> implements StateReceiver<K, V> {
    private static final Log log = LogFactory.getLog(StateReceiverImpl.class);

    @ComponentName(KnownComponentNames.CACHE_NAME)
    @Inject
    String cacheName;

    @Inject
    CacheNotifier<K, V> cacheNotifier;

    @Inject
    CommandsFactory commandsFactory;

    @Inject
    InternalDataContainer<K, V> dataContainer;

    @Inject
    RpcManager rpcManager;

    @ComponentName(KnownComponentNames.NON_BLOCKING_EXECUTOR)
    @Inject
    ExecutorService nonBlockingExecutor;
    private LimitedExecutor stateReceiverExecutor;
    private final ConcurrentHashMap<Integer, StateReceiverImpl<K, V>.SegmentRequest> requestMap = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/infinispan-core-13.0.17.Final.jar:org/infinispan/conflict/impl/StateReceiverImpl$SegmentRequest.class */
    public class SegmentRequest {
        final int segmentId;
        final LocalizedCacheTopology topology;
        final long timeout;
        final List<Address> replicaHosts;
        final Map<K, Map<Address, CacheEntry<K, V>>> keyReplicaMap = new HashMap();
        final Map<Address, InboundTransferTask> transferTaskMap = new ConcurrentHashMap();
        CompletableFuture<List<Map<Address, CacheEntry<K, V>>>> future;
        static final /* synthetic */ boolean $assertionsDisabled;

        SegmentRequest(int i, LocalizedCacheTopology localizedCacheTopology, long j) {
            this.segmentId = i;
            this.topology = localizedCacheTopology;
            this.timeout = j;
            this.replicaHosts = localizedCacheTopology.getSegmentDistribution(i).writeOwners();
        }

        synchronized CompletableFuture<List<Map<Address, CacheEntry<K, V>>>> requestState() {
            if (this.future != null) {
                if (!$assertionsDisabled && !this.future.isCompletedExceptionally()) {
                    throw new AssertionError();
                }
                if (StateReceiverImpl.log.isTraceEnabled()) {
                    StateReceiverImpl.log.tracef("Cache %s already cancelled replicas request for segment %s from %s with topology %s", StateReceiverImpl.this.cacheName, Integer.valueOf(this.segmentId), this.replicaHosts, this.topology);
                }
                return this.future;
            }
            if (StateReceiverImpl.log.isTraceEnabled()) {
                StateReceiverImpl.log.tracef("Cache %s attempting to receive replicas for segment %s from %s with topologyId=%s, timeout=%d", StateReceiverImpl.this.cacheName, Integer.valueOf(this.segmentId), this.replicaHosts, Integer.valueOf(this.topology.getTopologyId()), Long.valueOf(this.timeout));
            }
            this.future = new CompletableFuture<>();
            this.future.whenComplete((list, th) -> {
                if (th != null) {
                    if (StateReceiverImpl.log.isTraceEnabled()) {
                        StateReceiverImpl.log.tracef("Cache %s segment request(s) cancelled due to exception=%s", StateReceiverImpl.this.cacheName, th);
                    }
                    cancel(th);
                }
            });
            for (Address address : this.replicaHosts) {
                if (address.equals(StateReceiverImpl.this.rpcManager.getAddress())) {
                    StateReceiverImpl.this.dataContainer.forEach(internalCacheEntry -> {
                        if (this.topology.getDistribution(internalCacheEntry.getKey()).segmentId() == this.segmentId) {
                            addKeyToReplicaMap(address, internalCacheEntry);
                        }
                    });
                    if (this.replicaHosts.size() == 1) {
                        completeRequest();
                    }
                } else {
                    InboundTransferTask createTransferTask = StateReceiverImpl.this.createTransferTask(this.segmentId, address, this.topology, this.timeout);
                    this.transferTaskMap.put(address, createTransferTask);
                    StateReceiverImpl.this.stateReceiverExecutor.execute(() -> {
                        if (this.transferTaskMap.containsKey(address)) {
                            createTransferTask.requestSegments().exceptionally(th2 -> {
                                if (StateReceiverImpl.log.isTraceEnabled()) {
                                    StateReceiverImpl.log.tracef(th2, "Cache %s exception when processing InboundTransferTask", StateReceiverImpl.this.cacheName);
                                }
                                cancel(th2);
                                return null;
                            });
                        }
                    });
                }
            }
            return this.future;
        }

        synchronized void clear() {
            this.keyReplicaMap.clear();
            this.transferTaskMap.clear();
            StateReceiverImpl.this.requestMap.remove(Integer.valueOf(this.segmentId));
        }

        synchronized void receiveState(Address address, int i, Collection<StateChunk> collection) {
            if (i < this.topology.getTopologyId()) {
                if (StateReceiverImpl.log.isTraceEnabled()) {
                    StateReceiverImpl.log.tracef("Cache %s discarding state response with old topology id %d, the smallest allowed topology id is %d", i, this.topology.getTopologyId(), (Object) StateReceiverImpl.this.cacheName);
                    return;
                }
                return;
            }
            InboundTransferTask inboundTransferTask = this.transferTaskMap.get(address);
            if (inboundTransferTask == null) {
                if (StateReceiverImpl.log.isTraceEnabled()) {
                    StateReceiverImpl.log.tracef("Cache %s state received for an unknown request. No record of a state request exists for node %s", StateReceiverImpl.this.cacheName, address);
                    return;
                }
                return;
            }
            if (StateReceiverImpl.log.isTraceEnabled()) {
                StateReceiverImpl.log.tracef("Cache %s state chunks received from %s, with topologyId %s, statechunks %s", StateReceiverImpl.this.cacheName, address, Integer.valueOf(i), collection);
            }
            for (StateChunk stateChunk : collection) {
                boolean isLastChunk = stateChunk.isLastChunk();
                stateChunk.getCacheEntries().forEach(internalCacheEntry -> {
                    addKeyToReplicaMap(address, internalCacheEntry);
                });
                inboundTransferTask.onStateReceived(stateChunk.getSegmentId(), isLastChunk);
                if (isLastChunk) {
                    this.transferTaskMap.remove(address);
                    if (this.transferTaskMap.isEmpty()) {
                        completeRequest();
                    }
                }
            }
        }

        synchronized void cancel(Throwable th) {
            if (this.future.isDone()) {
                return;
            }
            StateReceiverImpl.log.debugf(th, "Cache %s cancelling request for segment %s", StateReceiverImpl.this.cacheName, Integer.valueOf(this.segmentId));
            if (this.future == null) {
                this.future = new CompletableFuture<>();
            }
            this.transferTaskMap.forEach((address, inboundTransferTask) -> {
                inboundTransferTask.cancel();
            });
            if (th != null) {
                this.future.completeExceptionally(th);
            } else {
                this.future.cancel(true);
            }
            clear();
        }

        synchronized void completeRequest() {
            ArrayList arrayList = new ArrayList(this.keyReplicaMap.values());
            clear();
            this.future.complete(Collections.unmodifiableList(arrayList));
        }

        void addKeyToReplicaMap(Address address, CacheEntry<K, V> cacheEntry) {
            this.keyReplicaMap.computeIfAbsent(cacheEntry.getKey(), obj -> {
                HashMap hashMap = new HashMap();
                this.replicaHosts.forEach(address2 -> {
                    hashMap.put(address2, NullCacheEntry.getInstance());
                });
                return hashMap;
            }).put(address, cacheEntry);
        }

        public String toString() {
            return "SegmentRequest{segmentId=" + this.segmentId + ", topology=" + this.topology.getTopologyId() + ", replicaHosts=" + this.replicaHosts + ", keyReplicaMap=" + this.keyReplicaMap + ", transferTaskMap=" + this.transferTaskMap + ", future=" + this.future + '}';
        }

        static {
            $assertionsDisabled = !StateReceiverImpl.class.desiredAssertionStatus();
        }
    }

    @Start
    public void start() {
        this.cacheNotifier.addListener(this);
        this.stateReceiverExecutor = new LimitedExecutor("StateReceiver-" + this.cacheName, this.nonBlockingExecutor, 1);
    }

    @Stop
    public void stop() {
        cancelRequests();
        this.stateReceiverExecutor.shutdownNow();
    }

    @Override // org.infinispan.conflict.impl.StateReceiver
    public void cancelRequests() {
        if (log.isTraceEnabled()) {
            log.tracef("Cache %s stop() called on StateReceiverImpl", this.cacheName);
        }
        Iterator<StateReceiverImpl<K, V>.SegmentRequest> it = this.requestMap.values().iterator();
        while (it.hasNext()) {
            it.next().cancel(null);
        }
    }

    @DataRehashed
    public void onDataRehash(DataRehashedEvent<K, V> dataRehashedEvent) {
        if (dataRehashedEvent.isPre()) {
            if (log.isTraceEnabled()) {
                log.tracef("Cache %s received event: %s", this.cacheName, dataRehashedEvent);
            }
            Iterator<StateReceiverImpl<K, V>.SegmentRequest> it = this.requestMap.values().iterator();
            while (it.hasNext()) {
                it.next().cancel(new CacheException("Cancelling replica request as the owners of the requested segment have changed."));
            }
        }
    }

    @Override // org.infinispan.conflict.impl.StateReceiver
    public CompletableFuture<List<Map<Address, CacheEntry<K, V>>>> getAllReplicasForSegment(int i, LocalizedCacheTopology localizedCacheTopology, long j) {
        return this.requestMap.computeIfAbsent(Integer.valueOf(i), num -> {
            return new SegmentRequest(num.intValue(), localizedCacheTopology, j);
        }).requestState();
    }

    @Override // org.infinispan.conflict.impl.StateReceiver
    public void receiveState(Address address, int i, Collection<StateChunk> collection) {
        if (collection.isEmpty()) {
            if (log.isTraceEnabled()) {
                log.tracef("Cache %s ignoring received state from %s because stateChunks are empty", this.cacheName, address);
                return;
            }
            return;
        }
        StateReceiverImpl<K, V>.SegmentRequest segmentRequest = this.requestMap.get(Integer.valueOf(collection.iterator().next().getSegmentId()));
        if (segmentRequest == null) {
            if (log.isTraceEnabled()) {
                log.tracef("Cache %s ignoring received state because the associated request was completed or cancelled", this.cacheName);
            }
        } else {
            if (log.isTraceEnabled()) {
                log.tracef("Cache %s received state for %s", this.cacheName, segmentRequest);
            }
            segmentRequest.receiveState(address, i, collection);
        }
    }

    Map<K, Map<Address, CacheEntry<K, V>>> getKeyReplicaMap(int i) {
        return this.requestMap.get(Integer.valueOf(i)).keyReplicaMap;
    }

    Map<Address, InboundTransferTask> getTransferTaskMap(int i) {
        return this.requestMap.get(Integer.valueOf(i)).transferTaskMap;
    }

    InboundTransferTask createTransferTask(int i, Address address, CacheTopology cacheTopology, long j) {
        return new InboundTransferTask(IntSets.immutableSet(i), address, cacheTopology.getTopologyId(), this.rpcManager, this.commandsFactory, j, this.cacheName, false);
    }
}
