package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandTargetResolver;
import org.axonframework.commandhandling.disruptor.CommandHandlingEntry;
import org.axonframework.common.Assert;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
import org.axonframework.eventstore.EventStore;
import org.axonframework.repository.AggregateNotFoundException;
import org.axonframework.repository.ConflictingAggregateVersionException;
import org.axonframework.repository.Repository;
import org.axonframework.unitofwork.CurrentUnitOfWork;
import org.axonframework.unitofwork.SaveAggregateCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBus.class */
public class DisruptorCommandBus<T extends EventSourcedAggregateRoot> implements CommandBus, Repository<T> {
    private static final Logger logger = LoggerFactory.getLogger(DisruptorCommandBus.class);
    private static final ThreadGroup DISRUPTOR_THREAD_GROUP = new ThreadGroup("DisruptorCommandBus");
    private final ConcurrentMap<Class<?>, CommandHandler<?>> commandHandlers;
    private final Disruptor<CommandHandlingEntry<T>> disruptor;
    private final CommandHandlerInvoker<T> commandHandlerInvoker;
    private final ExecutorService executorService;
    private final boolean rescheduleOnCorruptState;
    private volatile boolean started;
    private volatile boolean disruptorShutDown;
    private final long coolingDownPeriod;

    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBus$AxonThreadFactory.class */
    private static class AxonThreadFactory implements ThreadFactory {
        private final ThreadGroup groupName;

        public AxonThreadFactory(ThreadGroup threadGroup) {
            this.groupName = threadGroup;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            return new Thread(this.groupName, runnable);
        }
    }

    public DisruptorCommandBus(AggregateFactory<T> aggregateFactory, EventStore eventStore, EventBus eventBus, CommandTargetResolver commandTargetResolver) {
        this(aggregateFactory, eventStore, eventBus, commandTargetResolver, new DisruptorConfiguration());
    }

    public DisruptorCommandBus(AggregateFactory<T> aggregateFactory, EventStore eventStore, EventBus eventBus, CommandTargetResolver commandTargetResolver, DisruptorConfiguration disruptorConfiguration) {
        this.commandHandlers = new ConcurrentHashMap();
        this.started = true;
        this.disruptorShutDown = false;
        Executor executor = disruptorConfiguration.getExecutor();
        if (executor == null) {
            this.executorService = Executors.newCachedThreadPool(new AxonThreadFactory(DISRUPTOR_THREAD_GROUP));
            executor = this.executorService;
        } else {
            this.executorService = null;
        }
        this.rescheduleOnCorruptState = disruptorConfiguration.getRescheduleCommandsOnCorruptState();
        this.disruptor = new Disruptor<>(new CommandHandlingEntry.Factory(), executor, disruptorConfiguration.getClaimStrategy(), disruptorConfiguration.getWaitStrategy());
        this.commandHandlerInvoker = new CommandHandlerInvoker<>();
        this.disruptor.handleExceptionsWith(new ExceptionHandler() { // from class: org.axonframework.commandhandling.disruptor.DisruptorCommandBus.1
            public void handleEventException(Throwable th, long j, Object obj) {
                DisruptorCommandBus.logger.error("Exception occurred while processing a {}.", ((CommandHandlingEntry) obj).getCommand().getPayloadType().getSimpleName(), th);
            }

            public void handleOnStartException(Throwable th) {
                DisruptorCommandBus.logger.error("Failed to start the DisruptorCommandBus.", th);
                DisruptorCommandBus.this.disruptor.shutdown();
            }

            public void handleOnShutdownException(Throwable th) {
                DisruptorCommandBus.logger.error("Error while shutting down the DisruptorCommandBus", th);
            }
        });
        this.disruptor.handleEventsWith(new EventHandler[]{new CommandHandlerPreFetcher(eventStore, aggregateFactory, this.commandHandlers, disruptorConfiguration.getInvokerInterceptors(), disruptorConfiguration.getPublisherInterceptors(), commandTargetResolver, disruptorConfiguration.getCache())}).then(new EventHandler[]{this.commandHandlerInvoker}).then(new EventHandler[]{new EventPublisher(aggregateFactory.getTypeIdentifier(), eventStore, eventBus, executor, disruptorConfiguration.getRollbackConfiguration())});
        this.coolingDownPeriod = disruptorConfiguration.getCoolingDownPeriod();
        this.disruptor.start();
    }

    public void dispatch(CommandMessage<?> commandMessage) {
        dispatch(commandMessage, null);
    }

    public <R> void dispatch(CommandMessage<?> commandMessage, CommandCallback<R> commandCallback) {
        Assert.state(this.started, "CommandBus has been shut down. It is not accepting any Commands");
        doDispatch(commandMessage, commandCallback);
    }

    public <R> void doDispatch(CommandMessage commandMessage, CommandCallback<R> commandCallback) {
        Assert.state(!this.disruptorShutDown, "Disruptor has been shut down. Cannot dispatch or redispatch commands");
        RingBuffer ringBuffer = this.disruptor.getRingBuffer();
        long next = ringBuffer.next();
        ((CommandHandlingEntry) ringBuffer.get(next)).reset(commandMessage, new BlacklistDetectingCallback(commandCallback, commandMessage, this.disruptor.getRingBuffer(), this, this.rescheduleOnCorruptState));
        ringBuffer.publish(next);
    }

    public <C> void subscribe(Class<C> cls, CommandHandler<? super C> commandHandler) {
        this.commandHandlers.put(cls, commandHandler);
    }

    public <C> boolean unsubscribe(Class<C> cls, CommandHandler<? super C> commandHandler) {
        return this.commandHandlers.remove(cls, commandHandler);
    }

    /* renamed from: load, reason: merged with bridge method [inline-methods] */
    public T m5load(Object obj, Long l) {
        T m4load = m4load(obj);
        if (l == null || m4load.getVersion().longValue() <= l.longValue()) {
            return m4load;
        }
        throw new ConflictingAggregateVersionException(obj, l.longValue(), m4load.getVersion().longValue());
    }

    /* renamed from: load, reason: merged with bridge method [inline-methods] */
    public T m4load(Object obj) {
        T preLoadedAggregate = this.commandHandlerInvoker.getPreLoadedAggregate();
        if (preLoadedAggregate == null) {
            throw new AggregateNotFoundException(obj, String.format("No aggregate with identifier [%s] was found pre-load phase.", obj.toString()));
        }
        if (preLoadedAggregate.getIdentifier().equals(obj)) {
            return preLoadedAggregate;
        }
        throw new UnsupportedOperationException("Not supported to load another aggregate than the pre-loaded one");
    }

    public void add(T t) {
        CurrentUnitOfWork.get().registerAggregate(t, (EventBus) null, (SaveAggregateCallback) null);
    }

    public void stop() {
        if (this.started) {
            this.started = false;
            long currentTimeMillis = System.currentTimeMillis();
            long cursor = this.disruptor.getRingBuffer().getCursor();
            while (System.currentTimeMillis() - currentTimeMillis < this.coolingDownPeriod && !Thread.interrupted()) {
                if (this.disruptor.getRingBuffer().getCursor() != cursor) {
                    currentTimeMillis = System.currentTimeMillis();
                    cursor = this.disruptor.getRingBuffer().getCursor();
                }
            }
            this.disruptorShutDown = true;
            this.disruptor.shutdown();
            if (this.executorService != null) {
                this.executorService.shutdown();
            }
        }
    }
}
