package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.AsyncProcessor;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.PartitionStatusChanged;
import org.infinispan.notifications.cachelistener.event.PartitionStatusChangedEvent;
import org.infinispan.partitionhandling.AvailabilityException;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.util.logging.Log;
import org.reactivestreams.Publisher;

/**
 * A {@link ClusterPublisherManagerImpl} that takes the cache availability mode into account.
 *
 * <p>New publisher and reduction requests are rejected with the exception produced by
 * {@code Log.CLUSTER.partitionDegraded()} whenever the last observed availability mode is
 * not {@link AvailabilityMode#AVAILABLE}. Operations that are still in flight when the cache
 * switches to {@link AvailabilityMode#DEGRADED_MODE} are terminated with that same exception.
 *
 * @param <K> cache key type
 * @param <V> cache value type
 */
@Scope(Scopes.NAMED_CACHE)
public class PartitionAwareClusterPublisherManager<K, V> extends ClusterPublisherManagerImpl<K, V> {

    @Inject
    protected ComponentRef<Cache<?, ?>> cache;

    /** Last availability mode reported by the partition listener; read by every request. */
    volatile AvailabilityMode currentMode = AvailabilityMode.AVAILABLE;

    protected final PartitionListener listener = new PartitionListener();

    /** Futures handed out by the reduction methods that have not completed yet. */
    private final Set<CompletableFuture<?>> pendingCompletableFutures = ConcurrentHashMap.newKeySet();

    /** Early-termination processors of publishers that are currently subscribed. */
    private final Set<FlowableProcessor<?>> pendingProcessors = ConcurrentHashMap.newKeySet();

    /**
     * Cache listener that records availability changes on the enclosing manager and aborts
     * pending operations when the cache becomes degraded.
     */
    @Listener
    private class PartitionListener {

        /**
         * Handles the post-change notification of a partition status change.
         *
         * @param partitionStatusChangedEvent the event carrying the new availability mode
         */
        @PartitionStatusChanged
        public void onPartitionChange(PartitionStatusChangedEvent<K, ?> partitionStatusChangedEvent) {
            if (partitionStatusChangedEvent.isPre()) {
                // Only act once the change has actually been applied.
                return;
            }
            AvailabilityMode newMode = partitionStatusChangedEvent.getAvailabilityMode();
            if (newMode == AvailabilityMode.DEGRADED_MODE) {
                AvailabilityException degraded = Log.CLUSTER.partitionDegraded();
                pendingProcessors.forEach(processor -> processor.onError(degraded));
                pendingCompletableFutures.forEach(future -> future.completeExceptionally(degraded));
            }
            // Write the ENCLOSING manager's field: isPartitionDegraded() reads that one. A
            // listener-local copy would never be observed by the request paths.
            PartitionAwareClusterPublisherManager.this.currentMode = newMode;
        }
    }

    /**
     * Starts the superclass and then registers the partition listener on the running cache.
     */
    @Override
    public void start() {
        super.start();
        this.cache.running().addListener(this.listener);
    }

    /**
     * Delegates to the superclass key reduction after verifying the cache is available, and
     * wraps the resulting stage so it can be failed if the cache becomes degraded.
     * All arguments are forwarded unchanged to the superclass.
     *
     * @return a stage completing with the superclass result, or exceptionally on degradation
     * @throws AvailabilityException if the cache is not available when invoked
     */
    @Override
    public <R> CompletionStage<R> keyReduction(boolean z, IntSet intSet, Set<K> set, InvocationContext invocationContext, boolean z2, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        checkPartitionStatus();
        return registerStage(super.keyReduction(z, intSet, set, invocationContext, z2, deliveryGuarantee, function, function2));
    }

    /**
     * Delegates to the superclass entry reduction after verifying the cache is available, and
     * wraps the resulting stage so it can be failed if the cache becomes degraded.
     * All arguments are forwarded unchanged to the superclass.
     *
     * @return a stage completing with the superclass result, or exceptionally on degradation
     * @throws AvailabilityException if the cache is not available when invoked
     */
    @Override
    public <R> CompletionStage<R> entryReduction(boolean z, IntSet intSet, Set<K> set, InvocationContext invocationContext, boolean z2, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        checkPartitionStatus();
        return registerStage(super.entryReduction(z, intSet, set, invocationContext, z2, deliveryGuarantee, function, function2));
    }

    /**
     * Wraps {@code original} in a future that is tracked in the pending set until it completes.
     *
     * @param original the stage produced by the superclass
     * @return a future mirroring {@code original}, or already failed if the cache is not available
     */
    private <R> CompletionStage<R> registerStage(CompletionStage<R> original) {
        CompletableFuture<R> tracked = new CompletableFuture<>();
        // The future is added to the pending set before the mode is re-checked.
        this.pendingCompletableFutures.add(tracked);
        if (isPartitionDegraded()) {
            this.pendingCompletableFutures.remove(tracked);
            tracked.completeExceptionally(Log.CLUSTER.partitionDegraded());
        } else {
            original.whenComplete((value, failure) -> {
                if (failure != null) {
                    tracked.completeExceptionally(failure);
                } else {
                    tracked.complete(value);
                }
                this.pendingCompletableFutures.remove(tracked);
            });
        }
        return tracked;
    }

    /**
     * Delegates to the superclass key publisher after verifying the cache is available, and
     * wraps the result so active subscriptions can be failed if the cache becomes degraded.
     * All arguments are forwarded unchanged to the superclass.
     *
     * @throws AvailabilityException if the cache is not available when invoked
     */
    @Override
    public <R> SegmentCompletionPublisher<R> keyPublisher(IntSet intSet, Set<K> set, InvocationContext invocationContext, boolean z, DeliveryGuarantee deliveryGuarantee, int i, Function<? super Publisher<K>, ? extends Publisher<R>> function) {
        checkPartitionStatus();
        return registerPublisher(super.keyPublisher(intSet, set, invocationContext, z, deliveryGuarantee, i, function));
    }

    /**
     * Delegates to the superclass entry publisher after verifying the cache is available, and
     * wraps the result so active subscriptions can be failed if the cache becomes degraded.
     * All arguments are forwarded unchanged to the superclass.
     *
     * @throws AvailabilityException if the cache is not available when invoked
     */
    @Override
    public <R> SegmentCompletionPublisher<R> entryPublisher(IntSet intSet, Set<K> set, InvocationContext invocationContext, boolean z, DeliveryGuarantee deliveryGuarantee, int i, Function<? super Publisher<CacheEntry<K, V>>, ? extends Publisher<R>> function) {
        checkPartitionStatus();
        return registerPublisher(super.entryPublisher(intSet, set, invocationContext, z, deliveryGuarantee, i, function));
    }

    /**
     * Returns a publisher that, per subscription, merges {@code original} with a processor that
     * the partition listener can fail, so the subscriber is terminated on degradation.
     *
     * @param original the publisher produced by the superclass
     * @return the partition-aware publisher
     */
    private <R> SegmentCompletionPublisher<R> registerPublisher(SegmentCompletionPublisher<R> original) {
        return (subscriber, intConsumer) -> {
            FlowableProcessor<R> earlyTerminator = AsyncProcessor.<R>create().toSerialized();
            // The processor is added to the pending set before the mode is re-checked.
            this.pendingProcessors.add(earlyTerminator);
            if (isPartitionDegraded()) {
                this.pendingProcessors.remove(earlyTerminator);
                earlyTerminator.onError(Log.CLUSTER.partitionDegraded());
                // Hand the already-failed processor to the subscriber so it receives the
                // error; the original publisher is deliberately not subscribed.
                earlyTerminator.subscribe(subscriber);
            } else {
                Flowable<R> upstream = Flowable.fromPublisher(s -> original.subscribe(s, intConsumer));
                // Completing the processor when upstream finishes lets the merged flow
                // complete; its own doFinally then drops it from the pending set.
                Flowable.merge(
                        earlyTerminator.doFinally(() -> this.pendingProcessors.remove(earlyTerminator)),
                        upstream.doFinally(earlyTerminator::onComplete))
                    .subscribe(subscriber);
            }
        };
    }

    /**
     * Rejects the current request when the cache is not available.
     *
     * @throws AvailabilityException if the last observed mode is not {@code AVAILABLE}
     */
    private void checkPartitionStatus() {
        if (isPartitionDegraded()) {
            throw Log.CLUSTER.partitionDegraded();
        }
    }

    /**
     * @return {@code true} when the last observed availability mode is anything other than
     *         {@link AvailabilityMode#AVAILABLE}
     */
    private boolean isPartitionDegraded() {
        return this.currentMode != AvailabilityMode.AVAILABLE;
    }
}
