/*
 * Decompiled with CFR 0.152.
 */
package io.envoyproxy.controlplane.server;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import io.envoyproxy.controlplane.cache.ConfigWatcher;
import io.envoyproxy.controlplane.cache.Resources;
import io.envoyproxy.controlplane.cache.Response;
import io.envoyproxy.controlplane.cache.Watch;
import io.envoyproxy.controlplane.server.DefaultExecutorGroup;
import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks;
import io.envoyproxy.controlplane.server.ExecutorGroup;
import io.envoyproxy.envoy.api.v2.ClusterDiscoveryServiceGrpc;
import io.envoyproxy.envoy.api.v2.Discovery;
import io.envoyproxy.envoy.api.v2.EndpointDiscoveryServiceGrpc;
import io.envoyproxy.envoy.api.v2.ListenerDiscoveryServiceGrpc;
import io.envoyproxy.envoy.api.v2.RouteDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v2.SecretDiscoveryServiceGrpc;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exposes the Envoy v2 discovery gRPC services (ADS, CDS, EDS, LDS, RDS and SDS) on top of a
 * {@link ConfigWatcher}.
 *
 * <p>Every opened stream gets a monotonically increasing stream id, an {@link Executor} taken from
 * the configured {@link ExecutorGroup}, and its own request handler. The registered
 * {@link DiscoveryServerCallbacks} are notified when a stream is opened, for each request and
 * response, and when the stream is closed.
 */
public class DiscoveryServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class);

    /** Default type URL used for ADS streams, where each request must name its own type URL. */
    static final String ANY_TYPE_URL = "";

    private static final String CLUSTER_TYPE_URL = "type.googleapis.com/envoy.api.v2.Cluster";
    private static final String ENDPOINT_TYPE_URL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
    private static final String LISTENER_TYPE_URL = "type.googleapis.com/envoy.api.v2.Listener";
    private static final String ROUTE_TYPE_URL = "type.googleapis.com/envoy.api.v2.RouteConfiguration";
    private static final String SECRET_TYPE_URL = "type.googleapis.com/envoy.api.v2.auth.Secret";

    private final List<DiscoveryServerCallbacks> callbacks;
    private final ConfigWatcher configWatcher;
    private final ExecutorGroup executorGroup;
    /** Source of stream ids; the first stream gets id 0. */
    private final AtomicLong streamCount = new AtomicLong();

    /**
     * Creates a server without callbacks that uses a {@link DefaultExecutorGroup}.
     *
     * @param configWatcher source of configuration watches
     */
    public DiscoveryServer(ConfigWatcher configWatcher) {
        this(Collections.emptyList(), configWatcher);
    }

    /**
     * Creates a server with a single callback that uses a {@link DefaultExecutorGroup}.
     *
     * @param callbacks callback notified of stream events
     * @param configWatcher source of configuration watches
     */
    public DiscoveryServer(DiscoveryServerCallbacks callbacks, ConfigWatcher configWatcher) {
        this(Collections.singletonList(callbacks), configWatcher);
    }

    /**
     * Creates a server with the given callbacks that uses a {@link DefaultExecutorGroup}.
     *
     * @param callbacks callbacks notified of stream events
     * @param configWatcher source of configuration watches
     */
    public DiscoveryServer(List<DiscoveryServerCallbacks> callbacks, ConfigWatcher configWatcher) {
        this(callbacks, configWatcher, new DefaultExecutorGroup());
    }

    /**
     * Creates a server with the given callbacks and executor group.
     *
     * @param callbacks callbacks notified of stream events
     * @param configWatcher source of configuration watches
     * @param executorGroup supplies the executor on which each stream sends its responses
     * @throws NullPointerException if any argument is {@code null}
     */
    public DiscoveryServer(List<DiscoveryServerCallbacks> callbacks, ConfigWatcher configWatcher, ExecutorGroup executorGroup) {
        Preconditions.checkNotNull(callbacks, "callbacks cannot be null");
        Preconditions.checkNotNull(configWatcher, "configWatcher cannot be null");
        Preconditions.checkNotNull(executorGroup, "executorGroup cannot be null");
        this.callbacks = callbacks;
        this.configWatcher = configWatcher;
        this.executorGroup = executorGroup;
    }

    /**
     * Returns an ADS implementation that can be registered with a gRPC server. Its streams have no
     * default type URL, so every request must carry one.
     */
    public AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase getAggregatedDiscoveryServiceImpl() {
        return new AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase() {
            @Override
            public StreamObserver<Discovery.DiscoveryRequest> streamAggregatedResources(StreamObserver<Discovery.DiscoveryResponse> responseObserver) {
                return createRequestHandler(responseObserver, true, ANY_TYPE_URL);
            }
        };
    }

    /** Returns a CDS implementation whose streams default to the cluster type URL. */
    public ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceImplBase getClusterDiscoveryServiceImpl() {
        return new ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceImplBase() {
            @Override
            public StreamObserver<Discovery.DiscoveryRequest> streamClusters(StreamObserver<Discovery.DiscoveryResponse> responseObserver) {
                return createRequestHandler(responseObserver, false, CLUSTER_TYPE_URL);
            }
        };
    }

    /** Returns an EDS implementation whose streams default to the cluster load assignment type URL. */
    public EndpointDiscoveryServiceGrpc.EndpointDiscoveryServiceImplBase getEndpointDiscoveryServiceImpl() {
        return new EndpointDiscoveryServiceGrpc.EndpointDiscoveryServiceImplBase() {
            @Override
            public StreamObserver<Discovery.DiscoveryRequest> streamEndpoints(StreamObserver<Discovery.DiscoveryResponse> responseObserver) {
                return createRequestHandler(responseObserver, false, ENDPOINT_TYPE_URL);
            }
        };
    }

    /** Returns an LDS implementation whose streams default to the listener type URL. */
    public ListenerDiscoveryServiceGrpc.ListenerDiscoveryServiceImplBase getListenerDiscoveryServiceImpl() {
        return new ListenerDiscoveryServiceGrpc.ListenerDiscoveryServiceImplBase() {
            @Override
            public StreamObserver<Discovery.DiscoveryRequest> streamListeners(StreamObserver<Discovery.DiscoveryResponse> responseObserver) {
                return createRequestHandler(responseObserver, false, LISTENER_TYPE_URL);
            }
        };
    }

    /** Returns an RDS implementation whose streams default to the route configuration type URL. */
    public RouteDiscoveryServiceGrpc.RouteDiscoveryServiceImplBase getRouteDiscoveryServiceImpl() {
        return new RouteDiscoveryServiceGrpc.RouteDiscoveryServiceImplBase() {
            @Override
            public StreamObserver<Discovery.DiscoveryRequest> streamRoutes(StreamObserver<Discovery.DiscoveryResponse> responseObserver) {
                return createRequestHandler(responseObserver, false, ROUTE_TYPE_URL);
            }
        };
    }

    /** Returns an SDS implementation whose streams default to the secret type URL. */
    public SecretDiscoveryServiceGrpc.SecretDiscoveryServiceImplBase getSecretDiscoveryServiceImpl() {
        return new SecretDiscoveryServiceGrpc.SecretDiscoveryServiceImplBase() {
            @Override
            public StreamObserver<Discovery.DiscoveryRequest> streamSecrets(StreamObserver<Discovery.DiscoveryResponse> responseObserver) {
                return createRequestHandler(responseObserver, false, SECRET_TYPE_URL);
            }
        };
    }

    /**
     * Opens a new stream: allocates its id and executor, notifies the callbacks and builds the
     * observer that will receive the client's requests.
     *
     * @param responseObserver observer used to send responses back to the client
     * @param ads whether the stream is an aggregated (ADS) stream
     * @param defaultTypeUrl type URL assumed for requests that do not specify one, or
     *     {@link #ANY_TYPE_URL} when requests must specify it themselves
     * @return the observer that handles the requests of the new stream
     */
    private StreamObserver<Discovery.DiscoveryRequest> createRequestHandler(StreamObserver<Discovery.DiscoveryResponse> responseObserver, boolean ads, String defaultTypeUrl) {
        long streamId = streamCount.getAndIncrement();
        Executor executor = executorGroup.next();

        LOGGER.info("[{}] open stream from {}", streamId, defaultTypeUrl);
        callbacks.forEach(cb -> cb.onStreamOpen(streamId, defaultTypeUrl));

        DiscoveryRequestStreamObserver requestStreamObserver =
            new DiscoveryRequestStreamObserver(defaultTypeUrl, responseObserver, streamId, ads, executor);

        // Only server-call observers can report cancellation; hook it up so that the stream's
        // watches are cancelled when that happens.
        if (responseObserver instanceof ServerCallStreamObserver) {
            ((ServerCallStreamObserver<Discovery.DiscoveryResponse>) responseObserver)
                .setOnCancelHandler(requestStreamObserver::onCancelled);
        }

        return requestStreamObserver;
    }

    /**
     * Handles the requests of a single discovery stream. Per resource type URL it keeps the
     * current watch, the latest response that was sent, and the resource names of the response
     * most recently acknowledged by the client.
     */
    private class DiscoveryRequestStreamObserver implements StreamObserver<Discovery.DiscoveryRequest> {
        private final Map<String, Watch> watches;
        private final Map<String, Discovery.DiscoveryResponse> latestResponse;
        private final Map<String, Set<String>> ackedResources;
        private final String defaultTypeUrl;
        private final StreamObserver<Discovery.DiscoveryResponse> responseObserver;
        private final long streamId;
        private final boolean ads;
        private final Executor executor;
        /** Source of response nonces for this stream; the first response gets nonce "0". */
        private final AtomicLong streamNonce;

        DiscoveryRequestStreamObserver(String defaultTypeUrl, StreamObserver<Discovery.DiscoveryResponse> responseObserver, long streamId, boolean ads, Executor executor) {
            this.defaultTypeUrl = defaultTypeUrl;
            this.responseObserver = responseObserver;
            this.streamId = streamId;
            this.ads = ads;
            this.watches = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
            this.latestResponse = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
            this.ackedResources = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
            this.streamNonce = new AtomicLong();
            this.executor = executor;
        }

        /**
         * Processes one discovery request: resolves its type URL, notifies the callbacks, records
         * the acknowledged resources when applicable, and replaces the watch for that type URL.
         *
         * <p>A request whose nonce does not match the nonce of the latest response sent for its
         * type URL is ignored after the callbacks have been notified, as is a request whose type
         * URL is not one of {@code Resources.TYPE_URLS}.
         */
        @Override
        public void onNext(Discovery.DiscoveryRequest request) {
            String nonce = request.getResponseNonce();
            String requestTypeUrl = request.getTypeUrl();

            if (requestTypeUrl.isEmpty()) {
                if (defaultTypeUrl.equals(ANY_TYPE_URL)) {
                    // No default to fall back on: fail the stream. onError terminates the stream,
                    // so the watches still registered for it must be released as well.
                    try {
                        responseObserver.onError(
                            Status.UNKNOWN
                                .withDescription(String.format("[%d] type URL is required for ADS", streamId))
                                .asRuntimeException());
                    } finally {
                        cancel();
                    }
                    return;
                }
                requestTypeUrl = defaultTypeUrl;
            }

            LOGGER.info("[{}] request {}[{}] with nonce {} from version {}",
                streamId,
                requestTypeUrl,
                String.join(", ", request.getResourceNamesList()),
                nonce,
                request.getVersionInfo());

            callbacks.forEach(cb -> cb.onStreamRequest(streamId, request));

            for (String typeUrl : Resources.TYPE_URLS) {
                if (!requestTypeUrl.equals(typeUrl)) {
                    continue;
                }

                Discovery.DiscoveryResponse response = latestResponse.get(typeUrl);
                String resourceNonce = response == null ? null : response.getNonce();

                // A request answering an older response (different nonce) is skipped. Requests
                // are accepted when nothing has been sent yet or the latest nonce is empty.
                if (!Strings.isNullOrEmpty(resourceNonce) && !resourceNonce.equals(nonce)) {
                    continue;
                }

                // Without error detail the request acknowledges the latest response; remember
                // which resources the client now has.
                if (!request.hasErrorDetail() && response != null) {
                    Set<String> ackedResourcesForType = response.getResourcesList()
                        .stream()
                        .map(Resources::getResourceName)
                        .collect(Collectors.toSet());
                    ackedResources.put(typeUrl, ackedResourcesForType);
                }

                // Swap the watch for this type: cancel the previous one (if any) and register a
                // new one whose responses are sent on this stream's executor.
                watches.compute(typeUrl, (t, oldWatch) -> {
                    if (oldWatch != null) {
                        oldWatch.cancel();
                    }

                    return configWatcher.createWatch(
                        ads,
                        request,
                        ackedResources.getOrDefault(typeUrl, Collections.emptySet()),
                        r -> executor.execute(() -> send(r, typeUrl)));
                });

                return;
            }
        }

        /**
         * Handles an error reported on the request stream: logs it unless its status is
         * CANCELLED, notifies the callbacks, forwards the status to the response observer and
         * cancels all watches.
         */
        @Override
        public void onError(Throwable t) {
            Status status = Status.fromThrowable(t);
            if (!status.getCode().equals(Status.CANCELLED.getCode())) {
                LOGGER.error("[{}] stream closed with error", streamId, t);
            }

            try {
                callbacks.forEach(cb -> cb.onStreamCloseWithError(streamId, defaultTypeUrl, t));
                responseObserver.onError(status.asException());
            } finally {
                // Release the watches even if a callback or the response observer throws.
                cancel();
            }
        }

        /**
         * Handles completion of the request stream: notifies the callbacks, completes the
         * response observer and cancels all watches.
         */
        @Override
        public void onCompleted() {
            LOGGER.info("[{}] stream closed", streamId);

            try {
                callbacks.forEach(cb -> cb.onStreamClose(streamId, defaultTypeUrl));
                responseObserver.onCompleted();
            } finally {
                // Release the watches even if a callback or the response observer throws.
                cancel();
            }
        }

        /** Invoked by the cancel handler registered in {@code createRequestHandler}. */
        void onCancelled() {
            LOGGER.info("[{}] stream cancelled", streamId);
            cancel();
        }

        /** Cancels every watch currently registered for this stream. */
        private void cancel() {
            watches.values().forEach(Watch::cancel);
        }

        /**
         * Builds a discovery response from the given watch response, assigns it the next nonce of
         * this stream, notifies the callbacks and sends it to the client.
         *
         * @param response the response produced by the watch
         * @param typeUrl the type URL the watch was created for
         * @throws StatusRuntimeException if sending fails with a status other than CANCELLED
         */
        private void send(Response response, String typeUrl) {
            String nonce = Long.toString(streamNonce.getAndIncrement());

            Discovery.DiscoveryResponse discoveryResponse = Discovery.DiscoveryResponse.newBuilder()
                .setVersionInfo(response.version())
                .addAllResources(response.resources().stream().map(Any::pack).collect(Collectors.toList()))
                .setTypeUrl(typeUrl)
                .setNonce(nonce)
                .build();

            LOGGER.info("[{}] response {} with nonce {} version {}", streamId, typeUrl, nonce, response.version());

            callbacks.forEach(cb -> cb.onStreamResponse(streamId, response.request(), discoveryResponse));

            // Record the response before sending it, so that a request carrying this nonce finds
            // the response already stored when onNext looks it up.
            latestResponse.put(typeUrl, discoveryResponse);

            try {
                responseObserver.onNext(discoveryResponse);
            } catch (StatusRuntimeException e) {
                // A cancelled stream is not an error worth propagating; anything else is.
                if (!Status.CANCELLED.getCode().equals(e.getStatus().getCode())) {
                    throw e;
                }
            }
        }
    }
}

