/*
 * Decompiled with CFR 0.152.
 */
package io.opencmw;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceReportingEventHandler;
import com.lmax.disruptor.TimeoutHandler;
import io.opencmw.OpenCmwProtocol;
import io.opencmw.RingBufferEvent;
import io.opencmw.filter.EvtTypeFilter;
import io.opencmw.filter.TimingCtx;
import io.opencmw.utils.Cache;
import io.opencmw.utils.NoDuplicatesList;
import io.opencmw.utils.SharedPointer;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AggregateEventHandler
implements SequenceReportingEventHandler<RingBufferEvent> {
    /** Logger shared by the dispatcher and its workers. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AggregateEventHandler.class);
    /** Keys are the bpcts values that were already handed to a worker; the values are placeholder objects. Backed by a {@code Cache} sized with the retention size. */
    protected final Map<Long, Object> aggregatedBpcts;
    /** Ring buffer the workers publish the aggregated events to. */
    private final RingBuffer<RingBufferEvent> ringBuffer;
    /** Aggregation time-out, expressed in {@link #timeOutUnit}. */
    private final long timeOut;
    /** Unit of {@link #timeOut}. */
    private final TimeUnit timeOutUnit;
    /** Stash size at which a worker publishes a FINAL aggregate: {@code deviceList.size() + evtTypeFilter.size()}. */
    private final int numberOfEventsToAggregate;
    /** Device URIs compared against {@code EvtTypeFilter.property} of incoming events; an empty list disables this check. */
    private final List<URI> deviceList;
    /** Predicates tested against incoming events; an empty list disables this check. */
    private final List<Predicate<RingBufferEvent>> evtTypeFilter;
    /** All workers created by the constructor, busy or not. */
    private final InternalAggregationHandler[] aggregationHandler;
    /** Synchronized list of the workers that are currently not assigned to a bpcts. */
    private final List<InternalAggregationHandler> freeWorkers;
    /** URI written into {@code EvtTypeFilter.property} of every published aggregate event. */
    private final URI aggregateName;
    /** Sequence supplied through {@code setSequenceCallback}; set to the event's sequence once a bpcts has been scheduled. */
    private Sequence seq;

    private AggregateEventHandler(RingBuffer<RingBufferEvent> ringBuffer, URI aggregateName, long timeOut, TimeUnit timeOutUnit, int nWorkers, int retentionSize, List<URI> deviceList, List<Predicate<RingBufferEvent>> evtTypeFilter) {
        this.ringBuffer = ringBuffer;
        this.aggregateName = aggregateName;
        this.timeOut = timeOut;
        this.timeOutUnit = timeOutUnit;
        this.freeWorkers = Collections.synchronizedList(new ArrayList(nWorkers));
        this.aggregationHandler = new InternalAggregationHandler[nWorkers];
        for (int i = 0; i < nWorkers; ++i) {
            this.aggregationHandler[i] = new InternalAggregationHandler();
            this.freeWorkers.add(this.aggregationHandler[i]);
        }
        this.aggregatedBpcts = new Cache<Long, Object>(retentionSize);
        this.deviceList = deviceList;
        this.evtTypeFilter = evtTypeFilter;
        this.numberOfEventsToAggregate = deviceList.size() + evtTypeFilter.size();
    }

    /**
     * Gives access to the aggregation workers owned by this handler.
     *
     * @return the internal worker array itself (not a copy)
     */
    public InternalAggregationHandler[] getAggregationHander() {
        final InternalAggregationHandler[] workers = aggregationHandler;
        return workers;
    }

    /**
     * Schedules the aggregation of a not yet seen bpcts on a free worker.
     *
     * <p>Events without a {@code TimingCtx} filter and events whose bpcts has already been scheduled
     * are ignored. Otherwise the call blocks until a worker is free. While all workers are busy, a
     * worker whose aggregation time-out has elapsed is forced to publish a PARTIAL aggregate, which
     * returns it to the free list; if no worker has timed out yet, the thread parks until the
     * earliest time-out is due.
     *
     * @param event incoming ring buffer event
     * @param nextSequence sequence of {@code event}; the sequence callback is set to it once the
     *        bpcts has been assigned to a worker
     * @param b end-of-batch flag, unused
     */
    @Override
    public void onEvent(RingBufferEvent event, long nextSequence, boolean b) {
        final TimingCtx ctx = event.getFilter(TimingCtx.class);
        if (ctx == null || this.aggregatedBpcts.containsKey(ctx.bpcts)) {
            // no timing information, or this bpcts has already been handed to a worker
            return;
        }
        final long timeOutMillis = this.timeOutUnit.toMillis(this.timeOut);
        while (true) {
            if (!this.freeWorkers.isEmpty()) {
                final InternalAggregationHandler freeWorker = this.freeWorkers.remove(0);
                freeWorker.bpcts = ctx.bpcts;
                freeWorker.aggStart = event.arrivalTimeStamp;
                this.aggregatedBpcts.put(ctx.bpcts, new Object());
                this.seq.set(nextSequence);
                return;
            }

            // No free worker: evict one that has timed out, or wait for the earliest time-out.
            // The wait is capped at one full time-out so it can never become unbounded.
            boolean retryImmediately = false;
            long waitTimeNanos = TimeUnit.MILLISECONDS.toNanos(timeOutMillis);
            for (InternalAggregationHandler w : this.aggregationHandler) {
                if (w.bpcts == -1L) {
                    // the worker has just released itself (it resets bpcts before re-entering the free list)
                    retryImmediately = true;
                    break;
                }
                final long remainingMillis = timeOutMillis - (System.currentTimeMillis() - w.aggStart);
                if (remainingMillis <= 0L) {
                    // only a worker whose time-out has really elapsed may be cut short
                    // NOTE(review): this runs on the dispatcher thread while the worker may be handling
                    // events on its own thread; publishAndFreeWorker is not synchronized - confirm.
                    w.publishAndFreeWorker(OpenCmwProtocol.Command.PARTIAL);
                    retryImmediately = true;
                    break;
                }
                waitTimeNanos = Math.min(waitTimeNanos, TimeUnit.MILLISECONDS.toNanos(remainingMillis));
            }
            if (retryImmediately) {
                Thread.onSpinWait();
            } else {
                LockSupport.parkNanos(waitTimeNanos);
            }
        }
    }

    /**
     * Stores the sequence callback handed in by the caller; {@code onEvent} sets it after a bpcts
     * has been scheduled on a worker.
     *
     * @param sequence sequence object to report progress to
     */
    @Override
    public void setSequenceCallback(Sequence sequence) {
        seq = sequence;
    }

    /**
     * Entry point for configuring and building an {@link AggregateEventHandler}.
     *
     * @return a new factory carrying the default settings
     */
    public static AggregateEventHandlerFactory getFactory() {
        final AggregateEventHandlerFactory factory = new AggregateEventHandlerFactory();
        return factory;
    }

    protected class InternalAggregationHandler
    implements EventHandler<RingBufferEvent>,
    TimeoutHandler {
        protected volatile long bpcts = -1L;
        protected volatile long aggStart = -1L;
        protected List<RingBufferEvent> aggregatedEventsStash = new ArrayList<RingBufferEvent>();

        protected InternalAggregationHandler() {
        }

        public void onEvent(RingBufferEvent event, long sequence, boolean endOfBatch) {
            if (this.bpcts != -1L && event.arrivalTimeStamp > this.aggStart + AggregateEventHandler.this.timeOutUnit.toMillis(AggregateEventHandler.this.timeOut)) {
                this.publishAndFreeWorker(OpenCmwProtocol.Command.PARTIAL);
                return;
            }
            TimingCtx ctx = event.getFilter(TimingCtx.class);
            if (this.bpcts == -1L || ctx == null || ctx.bpcts != this.bpcts) {
                return;
            }
            EvtTypeFilter evtType = event.getFilter(EvtTypeFilter.class);
            if (evtType == null) {
                throw new IllegalArgumentException("cannot aggregate events without ring buffer containing EvtTypeFilter");
            }
            if (!AggregateEventHandler.this.deviceList.isEmpty() && !AggregateEventHandler.this.deviceList.contains(evtType.property) || !AggregateEventHandler.this.evtTypeFilter.isEmpty() && AggregateEventHandler.this.evtTypeFilter.stream().noneMatch(filter -> filter.test(event))) {
                return;
            }
            this.aggregatedEventsStash.add(event);
            if (this.aggregatedEventsStash.size() == AggregateEventHandler.this.numberOfEventsToAggregate) {
                this.publishAndFreeWorker(OpenCmwProtocol.Command.FINAL);
            }
        }

        public void onTimeout(long sequence) {
            if (this.bpcts != -1L && System.currentTimeMillis() > this.aggStart + AggregateEventHandler.this.timeOut) {
                this.publishAndFreeWorker(OpenCmwProtocol.Command.PARTIAL);
            }
        }

        protected void publishAndFreeWorker(OpenCmwProtocol.Command updateType) {
            long now = System.currentTimeMillis();
            AggregateEventHandler.this.ringBuffer.publishEvent((event, sequence, arg0) -> {
                TimingCtx ctx = event.getFilter(TimingCtx.class);
                if (ctx == null) {
                    throw new IllegalStateException("RingBufferEvent has not TimingCtx definition");
                }
                EvtTypeFilter evtType = event.getFilter(EvtTypeFilter.class);
                if (evtType == null) {
                    throw new IllegalArgumentException("cannot aggregate events without ring buffer containing EvtTypeFilter");
                }
                event.arrivalTimeStamp = now;
                event.payload = new SharedPointer();
                HashMap<URI, SharedPointer<Object>> aggregatedItems = new HashMap<URI, SharedPointer<Object>>();
                event.payload.set(aggregatedItems);
                if (updateType == OpenCmwProtocol.Command.PARTIAL) {
                    LOGGER.atInfo().log("aggregation timed out for bpcts: " + this.bpcts);
                }
                if (this.aggregatedEventsStash.isEmpty()) {
                    event.parentSequenceNumber = sequence;
                    evtType.property = AggregateEventHandler.this.aggregateName;
                    evtType.updateType = OpenCmwProtocol.Command.UNKNOWN;
                    evtType.evtType = EvtTypeFilter.DataType.AGGREGATE_DATA;
                    return;
                }
                RingBufferEvent firstItem = this.aggregatedEventsStash.get(0);
                for (int i = 0; i < firstItem.filters.length; ++i) {
                    firstItem.filters[i].copyTo(event.filters[i]);
                }
                if (updateType == OpenCmwProtocol.Command.PARTIAL) {
                    LOGGER.atInfo().log("aggregation timed out for 2:bpcts: " + event.getFilter(TimingCtx.class).bpcts);
                }
                evtType.property = AggregateEventHandler.this.aggregateName;
                evtType.updateType = updateType;
                evtType.evtType = EvtTypeFilter.DataType.AGGREGATE_DATA;
                event.parentSequenceNumber = sequence;
                for (RingBufferEvent rbEvent : this.aggregatedEventsStash) {
                    EvtTypeFilter type = rbEvent.getFilter(EvtTypeFilter.class);
                    aggregatedItems.put(type.property, rbEvent.payload.getCopy());
                }
            }, this.aggregatedEventsStash);
            this.bpcts = -1L;
            this.aggregatedEventsStash = new ArrayList<RingBufferEvent>();
            AggregateEventHandler.this.freeWorkers.add(this);
        }
    }

    /**
     * Fluent factory for {@link AggregateEventHandler}.
     *
     * <p>Defaults: 4 workers, a time-out of 400 milliseconds and a retention size of 12. The
     * aggregate name and the ring buffer have no default and must be set before {@link #build()}.
     */
    public static class AggregateEventHandlerFactory {
        private final List<URI> deviceList = new NoDuplicatesList<URI>();
        private final List<Predicate<RingBufferEvent>> evtTypeFilter = new NoDuplicatesList<Predicate<RingBufferEvent>>();
        private RingBuffer<RingBufferEvent> ringBuffer;
        private int numberWorkers = 4;
        private long timeOut = 400L;
        private TimeUnit timeOutUnit = TimeUnit.MILLISECONDS;
        private int retentionSize = 12;
        private URI aggregateName;

        /**
         * Creates the handler from the current settings. The retention size actually used is
         * capped at three times the number of workers.
         *
         * @return the configured handler
         * @throws IllegalArgumentException if the aggregate name is {@code null} or blank, or if no
         *         ring buffer has been set
         */
        public AggregateEventHandler build() {
            if (this.aggregateName == null || this.aggregateName.toString().isBlank()) {
                throw new IllegalArgumentException("aggregateName must not be null or blank");
            }
            if (this.ringBuffer == null) {
                throw new IllegalArgumentException("ringBuffer must not be null");
            }
            final int actualRetentionSize = Math.min(this.retentionSize, 3 * this.numberWorkers);
            return new AggregateEventHandler(this.ringBuffer, this.aggregateName, this.timeOut, this.timeOutUnit, this.numberWorkers, actualRetentionSize, this.deviceList, this.evtTypeFilter);
        }

        /**
         * @return the configured aggregate name, {@code null} if none has been set
         */
        public URI getAggregateName() {
            return this.aggregateName;
        }

        /**
         * @param aggregateName URI under which the aggregated events are published
         * @return this factory
         */
        public AggregateEventHandlerFactory setAggregateName(URI aggregateName) {
            this.aggregateName = aggregateName;
            return this;
        }

        /**
         * @return the factory's own (live) device list
         */
        public List<URI> getDeviceList() {
            return this.deviceList;
        }

        /**
         * Appends the given devices to the already configured ones; existing entries are kept.
         *
         * @param deviceList devices to add
         * @return this factory
         */
        public AggregateEventHandlerFactory setDeviceList(List<URI> deviceList) {
            this.deviceList.addAll(deviceList);
            return this;
        }

        /**
         * Appends the given devices to the already configured ones; existing entries are kept.
         *
         * @param devices devices to add
         * @return this factory
         */
        public AggregateEventHandlerFactory setDeviceList(URI ... devices) {
            this.deviceList.addAll(Arrays.asList(devices));
            return this;
        }

        /**
         * @return the factory's own (live) list of event predicates
         */
        public List<Predicate<RingBufferEvent>> getEvtTypeFilter() {
            return this.evtTypeFilter;
        }

        /**
         * Appends the given predicates to the already configured ones; existing entries are kept.
         *
         * @param evtTypeFilter predicates to add
         * @return this factory
         */
        @SafeVarargs
        public final AggregateEventHandlerFactory setEvtTypeFilter(Predicate<RingBufferEvent> ... evtTypeFilter) {
            this.evtTypeFilter.addAll(Arrays.asList(evtTypeFilter));
            return this;
        }

        /**
         * Appends the given predicates to the already configured ones; existing entries are kept.
         *
         * @param evtTypeFilter predicates to add
         * @return this factory
         */
        public AggregateEventHandlerFactory setEvtTypeFilter(List<Predicate<RingBufferEvent>> evtTypeFilter) {
            this.evtTypeFilter.addAll(evtTypeFilter);
            return this;
        }

        /**
         * @return the configured number of aggregation workers
         */
        public int getNumberWorkers() {
            return this.numberWorkers;
        }

        /**
         * @param numberWorkers number of aggregation workers, at least 1
         * @return this factory
         * @throws IllegalArgumentException if {@code numberWorkers} is smaller than 1
         */
        public AggregateEventHandlerFactory setNumberWorkers(int numberWorkers) {
            if (numberWorkers < 1) {
                throw new IllegalArgumentException("numberWorkers must not be < 1: " + numberWorkers);
            }
            this.numberWorkers = numberWorkers;
            return this;
        }

        /**
         * @return the configured retention size (before the cap applied by {@link #build()})
         */
        public int getRetentionSize() {
            return this.retentionSize;
        }

        /**
         * @param retentionSize requested retention size, at least 1
         * @return this factory
         * @throws IllegalArgumentException if {@code retentionSize} is smaller than 1
         */
        public AggregateEventHandlerFactory setRetentionSize(int retentionSize) {
            if (retentionSize < 1) {
                // the message names the parameter that was actually rejected
                throw new IllegalArgumentException("retentionSize must not be < 1: " + retentionSize);
            }
            this.retentionSize = retentionSize;
            return this;
        }

        /**
         * @return the configured ring buffer, {@code null} if none has been set
         */
        public RingBuffer<RingBufferEvent> getRingBuffer() {
            return this.ringBuffer;
        }

        /**
         * @param ringBuffer ring buffer the aggregated events are published to
         * @return this factory
         * @throws IllegalArgumentException if {@code ringBuffer} is {@code null}
         */
        public AggregateEventHandlerFactory setRingBuffer(RingBuffer<RingBufferEvent> ringBuffer) {
            if (ringBuffer == null) {
                throw new IllegalArgumentException("ringBuffer must not null");
            }
            this.ringBuffer = ringBuffer;
            return this;
        }

        /**
         * @return the configured time-out, expressed in {@link #getTimeOutUnit()}
         */
        public long getTimeOut() {
            return this.timeOut;
        }

        /**
         * @return the unit of {@link #getTimeOut()}
         */
        public TimeUnit getTimeOutUnit() {
            return this.timeOutUnit;
        }

        /**
         * @param timeOut aggregation time-out, strictly positive
         * @param timeOutUnit unit of {@code timeOut}
         * @return this factory
         * @throws IllegalArgumentException if {@code timeOut} is not positive or {@code timeOutUnit}
         *         is {@code null}
         */
        public AggregateEventHandlerFactory setTimeOut(long timeOut, TimeUnit timeOutUnit) {
            if (timeOut <= 0L) {
                throw new IllegalArgumentException("timeOut must not be <=0: " + timeOut);
            }
            if (timeOutUnit == null) {
                throw new IllegalArgumentException("timeOutUnit must not null");
            }
            this.timeOut = timeOut;
            this.timeOutUnit = timeOutUnit;
            return this;
        }
    }
}

