/*
 * Decompiled with CFR 0.152.
 */
package io.ryos.rhino.sdk.runners;

import java.time.Duration;
import java.util.Arrays;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

public class Throttler {
    private static final Logger LOG = LoggerFactory.getLogger(Throttler.class);

    public static <T> Function<Flux<T>, Flux<T>> throttle(Limit ... rps) {
        Flux reduce = Arrays.stream(rps).map(r -> {
            LOG.debug("throttle: tickNano={}, duration={}", (Object)r.tickNano, (Object)r.durationSec);
            return Flux.interval((Duration)Duration.ofNanos(r.tickNano)).take(Duration.ofSeconds(r.durationSec)).onBackpressureDrop();
        }).reduce(Flux::concatWith).orElse(Flux.generate(sink -> sink.next((Object)0L)));
        UnaryOperator res = f -> f.zipWith((Publisher)reduce.concatWith((Publisher)Flux.generate(sink -> sink.next((Object)0L)))).map(Tuple2::getT1);
        return res;
    }

    public static class Limit {
        final long durationSec;
        final long tickNano;

        private Limit(long requestsPerSecond, long durationSec) {
            this.durationSec = durationSec;
            this.tickNano = (long)Math.floor(1.0 / (double)requestsPerSecond * 1000.0 * 1000000.0);
        }

        public static Limit of(long requestsPerSecond, Duration duration) {
            return new Limit(requestsPerSecond, duration.toSeconds());
        }
    }
}

