/*
 * Decompiled with CFR 0.152.
 */
package io.envoyproxy.controlplane.server;

import io.envoyproxy.controlplane.cache.DeltaResponse;
import io.envoyproxy.controlplane.cache.DeltaWatch;
import io.envoyproxy.controlplane.cache.DeltaXdsRequest;
import io.envoyproxy.controlplane.cache.Resources;
import io.envoyproxy.controlplane.cache.VersionedResource;
import io.envoyproxy.controlplane.server.DiscoveryServer;
import io.envoyproxy.controlplane.server.LatestDeltaDiscoveryResponse;
import io.envoyproxy.controlplane.server.exception.RequestException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DeltaDiscoveryRequestStreamObserver<V, X, Y>
implements StreamObserver<V> {
    private static final AtomicLongFieldUpdater<DeltaDiscoveryRequestStreamObserver> streamNonceUpdater = AtomicLongFieldUpdater.newUpdater(DeltaDiscoveryRequestStreamObserver.class, "streamNonce");
    private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class);
    final long streamId;
    private final String defaultTypeUrl;
    private final StreamObserver<X> responseObserver;
    private final Executor executor;
    volatile boolean hasClusterChanged;
    final DiscoveryServer<?, ?, V, X, Y> discoveryServer;
    private volatile long streamNonce;
    private volatile boolean isClosing;

    DeltaDiscoveryRequestStreamObserver(String defaultTypeUrl, StreamObserver<X> responseObserver, long streamId, Executor executor, DiscoveryServer<?, ?, V, X, Y> discoveryServer) {
        this.defaultTypeUrl = defaultTypeUrl;
        this.responseObserver = responseObserver;
        this.streamId = streamId;
        this.executor = executor;
        this.streamNonce = 0L;
        this.discoveryServer = discoveryServer;
        this.hasClusterChanged = false;
    }

    public void onNext(V rawRequest) {
        String requestTypeUrl;
        DeltaXdsRequest request = this.discoveryServer.wrapDeltaXdsRequest(rawRequest);
        String string = requestTypeUrl = request.getTypeUrl().isEmpty() ? this.defaultTypeUrl : request.getTypeUrl();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[{}] request {}[{}] with nonce {} from versions {}", new Object[]{this.streamId, requestTypeUrl, String.join((CharSequence)", ", request.getResourceNamesSubscribeList()), request.getResponseNonce(), request.getInitialResourceVersionsMap()});
        }
        try {
            this.discoveryServer.runStreamDeltaRequestCallbacks(this.streamId, rawRequest);
        }
        catch (RequestException e) {
            this.closeWithError((Throwable)((Object)e));
            return;
        }
        String version = this.latestVersion(requestTypeUrl) == null ? "" : this.latestVersion(requestTypeUrl);
        this.updateSubscriptions(requestTypeUrl, request.getResourceNamesSubscribeList(), request.getResourceNamesUnsubscribeList());
        if (!request.getResponseNonce().isEmpty()) {
            LatestDeltaDiscoveryResponse response = this.clearResponse(requestTypeUrl, request.getResponseNonce());
            if (!request.hasErrorDetail()) {
                this.updateTrackedResources(requestTypeUrl, response.resourceVersions(), response.removedResources());
            }
        }
        if (this.responseCount(requestTypeUrl) == 0) {
            this.computeWatch(requestTypeUrl, () -> this.discoveryServer.configWatcher.createDeltaWatch(request, version, this.resourceVersions(requestTypeUrl), this.pendingResources(requestTypeUrl), this.isWildcard(requestTypeUrl), r -> this.executor.execute(() -> this.send((DeltaResponse)r, requestTypeUrl)), this.hasClusterChanged));
        }
    }

    public void onError(Throwable t) {
        if (!Status.fromThrowable((Throwable)t).getCode().equals((Object)Status.CANCELLED.getCode())) {
            LOGGER.error("[{}] stream closed with error", (Object)this.streamId, (Object)t);
        }
        try {
            this.discoveryServer.callbacks.forEach(cb -> cb.onStreamCloseWithError(this.streamId, this.defaultTypeUrl, t));
            this.closeWithError((Throwable)Status.fromThrowable((Throwable)t).asException());
        }
        finally {
            this.cancel();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onCompleted() {
        LOGGER.debug("[{}] stream closed", (Object)this.streamId);
        try {
            this.discoveryServer.callbacks.forEach(cb -> cb.onStreamClose(this.streamId, this.defaultTypeUrl));
            StreamObserver<X> streamObserver = this.responseObserver;
            synchronized (streamObserver) {
                if (!this.isClosing) {
                    this.isClosing = true;
                    this.responseObserver.onCompleted();
                }
            }
        }
        finally {
            this.cancel();
        }
    }

    void onCancelled() {
        LOGGER.info("[{}] stream cancelled", (Object)this.streamId);
        this.cancel();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void closeWithError(Throwable exception) {
        StreamObserver<X> streamObserver = this.responseObserver;
        synchronized (streamObserver) {
            if (!this.isClosing) {
                this.isClosing = true;
                this.responseObserver.onError(exception);
            }
        }
        this.cancel();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void send(DeltaResponse response, String typeUrl) {
        String nonce = Long.toString(streamNonceUpdater.getAndIncrement(this));
        List resources = response.resources().entrySet().stream().map(entry -> this.discoveryServer.makeResource((String)entry.getKey(), ((VersionedResource)entry.getValue()).version(), this.discoveryServer.protoResourcesSerializer.serialize(((VersionedResource)entry.getValue()).resource(), Resources.getResourceApiVersion((String)typeUrl)))).collect(Collectors.toList());
        X discoveryResponse = this.discoveryServer.makeDeltaResponse(typeUrl, response.version(), nonce, resources, response.removedResources());
        LOGGER.debug("[{}] response {} with nonce {} version {}", new Object[]{this.streamId, typeUrl, nonce, response.version()});
        this.discoveryServer.runStreamDeltaResponseCallbacks(this.streamId, response.request(), discoveryResponse);
        this.setResponse(typeUrl, nonce, LatestDeltaDiscoveryResponse.create(nonce, response.version(), response.resources().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> ((VersionedResource)entry.getValue()).version())), response.removedResources()));
        this.setLatestVersion(typeUrl, response.version());
        StreamObserver<X> streamObserver = this.responseObserver;
        synchronized (streamObserver) {
            block6: {
                if (!this.isClosing) {
                    try {
                        this.responseObserver.onNext(discoveryResponse);
                    }
                    catch (StatusRuntimeException e) {
                        if (Status.CANCELLED.getCode().equals((Object)e.getStatus().getCode())) break block6;
                        throw e;
                    }
                }
            }
        }
    }

    abstract void cancel();

    abstract boolean ads();

    abstract void setLatestVersion(String var1, String var2);

    abstract String latestVersion(String var1);

    abstract void setResponse(String var1, String var2, LatestDeltaDiscoveryResponse var3);

    abstract LatestDeltaDiscoveryResponse clearResponse(String var1, String var2);

    abstract int responseCount(String var1);

    abstract Map<String, String> resourceVersions(String var1);

    abstract Set<String> pendingResources(String var1);

    abstract boolean isWildcard(String var1);

    abstract void updateTrackedResources(String var1, Map<String, String> var2, List<String> var3);

    abstract void updateSubscriptions(String var1, List<String> var2, List<String> var3);

    abstract void computeWatch(String var1, Supplier<DeltaWatch> var2);
}

