package org.infinispan.xsite.statetransfer;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Predicate;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.irac.IracManager;

@Scope(Scopes.NAMED_CACHE)
/* loaded from: input_file:BOOT-INF/lib/infinispan-core-13.0.21.Final.jar:org/infinispan/xsite/statetransfer/XSiteStateProviderImpl.class */
public class XSiteStateProviderImpl implements XSiteStateProvider {
    private static final Log log;
    private static final Predicate<InternalCacheEntry<Object, Object>> NOT_L1_ENTRY;

    @Inject
    InternalDataContainer<Object, Object> dataContainer;

    @Inject
    PersistenceManager persistenceManager;

    @Inject
    ClusteringDependentLogic clusteringDependentLogic;

    @Inject
    CommandsFactory commandsFactory;

    @Inject
    RpcManager rpcManager;

    @Inject
    ComponentRef<XSiteStateTransferManager> stateTransferManager;

    @Inject
    IracManager iracManager;

    @Inject
    StateTransferLock stateTransferLock;

    @ComponentName(KnownComponentNames.NON_BLOCKING_EXECUTOR)
    @Inject
    ExecutorService nonBlockingExecutor;

    @ComponentName(KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR)
    @Inject
    ScheduledExecutorService timeoutExecutor;
    private final ConcurrentMap<String, XSiteStateProviderState> sites = new ConcurrentHashMap();
    static final /* synthetic */ boolean $assertionsDisabled;

    public XSiteStateProviderImpl(Configuration configuration) {
        for (BackupConfiguration backupConfiguration : configuration.sites().allBackups()) {
            this.sites.put(backupConfiguration.site(), XSiteStateProviderState.fromBackupConfiguration(backupConfiguration));
        }
    }

    @Start
    public void start() {
        this.sites.remove(this.rpcManager.getTransport().localSiteName());
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateProvider
    public void startStateTransfer(String str, Address address, int i) {
        XSiteStateProviderState xSiteStateProviderState = this.sites.get(str);
        if (!$assertionsDisabled && xSiteStateProviderState == null) {
            throw new AssertionError();
        }
        XSiteStatePushTask createPushTask = xSiteStateProviderState.createPushTask(address, this);
        if (createPushTask == null) {
            if (log.isDebugEnabled()) {
                log.debugf("Do not start state transfer to site '%s'. It has already started!", str);
            }
            checkCoordinatorAlive(str, address);
        } else {
            if (log.isDebugEnabled()) {
                log.debugf("Starting state transfer to site '%s'", str);
            }
            IntSet localPrimarySegments = localPrimarySegments();
            createPushTask.execute(Flowable.concat(publishDataContainerEntries(localPrimarySegments), publishStoreEntries(localPrimarySegments, xSiteStateProviderState.isSync())), this.stateTransferLock.topologyFuture(i));
            checkCoordinatorAlive(str, address);
        }
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateProvider
    public void cancelStateTransfer(String str) {
        XSiteStateProviderState xSiteStateProviderState = this.sites.get(str);
        if (!$assertionsDisabled && xSiteStateProviderState == null) {
            throw new AssertionError();
        }
        xSiteStateProviderState.cancelTransfer();
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateProvider
    public Collection<String> getCurrentStateSending() {
        return (Collection) this.sites.entrySet().stream().filter(entry -> {
            return ((XSiteStateProviderState) entry.getValue()).isSending();
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateProvider
    public Collection<String> getSitesMissingCoordinator(Collection<Address> collection) {
        return (Collection) this.sites.entrySet().stream().filter(entry -> {
            return ((XSiteStateProviderState) entry.getValue()).isOriginatorMissing(collection);
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateProvider
    public void notifyStateTransferEnd(String str, Address address, boolean z) {
        if (log.isDebugEnabled()) {
            log.debugf("Finished state transfer to site '%s'. Ok? %s", str, Boolean.valueOf(z));
        }
        if (this.rpcManager.getAddress().equals(address)) {
            this.stateTransferManager.running().notifyStatePushFinished(str, address, z);
        } else {
            this.rpcManager.sendTo(address, this.commandsFactory.buildXSiteStateTransferFinishSendCommand(str, z), DeliverOrder.NONE);
        }
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateProvider
    public CommandsFactory getCommandsFactory() {
        return this.commandsFactory;
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateProvider
    public RpcManager getRpcManager() {
        return this.rpcManager;
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateProvider
    public IracManager getIracManager() {
        return this.iracManager;
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateProvider
    public ScheduledExecutorService getScheduledExecutorService() {
        return this.timeoutExecutor;
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateProvider
    public Executor getExecutor() {
        return this.nonBlockingExecutor;
    }

    private void checkCoordinatorAlive(String str, Address address) {
        if (!this.rpcManager.getAddress().equals(this.rpcManager.getMembers().get(0)) || this.rpcManager.getMembers().contains(address)) {
            return;
        }
        this.stateTransferManager.running().becomeCoordinator(str);
    }

    private IntSet localPrimarySegments() {
        return IntSets.from(this.clusteringDependentLogic.getCacheTopology().getWriteConsistentHash().getPrimarySegmentsForOwner(this.rpcManager.getAddress()));
    }

    private Flowable<XSiteState> publishDataContainerEntries(IntSet intSet) {
        return Flowable.fromIterable(() -> {
            return this.dataContainer.iterator(intSet);
        }).filter(NOT_L1_ENTRY).map(XSiteState::fromDataContainer);
    }

    private Flowable<XSiteState> publishStoreEntries(IntSet intSet, boolean z) {
        return Flowable.fromPublisher(this.persistenceManager.publishEntries(intSet, this::missingInDataContainer, z, z, Configurations::isStateTransferStore)).map(XSiteState::fromCacheLoader);
    }

    private boolean missingInDataContainer(Object obj) {
        return this.dataContainer.peek(obj) == null;
    }

    static {
        $assertionsDisabled = !XSiteStateProviderImpl.class.desiredAssertionStatus();
        log = LogFactory.getLog(XSiteStateProviderImpl.class);
        NOT_L1_ENTRY = internalCacheEntry -> {
            return !internalCacheEntry.isL1Entry();
        };
    }
}
