/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.ra;

import java.lang.reflect.Method;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ServerSession;
import javax.jms.Session;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import org.activemq.ActiveMQSession;
import org.activemq.ra.ServerSessionPoolImpl;
import org.activemq.ra.SessionAndProducer;
import org.activemq.ra.SessionAndProducerHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ServerSessionImpl
implements ServerSession,
SessionAndProducer,
Work,
ActiveMQSession.DeliveryListener {
    public static final Method ON_MESSAGE_METHOD;
    private static int nextLogId;
    private int serverSessionId = ServerSessionImpl.getNextLogId();
    private final Log log = LogFactory.getLog((String)(ServerSessionImpl.class.getName() + ":" + this.serverSessionId));
    private ActiveMQSession session;
    private WorkManager workManager;
    private MessageEndpoint endpoint;
    private MessageProducer messageProducer;
    private final ServerSessionPoolImpl pool;
    private Object runControlMutex = new Object();
    private boolean runningFlag = false;
    private boolean stale;
    private final boolean useRAManagedTx;
    private final int batchSize;
    private int currentBatchSize;

    private static synchronized int getNextLogId() {
        return nextLogId++;
    }

    public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException {
        this.pool = pool;
        this.session = session;
        this.workManager = workManager;
        this.endpoint = endpoint;
        this.useRAManagedTx = useRAManagedTx;
        this.session.setMessageListener((MessageListener)endpoint);
        this.session.setDeliveryListener((ActiveMQSession.DeliveryListener)this);
        this.batchSize = batchSize;
    }

    public Session getSession() throws JMSException {
        return this.session;
    }

    public MessageProducer getMessageProducer() throws JMSException {
        if (this.messageProducer == null) {
            this.messageProducer = this.getSession().createProducer(null);
        }
        return this.messageProducer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() throws JMSException {
        Object object = this.runControlMutex;
        synchronized (object) {
            if (this.runningFlag) {
                this.log.debug((Object)"Start request ignored, allready running.");
                return;
            }
            this.runningFlag = true;
        }
        this.log.debug((Object)"Starting run.");
        try {
            this.workManager.scheduleWork((Work)this, Long.MAX_VALUE, null, new WorkListener(){

                public void workAccepted(WorkEvent event) {
                    ServerSessionImpl.this.log.debug((Object)("Work accepted: " + event));
                }

                public void workRejected(WorkEvent event) {
                    ServerSessionImpl.this.log.debug((Object)("Work rejected: " + event));
                }

                public void workStarted(WorkEvent event) {
                    ServerSessionImpl.this.log.debug((Object)("Work started: " + event));
                }

                public void workCompleted(WorkEvent event) {
                    ServerSessionImpl.this.log.debug((Object)("Work completed: " + event));
                }
            });
        }
        catch (WorkException e) {
            throw (JMSException)new JMSException("Start failed: " + (Object)((Object)e)).initCause((Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void run() {
        this.log.debug((Object)"Running");
        while (true) {
            this.log.debug((Object)"run loop start");
            try {
                SessionAndProducerHelper.register(this);
                this.currentBatchSize = 0;
                this.session.run();
                continue;
            }
            catch (Throwable e) {
                this.stale = true;
                this.log.debug((Object)"Endpoint failed to process message.", e);
                this.log.info((Object)("Endpoint failed to process message. Reason: " + e));
                continue;
            }
            finally {
                SessionAndProducerHelper.unregister(this);
                this.log.debug((Object)"run loop end");
                Object object = this.runControlMutex;
                synchronized (object) {
                    if (this.stale) {
                        this.runningFlag = false;
                        this.pool.removeFromPool(this);
                        break;
                    }
                    if (!this.session.hasUncomsumedMessages()) {
                        this.runningFlag = false;
                        this.pool.returnToPool(this);
                        break;
                    }
                }
                continue;
            }
            break;
        }
        this.log.debug((Object)"Run finished");
    }

    public void beforeDelivery(ActiveMQSession session, Message msg) {
        if (this.currentBatchSize == 0) {
            try {
                this.endpoint.beforeDelivery(ON_MESSAGE_METHOD);
            }
            catch (Throwable e) {
                throw new RuntimeException("Endpoint before delivery notification failure", e);
            }
        }
    }

    public void afterDelivery(ActiveMQSession session, Message msg) {
        if (++this.currentBatchSize >= this.batchSize || !session.hasUncomsumedMessages()) {
            this.currentBatchSize = 0;
            try {
                this.endpoint.afterDelivery();
            }
            catch (Throwable e) {
                throw new RuntimeException("Endpoint after delivery notification failure", e);
            }
            finally {
                if (session.getTransactionContext().isInLocalTransaction()) {
                    if (!this.useRAManagedTx) {
                        this.log.warn((Object)"Local transaction had not been commited.  Commiting now.");
                    }
                    try {
                        session.commit();
                    }
                    catch (JMSException e) {
                        this.log.info((Object)"Commit failed:", (Throwable)e);
                    }
                }
            }
        }
    }

    public void release() {
        this.log.debug((Object)"release called");
    }

    public String toString() {
        return "ServerSessionImpl:" + this.serverSessionId;
    }

    public void close() {
        try {
            this.endpoint.release();
        }
        catch (Throwable e) {
            this.log.debug((Object)("Endpoint did not release properly: " + e), e);
        }
        try {
            this.session.close();
        }
        catch (Throwable e) {
            this.log.debug((Object)("Session did not close properly: " + e), e);
        }
    }

    static {
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", Message.class);
        }
        catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
        nextLogId = 0;
    }
}

