/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.ra;

import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.XASession;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.transaction.xa.XAResource;
import org.activemq.ActiveMQConnectionConsumer;
import org.activemq.ActiveMQSession;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQQueue;
import org.activemq.message.ActiveMQTopic;
import org.activemq.ra.ActiveMQActivationSpec;
import org.activemq.ra.ActiveMQBaseEndpointWorker;
import org.activemq.ra.ActiveMQEndpointActivationKey;
import org.activemq.ra.ActiveMQResourceAdapter;
import org.activemq.ra.CircularQueue;
import org.activemq.ra.InboundEndpointWork;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ActiveMQPollingEndpointWorker
extends ActiveMQBaseEndpointWorker
implements Work {
    private static final Log log = LogFactory.getLog((Class)ActiveMQPollingEndpointWorker.class);
    private static final int MAX_WORKERS = 10;
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    private SynchronizedBoolean stopping = new SynchronizedBoolean(false);
    private Latch stopLatch = new Latch();
    private ActiveMQConnectionConsumer consumer;
    private CircularQueue workers;
    static WorkListener debugingWorkListener = new WorkListener(){

        public void workAccepted(WorkEvent event) {
        }

        public void workRejected(WorkEvent event) {
            log.warn((Object)("Work rejected: " + event), (Throwable)event.getException());
        }

        public void workStarted(WorkEvent event) {
        }

        public void workCompleted(WorkEvent event) {
        }
    };
    private Connection connection;

    public ActiveMQPollingEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
        super(adapter, key);
    }

    public void start() throws WorkException, ResourceException {
        ActiveMQActivationSpec activationSpec = this.endpointActivationKey.getActivationSpec();
        boolean ok = false;
        try {
            this.connection = this.adapter.makeConnection(activationSpec);
            this.connection.start();
            this.workers = new CircularQueue(10, this.stopping);
            for (int i = 0; i < this.workers.size(); ++i) {
                int acknowledge = this.transacted ? 0 : activationSpec.getAcknowledgeModeForSession();
                Session session = this.connection.createSession(this.transacted, acknowledge);
                XAResource xaresource = null;
                if (session instanceof XASession) {
                    if (!this.transacted) {
                        throw new ResourceException("You cannot use an XA Connection with a non transacted endpoint.");
                    }
                    xaresource = ((XASession)session).getXAResource();
                }
                MessageEndpoint endpoint = this.endpointFactory.createEndpoint(xaresource);
                this.workers.returnObject(new InboundEndpointWork((ActiveMQSession)session, endpoint, this.workers));
            }
            ActiveMQQueue dest = null;
            if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
                dest = new ActiveMQQueue(activationSpec.getDestination());
            } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
                dest = new ActiveMQTopic(activationSpec.getDestination());
            } else {
                throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
            }
            this.consumer = this.emptyToNull(activationSpec.getSubscriptionName()) != null ? (ActiveMQConnectionConsumer)this.connection.createDurableConnectionConsumer((Topic)dest, activationSpec.getSubscriptionName(), this.emptyToNull(activationSpec.getMessageSelector()), null, 0) : (ActiveMQConnectionConsumer)this.connection.createConnectionConsumer((Destination)dest, this.emptyToNull(activationSpec.getMessageSelector()), null, 0);
            ok = true;
            log.debug((Object)"Started");
            this.workManager.scheduleWork((Work)this, Long.MAX_VALUE, null, debugingWorkListener);
            ok = true;
        }
        catch (JMSException e) {
            throw new ResourceException("Could not start the endpoint.", (Throwable)e);
        }
        finally {
            if (!ok) {
                ActiveMQPollingEndpointWorker.safeClose((ConnectionConsumer)this.consumer);
                ActiveMQPollingEndpointWorker.safeClose(this.connection);
            }
        }
    }

    private String emptyToNull(String value) {
        if ("".equals(value)) {
            return null;
        }
        return value;
    }

    public void stop() throws InterruptedException {
        this.stopping.set(true);
        this.workers.notifyWaiting();
        if (this.started.compareTo(true) == 0) {
            this.stopLatch.acquire();
        }
        ActiveMQPollingEndpointWorker.safeClose((ConnectionConsumer)this.consumer);
        ActiveMQPollingEndpointWorker.safeClose(this.connection);
    }

    public void release() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        this.started.set(true);
        try {
            while (!this.stopping.get()) {
                ActiveMQMessage message = this.consumer.receive(500L);
                if (message == null) continue;
                InboundEndpointWork worker = (InboundEndpointWork)this.workers.get();
                if (worker == null) break;
                worker.setMessage(message);
                this.workManager.scheduleWork((Work)worker, Long.MAX_VALUE, null, debugingWorkListener);
            }
            this.workers.drain();
        }
        catch (Throwable e) {
            log.info((Object)"dispatcher: ", e);
        }
        finally {
            this.stopLatch.release();
        }
    }
}

