package org.asmatron.messengine.engines;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.asmatron.messengine.engines.components.MessageConsumer;
import org.asmatron.messengine.engines.components.ResponseLock;
import org.asmatron.messengine.engines.components.ResponseManager;
import org.asmatron.messengine.messaging.Message;
import org.asmatron.messengine.messaging.MessageListener;

/* loaded from: input_file:org/asmatron/messengine/engines/DefaultMessagingDelegate.class */
/**
 * Queue-backed {@link MessagingDelegate}.
 *
 * <p>Messages handed to {@link #send(Message)} or {@link #request(Message, String, long)} are
 * placed on an internal queue. A single engine thread (this object, run on
 * {@code engineExecutor}) takes them off the queue one at a time, hands each to a
 * {@link MessageConsumer} on {@code messageExecutor} when listeners are registered for the
 * message type, and then passes every message to the {@link ResponseManager}.
 *
 * <p>Listener registration may happen on any thread while the engine thread is dispatching, so
 * the registry is a concurrent map of copy-on-write lists.
 *
 * <p>An instance cannot be restarted after {@link #stop()}: both executors are shut down for good.
 */
public class DefaultMessagingDelegate implements MessagingDelegate, Runnable {
    private static final Log LOG = LogFactory.getLog(DefaultMessagingDelegate.class);

    /** Message used when a message without a type is submitted. */
    private static final String INVALID_TYPE = "The Type is invalid.";

    /** Pending messages, consumed by the engine thread in {@link #run()}. */
    private final BlockingQueue<Message<?>> queue = new LinkedBlockingQueue<Message<?>>();

    /** Runs the {@link MessageConsumer} created for each dispatched message. */
    private final ExecutorService messageExecutor = Executors.newCachedThreadPool();

    /** Runs the dispatch loop ({@link #run()}) on a single dedicated thread. */
    private final ExecutorService engineExecutor = Executors.newSingleThreadExecutor();

    /** Listeners keyed by message type; read by the engine thread, written by registering threads. */
    private final ConcurrentMap<String, List<MessageListener<? extends Message<?>>>> listeners =
            new ConcurrentHashMap<String, List<MessageListener<? extends Message<?>>>>();

    private final ResponseManager responseManager = new ResponseManager();

    /** Set once by {@link #stop()} so the engine thread can tell a shutdown from a stray interrupt. */
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    /** Initializes the response manager and starts the dispatch loop on the engine thread. */
    @Override
    @PostConstruct
    public void start() {
        this.responseManager.initialize();
        this.engineExecutor.execute(this);
    }

    /**
     * Shuts down both executors and the response manager.
     *
     * <p>The {@code shuttingDown} flag is intentionally left set: the engine thread is interrupted
     * asynchronously by {@code shutdownNow()} and must still observe the flag when it wakes up,
     * otherwise a normal shutdown would be logged as an unexpected interruption.
     */
    @Override
    @PreDestroy
    public void stop() {
        this.shuttingDown.set(true);
        this.messageExecutor.shutdownNow();
        this.responseManager.shutdown();
        this.engineExecutor.shutdownNow();
        LOG.info("MessEngine has shutdown...");
    }

    /**
     * Queues a message for asynchronous dispatch.
     *
     * @param message the message to dispatch; its type must not be {@code null}
     * @throws IllegalArgumentException if {@code message.getType()} is {@code null}
     */
    @Override
    public void send(Message<?> message) {
        requireType(message);
        queueMessage(message);
    }

    /**
     * Registers a response lock with the response manager and then queues the message.
     *
     * @param message the message to dispatch; its type must not be {@code null}
     * @param str first argument of the {@link ResponseLock} (presumably the expected response
     *        type - confirm against {@code ResponseLock})
     * @param j second argument of the {@link ResponseLock} (presumably a timeout - confirm
     *        against {@code ResponseLock})
     * @return the future returned by {@link ResponseManager#addResponseListener}
     * @throws IllegalArgumentException if {@code message.getType()} is {@code null}
     */
    @Override
    public Future<Message<?>> request(Message<?> message, String str, long j) {
        requireType(message);
        // Register the lock before queueing so the response cannot be processed first.
        Future<Message<?>> response = this.responseManager.addResponseListener(new ResponseLock(str, j));
        queueMessage(message);
        return response;
    }

    /** Rejects messages that carry no type, since they could never be routed to a listener. */
    private static void requireType(Message<?> message) {
        if (message.getType() == null) {
            throw new IllegalArgumentException(INVALID_TYPE);
        }
    }

    private void queueMessage(Message<?> message) {
        this.queue.offer(message);
    }

    /**
     * Dispatch loop executed on the engine thread.
     *
     * <p>Blocks on the queue, dispatches each message to the listeners registered for its type
     * (if any) and passes every message to the response manager. The loop ends when the engine
     * executor is shut down or the thread is interrupted. A runtime failure while handling one
     * message is logged and does not stop the loop, unless a shutdown is in progress.
     */
    @Override
    public void run() {
        while (!this.engineExecutor.isShutdown()) {
            try {
                Message<?> message = this.queue.take();
                String type = message.getType();
                // ConcurrentHashMap rejects null keys; a message without a type has no listeners.
                List<MessageListener<? extends Message<?>>> targets =
                        type == null ? null : this.listeners.get(type);
                if (targets != null) {
                    this.messageExecutor.execute(new MessageConsumer(message, targets));
                }
                this.responseManager.notifyResponse(message);
            } catch (InterruptedException e) {
                // Restore the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
                if (!this.shuttingDown.get()) {
                    LOG.warn("MessEngine queue was interrupted.");
                }
                return;
            } catch (RuntimeException e) {
                if (this.shuttingDown.get()) {
                    // E.g. the message executor rejecting work during shutdown.
                    return;
                }
                // One failing message must not take the dispatcher down.
                LOG.error("MessEngine failed to dispatch a message.", e);
            }
        }
    }

    /**
     * Registers a listener for a message type.
     *
     * <p>A {@code null} type is ignored with a warning: {@link #send(Message)} and
     * {@link #request(Message, String, long)} reject messages without a type, so such a listener
     * could not be reached through them.
     *
     * @param str the message type to listen for
     * @param messageListener the listener to add
     */
    @Override
    public void addMessageListener(String str, MessageListener<? extends Message<?>> messageListener) {
        if (str == null) {
            LOG.warn("Ignoring listener registered for a null message type.");
            return;
        }
        List<MessageListener<? extends Message<?>>> registered = this.listeners.get(str);
        if (registered == null) {
            List<MessageListener<? extends Message<?>>> created =
                    new CopyOnWriteArrayList<MessageListener<? extends Message<?>>>();
            // putIfAbsent keeps the list of whichever thread registered first for this type.
            registered = this.listeners.putIfAbsent(str, created);
            if (registered == null) {
                registered = created;
            }
        }
        registered.add(messageListener);
    }

    /**
     * Removes a previously registered listener; does nothing if none is registered for the type.
     *
     * @param str the message type the listener was registered for
     * @param messageListener the listener to remove
     */
    @Override
    public void removeMessageListener(String str, MessageListener<? extends Message<?>> messageListener) {
        if (str == null) {
            return;
        }
        List<MessageListener<? extends Message<?>>> registered = this.listeners.get(str);
        if (registered != null) {
            registered.remove(messageListener);
        }
    }
}
