/*
 * Decompiled with CFR 0.152.
 */
package io.opencmw;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.Util;
import io.opencmw.Filter;
import io.opencmw.HistoryEventHandler;
import io.opencmw.RingBufferEvent;
import io.opencmw.utils.Cache;
import io.opencmw.utils.LimitedArrayList;
import io.opencmw.utils.NoDuplicatesList;
import io.opencmw.utils.WorkerThreadFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventStore.class);
    private static final String NOT_FOUND_FOR_MULTIPLEXING_CONTEXT = "disruptor not found for multiplexing context = ";
    protected final WorkerThreadFactory threadFactory;
    protected final List<LocalEventHandlerGroup> listener = new NoDuplicatesList<LocalEventHandlerGroup>();
    protected final List<EventHandler<RingBufferEvent>> allEventHandlers = new NoDuplicatesList<EventHandler<RingBufferEvent>>();
    protected final List<Function<RingBufferEvent, String>> muxCtxFunctions = new NoDuplicatesList<Function<RingBufferEvent, String>>();
    protected final Cache<String, Disruptor<RingBufferEvent>> eventStreams;
    protected final Disruptor<RingBufferEvent> disruptor;
    protected final int lengthHistoryBuffer;
    protected Function<String, Disruptor<RingBufferEvent>> ctxMappingFunction;

    @SafeVarargs
    protected EventStore(Cache.CacheBuilder<String, Disruptor<RingBufferEvent>> muxBuilder, Function<RingBufferEvent, String> muxCtxFunction, int ringBufferSize, int lengthHistoryBuffer, int maxThreadNumber, boolean isSingleProducer, WaitStrategy waitStrategy, Class<? extends Filter> ... filterConfig) {
        assert (filterConfig != null);
        if (muxCtxFunction != null) {
            this.muxCtxFunctions.add(muxCtxFunction);
        }
        this.lengthHistoryBuffer = lengthHistoryBuffer;
        this.threadFactory = new WorkerThreadFactory(EventStore.class.getSimpleName() + "Worker", maxThreadNumber);
        this.disruptor = new Disruptor(() -> new RingBufferEvent(filterConfig), ringBufferSize, (ThreadFactory)this.threadFactory, isSingleProducer ? ProducerType.SINGLE : ProducerType.MULTI, waitStrategy);
        BiConsumer<String, Disruptor> clearCacheElement = (muxCtx, d) -> {
            d.shutdown();
            RingBuffer rb = d.getRingBuffer();
            for (long i = rb.getMinimumGatingSequence(); i < rb.getCursor(); ++i) {
                ((RingBufferEvent)rb.get(i)).clear();
            }
        };
        this.eventStreams = muxBuilder == null ? Cache.builder().withPostListener(clearCacheElement).build() : muxBuilder.build();
        this.ctxMappingFunction = ctx -> {
            Disruptor ld = new Disruptor(() -> new RingBufferEvent(filterConfig), ringBufferSize, (ThreadFactory)this.threadFactory, ProducerType.SINGLE, (WaitStrategy)new BlockingWaitStrategy());
            ld.start();
            return ld;
        };
    }

    public Disruptor<RingBufferEvent> getDisruptor() {
        return this.disruptor;
    }

    public List<RingBufferEvent> getHistory(String muxCtx, Predicate<RingBufferEvent> predicate, int nHistory) {
        return this.getHistory(muxCtx, predicate, Long.MAX_VALUE, nHistory);
    }

    public List<RingBufferEvent> getHistory(String muxCtx, Predicate<RingBufferEvent> predicate, long sequence, int nHistory) {
        assert (muxCtx != null && !muxCtx.isBlank());
        assert (sequence >= 0L) : "sequence = " + sequence;
        assert (nHistory > 0) : "nHistory = " + nHistory;
        Disruptor<RingBufferEvent> localDisruptor = this.eventStreams.computeIfAbsent(muxCtx, this.ctxMappingFunction);
        assert (localDisruptor != null) : "disruptor not found for multiplexing context = " + muxCtx;
        RingBuffer ringBuffer = localDisruptor.getRingBuffer();
        long cursor = ringBuffer.getCursor();
        assert (cursor >= 0L) : "uninitialised cursor: " + cursor;
        assert (nHistory < ringBuffer.getBufferSize()) : " nHistory == " + nHistory + " <! " + ringBuffer.getBufferSize();
        ArrayList<RingBufferEvent> history = new ArrayList<RingBufferEvent>(nHistory);
        long seqStart = Math.max(cursor - (long)ringBuffer.getBufferSize() - 1L, 0L);
        for (long seq = cursor; history.size() < nHistory && seqStart <= seq; --seq) {
            RingBufferEvent evt = (RingBufferEvent)ringBuffer.get(seq);
            if (evt.parentSequenceNumber > sequence || !predicate.test(evt)) continue;
            history.add(evt);
        }
        return history;
    }

    public Optional<RingBufferEvent> getLast(String muxCtx, Predicate<RingBufferEvent> predicate) {
        return this.getLast(muxCtx, predicate, Long.MAX_VALUE);
    }

    public Optional<RingBufferEvent> getLast(String muxCtx, Predicate<RingBufferEvent> predicate, long sequence) {
        assert (muxCtx != null && !muxCtx.isBlank());
        Disruptor<RingBufferEvent> localDisruptor = this.eventStreams.computeIfAbsent(muxCtx, this.ctxMappingFunction);
        assert (localDisruptor != null) : "disruptor not found for multiplexing context = " + muxCtx;
        RingBuffer ringBuffer = localDisruptor.getRingBuffer();
        assert (ringBuffer.getCursor() > 0L) : "uninitialised cursor: " + ringBuffer.getCursor();
        long seqStart = Math.max(ringBuffer.getCursor() - (long)ringBuffer.getBufferSize() - 1L, 0L);
        for (long seq = ringBuffer.getCursor(); seqStart <= seq; --seq) {
            RingBufferEvent evt = (RingBufferEvent)ringBuffer.get(seq);
            if (evt.parentSequenceNumber > sequence || !predicate.test(evt)) continue;
            return Optional.of(evt.clone());
        }
        return Optional.empty();
    }

    public RingBuffer<RingBufferEvent> getRingBuffer() {
        return this.disruptor.getRingBuffer();
    }

    @SafeVarargs
    public final LocalEventHandlerGroup register(EventHandler<RingBufferEvent> ... eventHandler) {
        LocalEventHandlerGroup group = new LocalEventHandlerGroup(this.lengthHistoryBuffer, eventHandler);
        this.listener.add(group);
        return group;
    }

    public final LocalEventHandlerGroup register(Predicate<RingBufferEvent> filter, Function<RingBufferEvent, String> muxCtxFunction, HistoryEventHandler ... eventHandler) {
        LocalEventHandlerGroup group = new LocalEventHandlerGroup(this.lengthHistoryBuffer, filter, muxCtxFunction, eventHandler);
        this.listener.add(group);
        return group;
    }

    public void start(boolean startReaper) {
        EventHandler muxCtxWriter = (evt, seq, batch) -> {
            for (Function<RingBufferEvent, String> muxCtxFunc : this.muxCtxFunctions) {
                String muxCtx = muxCtxFunc.apply((RingBufferEvent)evt);
                Disruptor<RingBufferEvent> localDisruptor = this.eventStreams.computeIfAbsent(muxCtx, this.ctxMappingFunction);
                assert (localDisruptor != null) : "disruptor not found for multiplexing context = " + muxCtx;
                if (localDisruptor.getRingBuffer().tryPublishEvent((event, sequence) -> {
                    if (event.payload != null && event.payload.getReferenceCount() > 0) {
                        event.payload.release();
                    }
                    evt.copyTo((RingBufferEvent)event);
                })) continue;
                throw new IllegalStateException("could not write event, sequence = " + seq + " muxCtx = " + muxCtx);
            }
        };
        this.allEventHandlers.add((EventHandler<RingBufferEvent>)muxCtxWriter);
        EventHandlerGroup handlerGroup = this.disruptor.handleEventsWith(new EventHandler[]{muxCtxWriter});
        for (LocalEventHandlerGroup localHandlerGroup : this.listener) {
            this.attachHandler(this.disruptor, (EventHandlerGroup<RingBufferEvent>)handlerGroup, localHandlerGroup);
        }
        assert (handlerGroup != null);
        EventHandler[] eventHanders = this.allEventHandlers.toArray(new EventHandler[0]);
        if (startReaper) {
            this.disruptor.after(eventHanders).then(new EventHandler[]{new RingBufferEvent.ClearEventHandler()});
        }
        for (EventHandler<RingBufferEvent> handler : this.allEventHandlers) {
            if (!(handler instanceof DefaultHistoryEventHandler)) continue;
            ((DefaultHistoryEventHandler)handler).setEventStore(this);
        }
        this.disruptor.start();
    }

    public void start() {
        this.start(true);
    }

    public void stop() {
        this.disruptor.shutdown();
    }

    protected EventHandlerGroup<RingBufferEvent> attachHandler(Disruptor<RingBufferEvent> disruptor, EventHandlerGroup<RingBufferEvent> parentGroup, LocalEventHandlerGroup localHandlerGroup) {
        EventHandler[] eventHanders = localHandlerGroup.handler.toArray(new EventHandler[0]);
        this.allEventHandlers.addAll(localHandlerGroup.handler);
        Object handlerGroup = parentGroup == null ? disruptor.handleEventsWith(eventHanders) : parentGroup.then(eventHanders);
        if (localHandlerGroup.dependent != null && !localHandlerGroup.handler.isEmpty()) {
            handlerGroup = this.attachHandler(disruptor, (EventHandlerGroup<RingBufferEvent>)handlerGroup, localHandlerGroup.dependent);
        }
        return handlerGroup;
    }

    public static EventStoreFactory getFactory() {
        return new EventStoreFactory();
    }

    protected static class DefaultHistoryEventHandler
    implements EventHandler<RingBufferEvent> {
        private final Predicate<RingBufferEvent> filter;
        private final Function<RingBufferEvent, String> muxCtxFunction;
        private final HistoryEventHandler callback;
        private final int lengthHistoryBuffer;
        private EventStore eventStore;
        private Cache<String, LimitedArrayList<RingBufferEvent>> historyCache;

        protected DefaultHistoryEventHandler(EventStore eventStore, Predicate<RingBufferEvent> filter, Function<RingBufferEvent, String> muxCtxFunction, int lengthHistoryBuffer, HistoryEventHandler callback) {
            assert (filter != null) : "filter predicate is null";
            assert (muxCtxFunction != null) : "muxCtxFunction hash function is null";
            assert (callback != null) : "callback function must not be null";
            this.eventStore = eventStore;
            this.filter = filter;
            this.muxCtxFunction = muxCtxFunction;
            this.lengthHistoryBuffer = lengthHistoryBuffer;
            this.callback = callback;
        }

        public void onEvent(RingBufferEvent event, long sequence, boolean endOfBatch) {
            RingBufferEvent result;
            if (!this.filter.test(event)) {
                return;
            }
            String muxCtx = this.muxCtxFunction.apply(event);
            LimitedArrayList history = this.historyCache.computeIfAbsent(muxCtx, ctx -> new LimitedArrayList(this.lengthHistoryBuffer));
            RingBufferEvent eventCopy = event.clone();
            if (history.size() == history.getLimit()) {
                RingBufferEvent removedEvent = (RingBufferEvent)history.remove(history.size() - 1);
                removedEvent.clear();
            }
            history.add(0, eventCopy);
            try {
                result = this.callback.onEvent(history, this.eventStore, sequence, endOfBatch);
            }
            catch (Exception e) {
                LOGGER.atError().setCause((Throwable)e).addArgument((Object)history.size()).addArgument((Object)sequence).addArgument((Object)endOfBatch).log("caught error for arguments (history={}, eventStore, sequence={}, endOfBatch={})");
                event.throwables.add(e);
                return;
            }
            if (result == null) {
                return;
            }
            this.eventStore.getRingBuffer().publishEvent((newEvent, newSequence) -> {
                result.copyTo((RingBufferEvent)newEvent);
                newEvent.parentSequenceNumber = newSequence;
            });
        }

        private void setEventStore(EventStore eventStore) {
            this.eventStore = eventStore;
            BiConsumer<String, LimitedArrayList> clearCacheElement = (muxCtx, history) -> history.forEach(RingBufferEvent::clear);
            Cache<String, Disruptor<RingBufferEvent>> c = eventStore.eventStreams;
            this.historyCache = Cache.builder().withLimit((int)c.getLimit()).withTimeout(c.getTimeout(), c.getTimeUnit()).withPostListener(clearCacheElement).build();
        }
    }

    public static class EventStoreFactory {
        private boolean singleProducer;
        private int maxThreadNumber = 4;
        private int ringbufferSize = 64;
        private int lengthHistoryBuffer = 10;
        private Cache.CacheBuilder<String, Disruptor<RingBufferEvent>> muxBuilder;
        private Function<RingBufferEvent, String> muxCtxFunction;
        private WaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(100L, TimeUnit.MILLISECONDS);
        private Class<? extends Filter>[] filterConfig = new Class[0];

        public EventStore build() {
            if (this.muxBuilder == null) {
                this.muxBuilder = Cache.builder().withLimit(this.lengthHistoryBuffer);
            }
            return new EventStore(this.muxBuilder, this.muxCtxFunction, this.ringbufferSize, this.lengthHistoryBuffer, this.maxThreadNumber, this.singleProducer, this.waitStrategy, this.filterConfig);
        }

        public Class<? extends Filter>[] getFilterConfig() {
            return this.filterConfig;
        }

        @SafeVarargs
        public final EventStoreFactory setFilterConfig(Class<? extends Filter> ... filterConfig) {
            if (filterConfig == null) {
                throw new IllegalArgumentException("filterConfig is null");
            }
            this.filterConfig = filterConfig;
            return this;
        }

        public int getLengthHistoryBuffer() {
            return this.lengthHistoryBuffer;
        }

        public EventStoreFactory setLengthHistoryBuffer(int lengthHistoryBuffer) {
            if (lengthHistoryBuffer < 0) {
                throw new IllegalArgumentException("lengthHistoryBuffer < 0: " + lengthHistoryBuffer);
            }
            this.lengthHistoryBuffer = lengthHistoryBuffer;
            return this;
        }

        public int getMaxThreadNumber() {
            return this.maxThreadNumber;
        }

        public EventStoreFactory setMaxThreadNumber(int maxThreadNumber) {
            this.maxThreadNumber = maxThreadNumber;
            return this;
        }

        public Cache.CacheBuilder<String, Disruptor<RingBufferEvent>> getMuxBuilder() {
            return this.muxBuilder;
        }

        public EventStoreFactory setMuxBuilder(Cache.CacheBuilder<String, Disruptor<RingBufferEvent>> muxBuilder) {
            this.muxBuilder = muxBuilder;
            return this;
        }

        public Function<RingBufferEvent, String> getMuxCtxFunction() {
            return this.muxCtxFunction;
        }

        public EventStoreFactory setMuxCtxFunction(Function<RingBufferEvent, String> muxCtxFunction) {
            this.muxCtxFunction = muxCtxFunction;
            return this;
        }

        public int getRingbufferSize() {
            return this.ringbufferSize;
        }

        public EventStoreFactory setRingbufferSize(int ringbufferSize) {
            if (ringbufferSize < 0) {
                throw new IllegalArgumentException("lengthHistoryBuffer < 0: " + ringbufferSize);
            }
            int rounded = Util.ceilingNextPowerOfTwo((int)(ringbufferSize - 1));
            if (ringbufferSize != rounded) {
                LOGGER.atWarn().addArgument((Object)ringbufferSize).addArgument((Object)rounded).log("setRingbufferSize({}) is not a power of two setting to next power of two: {}");
                this.ringbufferSize = rounded;
                return this;
            }
            this.ringbufferSize = ringbufferSize;
            return this;
        }

        public WaitStrategy getWaitStrategy() {
            return this.waitStrategy;
        }

        public EventStoreFactory setWaitStrategy(WaitStrategy waitStrategy) {
            this.waitStrategy = waitStrategy;
            return this;
        }

        public boolean isSingleProducer() {
            return this.singleProducer;
        }

        public EventStoreFactory setSingleProducer(boolean singleProducer) {
            this.singleProducer = singleProducer;
            return this;
        }
    }

    public static class LocalEventHandlerGroup {
        protected final List<EventHandler<RingBufferEvent>> handler = new NoDuplicatesList<EventHandler<RingBufferEvent>>();
        protected final int lengthHistoryBuffer;
        protected LocalEventHandlerGroup dependent;

        @SafeVarargs
        private LocalEventHandlerGroup(int lengthHistoryBuffer, EventHandler<RingBufferEvent> ... eventHandler) {
            assert (eventHandler != null);
            this.lengthHistoryBuffer = lengthHistoryBuffer;
            this.handler.addAll(Arrays.asList(eventHandler));
        }

        private LocalEventHandlerGroup(int lengthHistoryBuffer, Predicate<RingBufferEvent> filter, Function<RingBufferEvent, String> muxCtxFunction, HistoryEventHandler ... eventHandlerCallbacks) {
            assert (eventHandlerCallbacks != null);
            this.lengthHistoryBuffer = lengthHistoryBuffer;
            for (HistoryEventHandler callback : eventHandlerCallbacks) {
                this.handler.add(new DefaultHistoryEventHandler(null, filter, muxCtxFunction, lengthHistoryBuffer, callback));
            }
        }

        @SafeVarargs
        public final LocalEventHandlerGroup and(EventHandler<RingBufferEvent> ... eventHandler) {
            assert (eventHandler != null);
            this.handler.addAll(Arrays.asList(eventHandler));
            return this;
        }

        public final LocalEventHandlerGroup and(Predicate<RingBufferEvent> filter, Function<RingBufferEvent, String> muxCtxFunction, HistoryEventHandler ... eventHandlerCallbacks) {
            assert (eventHandlerCallbacks != null);
            for (HistoryEventHandler callback : eventHandlerCallbacks) {
                this.handler.add(new DefaultHistoryEventHandler(null, filter, muxCtxFunction, this.lengthHistoryBuffer, callback));
            }
            return this;
        }

        @SafeVarargs
        public final LocalEventHandlerGroup then(EventHandler<RingBufferEvent> ... eventHandler) {
            this.dependent = new LocalEventHandlerGroup(this.lengthHistoryBuffer, eventHandler);
            return this.dependent;
        }

        public final LocalEventHandlerGroup then(Predicate<RingBufferEvent> filter, Function<RingBufferEvent, String> muxCtxFunction, HistoryEventHandler ... eventHandlerCallbacks) {
            this.dependent = new LocalEventHandlerGroup(this.lengthHistoryBuffer, filter, muxCtxFunction, eventHandlerCallbacks);
            return this.dependent;
        }
    }
}

