/*
 * Decompiled with CFR 0.152.
 */
package won.utils.batch;

import com.google.common.base.Charsets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnel;
import com.google.common.hash.Funnels;
import java.nio.charset.Charset;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BatchingConsumer<K, I> {
    private final Map<K, Batch<K, I>> batches = new HashMap<K, Batch<K, I>>(10);
    ScheduledExecutorService executorSvc = Executors.newSingleThreadScheduledExecutor();
    private Config defaultConfig = new ConfigBuilder().maxBatchAge(Duration.ofHours(1L)).maxItemInterval(Duration.ofMinutes(5L)).consumeFirst(false).maxBatchSize(200).minChunkInterval(Duration.ofMinutes(15L)).build();

    public BatchingConsumer() {
    }

    public BatchingConsumer(Config config) {
        this.defaultConfig = config;
    }

    public void accept(K key, I item, Consumer<Collection<I>> consumer) {
        this.accept(key, item, Optional.empty(), consumer, Optional.empty());
    }

    public void accept(K key, I item, Consumer<Collection<I>> consumer, Config config) {
        this.accept(key, item, Optional.empty(), consumer, Optional.ofNullable(config));
    }

    public void accept(K key, I item, String deduplicationKey, Consumer<Collection<I>> consumer) {
        this.accept(key, item, Optional.ofNullable(deduplicationKey), consumer, Optional.empty());
    }

    public void accept(K key, I item, String deduplicationKey, Consumer<Collection<I>> consumer, Config config) {
        this.accept(key, item, Optional.ofNullable(deduplicationKey), consumer, Optional.ofNullable(config));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void accept(K key, I item, Optional<String> deduplicationKey, Consumer<Collection<I>> consumer, Optional<Config> config) {
        Batch<K, I> batch = null;
        Map<K, Batch<K, I>> map = this.batches;
        synchronized (map) {
            batch = this.batches.get(key);
            if (batch == null || batch.isShuttingDown()) {
                batch = new Batch<K, I>(key, consumer, config.orElse(this.defaultConfig));
                ((Batch)batch).scheduleCleanup();
                this.batches.put(key, batch);
            }
        }
        batch.add(item, deduplicationKey);
        batch.rescheduleChunkConsumption();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void consumeAllBatches() {
        Map<K, Batch<K, I>> map = this.batches;
        synchronized (map) {
            this.batches.values().forEach(batch -> batch.consumeAll(true));
            this.batches.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelBatch(K key) {
        Map<K, Batch<K, I>> map = this.batches;
        synchronized (map) {
            Batch<K, I> batch = this.batches.get(key);
            if (batch != null) {
                batch.cancelAndCleanup();
            }
        }
    }

    private class Batch<K, I> {
        private final K key;
        private final Queue<I> items = new LinkedList<I>();
        private final Config config;
        private Optional<Instant> lastChunkInstant = Optional.empty();
        private Consumer<Collection<I>> consumer;
        private Optional<ScheduledFuture<?>> cleanupTask = Optional.empty();
        private Optional<ScheduledFuture<?>> consumeChunkTask = Optional.empty();
        private final AtomicBoolean firstAdd = new AtomicBoolean(true);
        private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
        private final Object monitor = new Object();
        private final BloomFilter<CharSequence> duplicateFilter = BloomFilter.create((Funnel)Funnels.stringFunnel((Charset)Charsets.UTF_8), (int)100, (double)0.01);

        Batch(K key, Consumer<Collection<I>> consumer, Config config) {
            this.key = key;
            this.config = config;
            this.consumer = consumer;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void add(I item, Optional<String> deduplicationKey) {
            Object object = this.monitor;
            synchronized (object) {
                if (deduplicationKey.isPresent()) {
                    if (this.duplicateFilter.mightContain((Object)deduplicationKey.get())) {
                        return;
                    }
                    this.duplicateFilter.put((Object)deduplicationKey.get());
                }
                this.lastChunkInstant = Optional.of(Instant.now());
                if (this.config.consumeFirst.orElse(false).booleanValue() && this.firstAdd.compareAndSet(true, false)) {
                    this.consume(Stream.of(item).collect(Collectors.toList()), false);
                } else {
                    this.items.add(item);
                    if (this.config.maxBatchSize.isPresent() && this.items.size() >= this.config.maxBatchSize.get()) {
                        this.consumeAll(false);
                    }
                }
            }
        }

        boolean isShuttingDown() {
            return this.shuttingDown.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void consumeAllAndCleanup() {
            Object object = this.monitor;
            synchronized (object) {
                this.shuttingDown.set(true);
                this.consumeAll(false);
                Map map = BatchingConsumer.this.batches;
                synchronized (map) {
                    BatchingConsumer.this.batches.remove(this.key);
                }
                this.cancelTask(this.consumeChunkTask);
                this.cancelTask(this.cleanupTask);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void cancelAndCleanup() {
            Object object = this.monitor;
            synchronized (object) {
                this.shuttingDown.set(true);
                Map map = BatchingConsumer.this.batches;
                synchronized (map) {
                    BatchingConsumer.this.batches.remove(this.key);
                }
                this.cancelTask(this.consumeChunkTask);
                this.cancelTask(this.cleanupTask);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void cancelTask(Optional<ScheduledFuture<?>> task) {
            Object object = this.monitor;
            synchronized (object) {
                if (task.isPresent()) {
                    task.get().cancel(false);
                }
            }
        }

        void consumeAll(boolean synchronous) {
            this.consume(this.items, synchronous);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        boolean consumeChunk() {
            Object object = this.monitor;
            synchronized (object) {
                if (this.isTooEarlyForChunk()) {
                    return false;
                }
                this.consumeAll(false);
            }
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void consume(Collection<I> itemsToConsume, boolean synchronous) {
            Object object = this.monitor;
            synchronized (object) {
                this.lastChunkInstant = Optional.of(Instant.now());
                ArrayList<I> consumed = new ArrayList<I>(itemsToConsume.size());
                consumed.addAll(itemsToConsume);
                itemsToConsume.clear();
                if (synchronous) {
                    this.consumer.accept(consumed);
                } else {
                    BatchingConsumer.this.executorSvc.execute(() -> this.consumer.accept(consumed));
                }
            }
        }

        boolean isTooEarlyForChunk() {
            if (this.lastChunkInstant.isPresent() && this.config.minChunkInterval.isPresent()) {
                return Duration.between(this.lastChunkInstant.get(), Instant.now()).compareTo(this.config.minChunkInterval.get()) < 0;
            }
            return false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void rescheduleChunkConsumption() {
            if (!this.config.maxItemInterval.isPresent()) {
                return;
            }
            Object object = this.monitor;
            synchronized (object) {
                this.cancelTask(this.consumeChunkTask);
                this.consumeChunkTask = Optional.of(BatchingConsumer.this.executorSvc.schedule(() -> {
                    boolean consumed = this.consumeChunk();
                    if (!consumed) {
                        this.rescheduleChunkConsumption();
                    }
                }, this.config.maxItemInterval.get().toNanos(), TimeUnit.NANOSECONDS));
            }
        }

        private void scheduleCleanup() {
            if (!this.config.maxBatchAge.isPresent()) {
                return;
            }
            this.cleanupTask = Optional.of(BatchingConsumer.this.executorSvc.schedule(() -> this.consumeAllAndCleanup(), this.config.maxBatchAge.get().toNanos(), TimeUnit.NANOSECONDS));
        }
    }

    public static class ConfigBuilder {
        private Optional<Duration> maxBatchAge = Optional.empty();
        private Optional<Duration> maxItemInterval = Optional.empty();
        private Optional<Boolean> consumeFirst = Optional.empty();
        private Optional<Duration> minChunkInterval = Optional.empty();
        private Optional<Integer> maxBatchSize = Optional.empty();

        public ConfigBuilder maxBatchAge(Duration d) {
            this.maxBatchAge = Optional.ofNullable(d);
            return this;
        }

        public ConfigBuilder maxItemInterval(Duration d) {
            this.maxItemInterval = Optional.ofNullable(d);
            return this;
        }

        public ConfigBuilder consumeFirst(Boolean b) {
            this.consumeFirst = Optional.ofNullable(b);
            return this;
        }

        public ConfigBuilder minChunkInterval(Duration d) {
            this.minChunkInterval = Optional.ofNullable(d);
            return this;
        }

        public ConfigBuilder maxBatchSize(Integer s) {
            this.maxBatchSize = Optional.ofNullable(s);
            return this;
        }

        public Config build() {
            return new Config(this.maxBatchAge, this.minChunkInterval, this.maxItemInterval, this.consumeFirst, this.maxBatchSize);
        }
    }

    public static class Config {
        public final Optional<Duration> maxBatchAge;
        public final Optional<Duration> maxItemInterval;
        public final Optional<Boolean> consumeFirst;
        public final Optional<Duration> minChunkInterval;
        public final Optional<Integer> maxBatchSize;

        public Config(Optional<Duration> maxBatchAge, Optional<Duration> minChunkInterval, Optional<Duration> maxItemInterval, Optional<Boolean> consumeFirst, Optional<Integer> maxBatchSize) {
            this.consumeFirst = consumeFirst;
            this.maxBatchAge = maxBatchAge;
            this.minChunkInterval = minChunkInterval;
            this.maxItemInterval = maxItemInterval;
            this.maxBatchSize = maxBatchSize;
        }
    }
}

