/*
 * Decompiled with CFR 0.152.
 */
package io.opencmw.concepts.aggregate;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceReportingEventHandler;
import com.lmax.disruptor.TimeoutHandler;
import io.opencmw.concepts.aggregate.TestEventSource;
import io.opencmw.utils.Cache;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.LockSupport;

public class DemuxEventDispatcher
implements SequenceReportingEventHandler<TestEventSource.IngestedEvent> {
    private static final int N_WORKERS = 4;
    private static final long TIMEOUT = 400L;
    private static final int RETENTION_SIZE = 10;
    private static final int N_AGG_ELEMENTS = 3;
    private final AggregationHandler[] aggregationHandler;
    private final List<AggregationHandler> freeWorkers = Collections.synchronizedList(new ArrayList(4));
    private final RingBuffer<TestEventSource.IngestedEvent> rb;
    private final Cache<Long, Object> aggregatedBpcts = new Cache(10);
    private Sequence seq;

    public DemuxEventDispatcher(RingBuffer<TestEventSource.IngestedEvent> ringBuffer) {
        this.rb = ringBuffer;
        this.aggregationHandler = new AggregationHandler[4];
        for (int i = 0; i < 4; ++i) {
            this.aggregationHandler[i] = new AggregationHandler();
            this.freeWorkers.add(this.aggregationHandler[i]);
        }
    }

    public AggregationHandler[] getAggregationHander() {
        return this.aggregationHandler;
    }

    public void onEvent(TestEventSource.IngestedEvent event, long nextSequence, boolean b) {
        if (!(event.payload instanceof TestEventSource.Event)) {
            return;
        }
        long eventBpcts = ((TestEventSource.Event)event.payload).bpcts;
        boolean alreadyScheduled = this.aggregatedBpcts.containsKey((Object)eventBpcts);
        if (alreadyScheduled) {
            return;
        }
        while (true) {
            if (!this.freeWorkers.isEmpty()) {
                AggregationHandler freeWorker = this.freeWorkers.remove(0);
                freeWorker.bpcts = eventBpcts;
                freeWorker.aggStart = event.ingestionTime;
                this.aggregatedBpcts.put((Object)eventBpcts, new Object());
                this.seq.set(nextSequence);
                return;
            }
            long waitTime = Long.MAX_VALUE;
            for (AggregationHandler w : this.aggregationHandler) {
                long currentTime = System.currentTimeMillis();
                long diff = currentTime - w.aggStart;
                waitTime = Math.min(waitTime, diff * 1000000L);
                if (w.bpcts == -1L || diff >= 400L) continue;
                w.publishAndFreeWorker(true);
                break;
            }
            LockSupport.parkNanos(waitTime);
        }
    }

    public void setSequenceCallback(Sequence sequence) {
        this.seq = sequence;
    }

    public class AggregationHandler
    implements EventHandler<TestEventSource.IngestedEvent>,
    TimeoutHandler {
        protected volatile long bpcts = -1L;
        protected volatile long aggStart = -1L;
        private List<TestEventSource.IngestedEvent> payloads = new ArrayList<TestEventSource.IngestedEvent>();

        public void onEvent(TestEventSource.IngestedEvent event, long sequence, boolean endOfBatch) {
            if (this.bpcts != -1L && event.ingestionTime > this.aggStart + 400L) {
                this.publishAndFreeWorker(true);
                return;
            }
            if (this.bpcts == -1L || !(event.payload instanceof TestEventSource.Event) || ((TestEventSource.Event)event.payload).bpcts != this.bpcts) {
                return;
            }
            this.payloads.add(event);
            if (this.payloads.size() == 3) {
                this.publishAndFreeWorker(false);
            }
        }

        protected void publishAndFreeWorker(boolean partial) {
            DemuxEventDispatcher.this.rb.publishEvent((event1, sequence1, arg0) -> {
                event1.ingestionTime = System.currentTimeMillis();
                event1.payload = partial ? "aggregation timed out for bpcts: " + this.bpcts + " -> " + this.payloads : this.payloads;
            }, this.payloads);
            this.bpcts = -1L;
            this.payloads = new ArrayList<TestEventSource.IngestedEvent>();
            DemuxEventDispatcher.this.freeWorkers.add(this);
        }

        public void onTimeout(long sequence) {
            if (this.bpcts != -1L && System.currentTimeMillis() > this.aggStart + 400L) {
                this.publishAndFreeWorker(true);
            }
        }
    }
}

