/*
 * Decompiled with CFR 0.152.
 */
package momento.lock.client;

import com.amazonaws.services.dynamodbv2.LockItem;
import com.amazonaws.services.dynamodbv2.util.LockClientUtils;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import momento.lock.client.LockItemUtils;
import momento.lock.client.LockStorage;
import momento.lock.client.MomentoLockItem;
import momento.sdk.CacheClient;
import momento.sdk.exceptions.MomentoErrorCode;
import momento.sdk.responses.cache.GetResponse;
import momento.sdk.responses.cache.ttl.ItemGetTtlResponse;
import momento.sdk.responses.cache.ttl.UpdateTtlResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MomentoLockClientHeartbeatHandler
implements Runnable,
Closeable {
    private static final Log logger = LogFactory.getLog(MomentoLockClientHeartbeatHandler.class);
    private final LockStorage lockStorage;
    private final CacheClient cacheClient;
    private final String cacheName;
    private final Duration leaseDuration;
    private boolean holdLockOnServiceUnavailable;
    private static final int TTL_GRACE_MILLIS = 200;
    private static final Duration MOMENTO_TIMEOUT = Duration.ofSeconds(10L);
    private final ThreadPoolExecutor heartbeatExecutor;
    int rounds = 0;

    public MomentoLockClientHeartbeatHandler(LockStorage lockStorage, CacheClient client, String cacheName, Duration leaseDuration, boolean holdLockOnServiceUnavailable, int totalNumBackgroundHeartbeatThreads) {
        this.lockStorage = lockStorage;
        this.cacheClient = client;
        this.cacheName = cacheName;
        this.leaseDuration = leaseDuration;
        this.holdLockOnServiceUnavailable = holdLockOnServiceUnavailable;
        this.heartbeatExecutor = new ThreadPoolExecutor(totalNumBackgroundHeartbeatThreads, totalNumBackgroundHeartbeatThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public void run() {
        logger.debug((Object)("Heartbeat run..." + ++this.rounds));
        List<LockItem> locks = this.lockStorage.getAllLocks();
        for (LockItem lock : locks) {
            this.heartbeatExecutor.submit(() -> this.heartBeat(lock, LockItemUtils.toMomentoLockItem(lock)));
        }
        logger.debug((Object)("Total locks heartbeated for " + locks.size()));
    }

    public void heartBeat(LockItem lockItem, MomentoLockItem momentoLockItem) {
        try {
            ItemGetTtlResponse itemGetTtlResponse = (ItemGetTtlResponse)this.cacheClient.itemGetTtl(this.cacheName, momentoLockItem.getCacheKey()).get(MOMENTO_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
            if (itemGetTtlResponse instanceof ItemGetTtlResponse.Hit && ((ItemGetTtlResponse.Hit)itemGetTtlResponse).remainingTtl().toMillis() > 200L) {
                GetResponse getResponse = (GetResponse)this.cacheClient.get(this.cacheName, momentoLockItem.getCacheKey()).get(MOMENTO_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
                if (getResponse instanceof GetResponse.Hit) {
                    MomentoLockItem retrievedLockItem = LockItemUtils.deserialize(((GetResponse.Hit)getResponse).valueByteArray());
                    if (momentoLockItem.getOwner().equals(retrievedLockItem.getOwner())) {
                        UpdateTtlResponse updateTtlResponse = (UpdateTtlResponse)this.cacheClient.updateTtl(this.cacheName, momentoLockItem.getCacheKey(), this.leaseDuration).get(MOMENTO_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
                        if (updateTtlResponse instanceof UpdateTtlResponse.Miss) {
                            logger.debug((Object)("Got a updateTtlResponse miss for cache key in round " + String.valueOf(this.rounds)));
                            this.lockStorage.removeLock(momentoLockItem.getCacheKey());
                        } else if (updateTtlResponse instanceof UpdateTtlResponse.Set) {
                            lockItem.updateLookUpTime(LockClientUtils.INSTANCE.millisecondTime());
                        } else if (updateTtlResponse instanceof UpdateTtlResponse.Error) {
                            if (this.holdLock(((UpdateTtlResponse.Error)updateTtlResponse).getErrorCode())) {
                                logger.warn((Object)"Service Unavailable. Holding the lock as holdLockOnServiceUnavailable is set to true.");
                                lockItem.updateLookUpTime(LockClientUtils.INSTANCE.millisecondTime());
                            } else {
                                this.lockStorage.removeLock(momentoLockItem.getCacheKey());
                            }
                        }
                    } else {
                        this.lockStorage.removeLock(momentoLockItem.getCacheKey());
                    }
                } else if (getResponse instanceof GetResponse.Miss) {
                    logger.debug((Object)("Got a GetResponse miss for cache key in round " + String.valueOf(this.rounds)));
                    this.lockStorage.removeLock(momentoLockItem.getCacheKey());
                } else if (getResponse instanceof GetResponse.Error) {
                    if (this.holdLock(((GetResponse.Error)getResponse).getErrorCode())) {
                        logger.warn((Object)("Service Unavailable. Holding the lock as holdLockOnServiceUnavailable is set to true. lockKey " + momentoLockItem.getCacheKey()));
                        lockItem.updateLookUpTime(LockClientUtils.INSTANCE.millisecondTime());
                    } else {
                        this.lockStorage.removeLock(momentoLockItem.getCacheKey());
                    }
                }
            } else if (itemGetTtlResponse instanceof ItemGetTtlResponse.Error) {
                if (this.holdLock(((ItemGetTtlResponse.Error)itemGetTtlResponse).getErrorCode())) {
                    logger.warn((Object)("Service Unavailable. Holding the lock as holdLockOnServiceUnavailable is set to true. lockKey " + momentoLockItem.getCacheKey()));
                    lockItem.updateLookUpTime(LockClientUtils.INSTANCE.millisecondTime());
                } else {
                    this.lockStorage.removeLock(momentoLockItem.getCacheKey());
                }
            } else {
                logger.debug((Object)("Got a itemGetTtlResponse miss for cache key " + momentoLockItem.getCacheKey() + " in round " + String.valueOf(this.rounds)));
                this.lockStorage.removeLock(momentoLockItem.getCacheKey());
            }
        }
        catch (ExecutionException | TimeoutException e) {
            logger.warn((Object)("Service Unavailable. Holding the lock as holdLockOnServiceUnavailable is set to true. lockKey " + momentoLockItem.getCacheKey()));
            lockItem.updateLookUpTime(LockClientUtils.INSTANCE.millisecondTime());
        }
        catch (InterruptedException e) {
            logger.warn((Object)("Heartbeat handler was interrupted. I will be explicitly closed by the lock client once a close is called on it. So I can swallow this as something  unexpected and continue with the next run of the handler. the lock with key " + momentoLockItem.getCacheKey() + " was not heartbeated."));
        }
    }

    private boolean holdLock(MomentoErrorCode errorCode) {
        if (!this.holdLockOnServiceUnavailable) {
            return false;
        }
        switch (errorCode) {
            case SERVER_UNAVAILABLE: 
            case INTERNAL_SERVER_ERROR: 
            case UNKNOWN_SERVICE_ERROR: {
                return true;
            }
        }
        return false;
    }

    @Override
    public void close() throws IOException {
        this.heartbeatExecutor.shutdown();
    }
}

