/*
 * Decompiled with CFR 0.152.
 */
package reactor.spring.messaging;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;

public class RingBufferBatchingMessageHandler
implements MessageHandler,
InitializingBean {
    private static final AtomicLongFieldUpdater<RingBufferBatchingMessageHandler> SEQ_START = AtomicLongFieldUpdater.newUpdater(RingBufferBatchingMessageHandler.class, "sequenceStart");
    private static final AtomicReferenceFieldUpdater<RingBufferBatchingMessageHandler, MessageHeaders> MSG_HDRS = AtomicReferenceFieldUpdater.newUpdater(RingBufferBatchingMessageHandler.class, MessageHeaders.class, "messageHeaders");
    private final List messagePayloads = new ArrayList();
    private final MessageHandler delegate;
    private final int batchSize;
    private final RingBuffer ringBuffer;
    private volatile MessageHeaders messageHeaders;
    private volatile long sequenceStart = -1L;

    public RingBufferBatchingMessageHandler(MessageHandler delegate, int batchSize) {
        this(delegate, batchSize, ProducerType.MULTI, (WaitStrategy)new BlockingWaitStrategy());
    }

    public RingBufferBatchingMessageHandler(MessageHandler delegate, int batchSize, ProducerType producerType, WaitStrategy waitStrategy) {
        this.delegate = delegate;
        this.batchSize = batchSize;
        this.ringBuffer = RingBuffer.create((ProducerType)producerType, (EventFactory)new EventFactory(){

            public Object newInstance() {
                return new Object();
            }
        }, (int)batchSize, (WaitStrategy)waitStrategy);
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull((Object)this.delegate, (String)"");
    }

    public void handleMessage(Message<?> message) throws MessagingException {
        long seqId = this.ringBuffer.next();
        if (null == MSG_HDRS.get(this) && SEQ_START.compareAndSet(this, -1L, this.sequenceStart)) {
            MSG_HDRS.compareAndSet(this, null, message.getHeaders());
        }
        this.messagePayloads.add(message.getPayload());
        if (seqId % (long)this.batchSize == 0L) {
            GenericMessage msg = new GenericMessage(new ArrayList(this.messagePayloads), (Map)this.messageHeaders);
            this.delegate.handleMessage((Message)msg);
            this.messagePayloads.clear();
            MSG_HDRS.compareAndSet(this, this.messageHeaders, null);
            long start = this.sequenceStart;
            this.ringBuffer.publish(start, seqId);
            SEQ_START.compareAndSet(this, this.sequenceStart, -1L);
        }
    }
}

