/*
 * Decompiled with CFR 0.152.
 */
package io.envoyproxy.controlplane.server.callback;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.envoyproxy.controlplane.cache.NodeGroup;
import io.envoyproxy.controlplane.cache.Snapshot;
import io.envoyproxy.controlplane.cache.SnapshotCache;
import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Discovery-server callback that tracks which node groups still have open
 * streams and periodically clears the cached snapshot of groups that have
 * had no open stream for a configurable amount of time.
 *
 * <p>Every stream is attributed to the node group computed from the first
 * request seen on that stream. All bookkeeping is guarded by this object's
 * monitor.
 *
 * <p>NOTE(review): the collector thread started by the constructor is never
 * shut down by this class, and collector callbacks are invoked while the
 * monitor is held. If a run of the collection task throws, the scheduled
 * executor suppresses all subsequent runs.
 *
 * @param <T> type of the node-group identifier produced by the {@link NodeGroup}
 * @param <X> type of snapshot stored in the {@link SnapshotCache}
 */
public class SnapshotCollectingCallback<T, X extends Snapshot>
        implements DiscoveryServerCallbacks {
    private final SnapshotCache<T, X> snapshotCache;
    private final NodeGroup<T> nodeGroup;
    private final Clock clock;
    private final Set<Consumer<T>> collectorCallbacks;
    private final long collectAfterMillis;
    /** Per-group bookkeeping: number of open streams and time of last activity. */
    private final Map<T, SnapshotState> snapshotStates = new ConcurrentHashMap<>();
    /** The group each open stream was attributed to when it was first seen. */
    private final Map<Long, T> groupByStream = new ConcurrentHashMap<>();

    /**
     * Creates the callback and starts a single-threaded scheduler (threads named
     * {@code snapshot-gc-N}) that runs the collection pass at a fixed rate.
     *
     * @param snapshotCache            cache whose snapshots are cleared for idle groups
     * @param nodeGroup                maps the node of a discovery request to its group identifier
     * @param clock                    time source used for last-seen timestamps and the idle cutoff
     * @param collectorCallbacks       invoked with the group identifier after a group was collected
     * @param collectAfterMillis       how long a group must have been idle before it is collected
     * @param collectionIntervalMillis initial delay and period of the collection pass
     */
    public SnapshotCollectingCallback(SnapshotCache<T, X> snapshotCache, NodeGroup<T> nodeGroup, Clock clock, Set<Consumer<T>> collectorCallbacks, long collectAfterMillis, long collectionIntervalMillis) {
        this.snapshotCache = snapshotCache;
        this.nodeGroup = nodeGroup;
        this.clock = clock;
        this.collectorCallbacks = collectorCallbacks;
        this.collectAfterMillis = collectAfterMillis;
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("snapshot-gc-%d").build());
        executorService.scheduleAtFixedRate(() -> this.deleteUnreferenced(clock), collectionIntervalMillis, collectionIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Records activity for the node group of a v2 discovery request.
     *
     * @param streamId identifier of the stream the request arrived on
     * @param request  the received request; its node determines the group
     */
    @Override
    public synchronized void onV2StreamRequest(long streamId, DiscoveryRequest request) {
        T groupIdentifier = this.nodeGroup.hash(request.getNode());
        this.updateState(streamId, groupIdentifier);
    }

    /**
     * Records activity for the node group of a v3 discovery request.
     *
     * @param streamId identifier of the stream the request arrived on
     * @param request  the received request; its node determines the group
     */
    @Override
    public synchronized void onV3StreamRequest(long streamId, io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest request) {
        T groupIdentifier = this.nodeGroup.hash(request.getNode());
        this.updateState(streamId, groupIdentifier);
    }

    /**
     * Registers the stream with its group on first sight and refreshes the
     * group's last-seen time. Callers must hold this object's monitor.
     *
     * <p>A stream stays attributed to the group of its first request: later
     * requests on the same stream may hash to a different group (for example
     * when they carry an empty node), and re-attributing the stream without
     * moving its count would make the close handler decrement the wrong group.
     *
     * @param streamId        identifier of the stream
     * @param groupIdentifier group computed from the current request
     */
    private void updateState(long streamId, T groupIdentifier) {
        T knownGroup = this.groupByStream.putIfAbsent(streamId, groupIdentifier);
        boolean firstRequestOnStream = knownGroup == null;
        T owningGroup = firstRequestOnStream ? groupIdentifier : knownGroup;

        SnapshotState snapshotState = this.snapshotStates.computeIfAbsent(owningGroup, x -> new SnapshotState());
        snapshotState.lastSeen = this.clock.instant();
        if (firstRequestOnStream) {
            ++snapshotState.streamCount;
        }
    }

    /** Releases the stream's reference on its group. */
    @Override
    public void onStreamClose(long streamId, String typeUrl) {
        this.onStreamCloseHelper(streamId);
    }

    /** Releases the stream's reference on its group; the error itself is not inspected. */
    @Override
    public void onStreamCloseWithError(long streamId, String typeUrl, Throwable error) {
        this.onStreamCloseHelper(streamId);
    }

    /**
     * Runs one collection pass: every group with no open streams whose last
     * activity is older than {@code collectAfterMillis} is offered to
     * {@link SnapshotCache#clearSnapshot}; groups for which that call returns
     * {@code true} are forgotten and reported to the collector callbacks.
     *
     * @param clock time source used to compute the idle cutoff for this pass
     */
    @VisibleForTesting
    synchronized void deleteUnreferenced(Clock clock) {
        // One cutoff per pass so every group is judged against the same instant.
        Instant cutoff = clock.instant().minus(this.collectAfterMillis, ChronoUnit.MILLIS);

        // Collected first and removed afterwards, so the map is not modified while it is iterated.
        Set<T> toDelete = new LinkedHashSet<>();
        for (Map.Entry<T, SnapshotState> entry : this.snapshotStates.entrySet()) {
            SnapshotState state = entry.getValue();
            boolean idleLongEnough = state.streamCount == 0 && state.lastSeen.isBefore(cutoff);
            if (!idleLongEnough) {
                continue;
            }
            T groupIdentifier = entry.getKey();
            // Only forget the group when the cache reports that the snapshot was cleared.
            if (this.snapshotCache.clearSnapshot(groupIdentifier)) {
                toDelete.add(groupIdentifier);
            }
        }
        toDelete.forEach(group -> {
            this.snapshotStates.remove(group);
            this.collectorCallbacks.forEach(cb -> cb.accept(group));
        });
    }

    /**
     * Drops the stream from the bookkeeping and decrements the open-stream
     * count of the group it was attributed to. Streams that were never seen
     * by a request callback are ignored.
     *
     * @param streamId identifier of the closed stream
     */
    private synchronized void onStreamCloseHelper(long streamId) {
        T removed = this.groupByStream.remove(streamId);
        if (removed == null) {
            return;
        }
        // The state is present: a group is only removed once its stream count is zero,
        // and this stream was still counted against it.
        SnapshotState snapshotState = this.snapshotStates.get(removed);
        --snapshotState.streamCount;
        snapshotState.lastSeen = this.clock.instant();
    }

    /** Mutable per-group bookkeeping; accessed only while the outer monitor is held. */
    private static class SnapshotState {
        /** Number of currently open streams attributed to the group. */
        int streamCount;
        /** Time of the most recent request or stream close for the group. */
        Instant lastSeen;

        private SnapshotState() {
        }
    }
}

