/*
 * Decompiled with CFR 0.152.
 */
package org.apache.airavata.wsmg.broker;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;
import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.broker.amqp.AMQPNotificationProcessor;
import org.apache.airavata.wsmg.broker.context.ContextParameters;
import org.apache.airavata.wsmg.broker.context.ProcessingContext;
import org.apache.airavata.wsmg.commons.NameSpaceConstants;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
import org.apache.airavata.wsmg.messenger.OutGoingQueue;
import org.apache.airavata.wsmg.util.BrokerUtil;
import org.apache.airavata.wsmg.util.RunTimeStatistics;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMAttribute;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMException;
import org.apache.axiom.om.OMFactory;
import org.apache.axiom.om.OMNamespace;
import org.apache.axis2.AxisFault;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles notifications published to the broker.
 *
 * <p>For every incoming publish request this class forwards the request to the
 * {@link AMQPNotificationProcessor}, extracts the topic and payload (WS-Notification or
 * WS-Eventing flavour), asks every configured {@link AbstractMessageMatcher} for the matching
 * consumers, hands the result to the {@link OutGoingQueue}, and finally sets a
 * {@code <xxxResponse trackId="..."/>} element as the response message.
 *
 * <p>The two counters are advanced through {@code synchronized} methods; no other
 * synchronization is performed by this class.
 */
public class NotificationProcessor {
    private static final Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);

    /** Opening of the synthetic topic element used when a WS-Eventing message has no Topic header. */
    private static final String WSE_TOPIC_PREFIX = "<wsnt:Topic Dialect=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-Topics/TopicExpression/simple\" xmlns:ns2=\"http://tutorial.globus.org/auction\" xmlns:wsnt=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification\">ns2:";
    /** Closing tag matching {@link #WSE_TOPIC_PREFIX}. */
    private static final String WSE_TOPIC_SUFFIX = "</wsnt:Topic>";
    /** Topic used for WS-Eventing messages that carry neither a Topic header nor a topic in the URL. */
    private static final String DEFAULT_WSE_TOPIC = "wseTopic";

    private final WsmgConfigurationContext wsmgConfigContext;
    protected long messageCounter = 0L;
    protected long messageId = 0L;
    OMFactory factory = OMAbstractFactory.getOMFactory();
    private final OutGoingQueue outgoingQueue;
    private final AMQPNotificationProcessor amqpNotificationProcessor = new AMQPNotificationProcessor();

    /**
     * Creates a processor bound to the given broker configuration and initialises the AMQP
     * notification processor.
     *
     * @param config broker configuration supplying the outgoing queue and the message matchers
     */
    public NotificationProcessor(WsmgConfigurationContext config) {
        this.wsmgConfigContext = config;
        this.outgoingQueue = config.getOutgoingQueue();
        this.amqpNotificationProcessor.init();
    }

    /** Returns the next value of the counter used to build track ids. */
    private synchronized long getNextTrackId() {
        ++this.messageCounter;
        return this.messageCounter;
    }

    /** Returns the next id passed to the outgoing queue when a notification is stored. */
    private synchronized long getNextMsgId() {
        ++this.messageId;
        return this.messageId;
    }

    /**
     * Processes one publish request and sets the response message on {@code ctx}.
     *
     * @param ctx        processing context of the incoming request
     * @param protocolNs namespace of the protocol in use; {@code NameSpaceConstants.WSNT_NS}
     *                   selects WS-Notification handling, anything else WS-Eventing handling
     * @throws AxisFault   if the request carries no usable payload or the payload cannot be serialized
     * @throws OMException propagated from the underlying object model
     */
    public void processMsg(ProcessingContext ctx, OMNamespace protocolNs) throws OMException, AxisFault {
        String trackId = "trackId_A_" + this.getNextTrackId();
        this.handleExtendedNotifications(ctx, protocolNs);
        if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
            this.onWSNTMsg(ctx, trackId);
        } else {
            this.onWSEMsg(ctx, this.newMessageContent(ctx, trackId));
        }
        this.setResponseMsg(ctx, trackId, protocolNs);
    }

    /**
     * Builds the per-notification metadata (SOAP action, message id, track id) for the request.
     * A new instance is returned on every call so that notifications never share mutable state.
     */
    private AdditionalMessageContent newMessageContent(ProcessingContext ctx, String trackId) {
        AdditionalMessageContent content = new AdditionalMessageContent(ctx.getMessageContext().getSoapAction(), ctx.getMessageContext().getMessageID());
        content.setTrackId(trackId);
        return content;
    }

    /**
     * Handles a WS-Eventing publish: the topic comes from the WSNT {@code Topic} SOAP header, or
     * failing that from the topic encoded in the URL, or failing that the default topic; the
     * payload is the first element of the SOAP body.
     *
     * @throws AxisFault if the body is empty or the payload cannot be serialized
     */
    private void onWSEMsg(ProcessingContext ctx, AdditionalMessageContent additionalMessageContent) throws OMException, AxisFault {
        // The SOAP header is optional; treat a missing header like a missing Topic element.
        OMElement header = ctx.getMessageContext().getEnvelope().getHeader();
        OMElement topicEl = header == null ? null : header.getFirstChildWithName(wsntName("Topic"));

        String topicLocalString;
        String topicElString;
        if (topicEl == null) {
            topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);
            if (topicLocalString == null) {
                topicLocalString = DEFAULT_WSE_TOPIC;
            }
            // The topic may come straight from the request URL, so escape it before embedding it in XML.
            topicElString = WSE_TOPIC_PREFIX + escapeXmlText(topicLocalString) + WSE_TOPIC_SUFFIX;
        } else {
            topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText());
            topicElString = serializeOrNull(topicEl, "exceptions occured at WSE eventing notification creating");
        }
        additionalMessageContent.setTopicElement(topicElString);

        OMElement messageEl = ctx.getSoapBody().getFirstElement();
        if (messageEl == null) {
            throw new AxisFault("no message found");
        }
        this.matchAndSave(serializeMessage(messageEl), topicLocalString, additionalMessageContent);
    }

    /**
     * Sets the response for the request: an empty element named after the request's first body
     * element with {@code Response} appended, in {@code responseNS}, carrying the track id as an
     * attribute.
     */
    private void setResponseMsg(ProcessingContext ctx, String trackId, OMNamespace responseNS) throws OMException {
        ctx.addResponseMsgNameSpaces(responseNS);
        OMAttribute trackIdAttribute = this.factory.createOMAttribute("trackId", null, trackId);
        OMElement messageElement = ctx.getMessageContext().getEnvelope().getBody().getFirstElement();
        OMElement responseMsgElement = this.factory.createOMElement(messageElement.getLocalName() + "Response", responseNS);
        responseMsgElement.addAttribute(trackIdAttribute);
        ctx.setRespMessage(responseMsgElement);
    }

    /**
     * Handles a WS-Notification publish: every {@code NotificationMessage} child of the first body
     * element is matched and queued on its own, each with its own metadata object so that a
     * message without a {@code Topic} or {@code ProducerReference} does not inherit the values of
     * a preceding message.
     *
     * @throws AxisFault if the body is empty, contains no {@code NotificationMessage}, a
     *                   {@code NotificationMessage} has no {@code Message} payload, or a payload
     *                   cannot be serialized
     */
    private void onWSNTMsg(ProcessingContext ctx, String trackId) throws OMException, AxisFault {
        OMElement notifyEl = ctx.getSoapBody().getFirstElement();
        if (notifyEl == null) {
            throw new AxisFault("no message found");
        }

        boolean noElements = true;
        Iterator<?> iter = notifyEl.getChildrenWithLocalName("NotificationMessage");
        while (iter.hasNext()) {
            noElements = false;
            OMElement wrappedMessageEl = (OMElement) iter.next();
            AdditionalMessageContent messageContent = this.newMessageContent(ctx, trackId);

            String topicLocalString = null;
            OMElement topicEl = wrappedMessageEl.getFirstChildWithName(wsntName("Topic"));
            if (topicEl != null) {
                topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText());
                // Best effort: a Topic that cannot be serialized is logged and left unset.
                messageContent.setTopicElement(serializeOrNull(topicEl, "unable to serialize the Topic element of a NotificationMessage"));
            }

            OMElement producerReferenceEl = wrappedMessageEl.getFirstChildWithName(wsntName("ProducerReference"));
            if (producerReferenceEl != null) {
                // Best effort, same as for the Topic element.
                messageContent.setProducerReference(serializeOrNull(producerReferenceEl, "unable to serialize the ProducerReference element of a NotificationMessage"));
            }

            OMElement messageWrapperEl = wrappedMessageEl.getFirstChildWithName(wsntName("Message"));
            OMElement notificationMessageEl = messageWrapperEl == null ? null : messageWrapperEl.getFirstElement();
            if (notificationMessageEl == null) {
                throw new AxisFault("NotificationMessage does not contain a Message payload");
            }
            this.matchAndSave(serializeMessage(notificationMessageEl), topicLocalString, messageContent);
        }
        if (noElements) {
            throw new AxisFault("at least one element is required");
        }
    }

    /** Builds a qualified name in the WS-Notification namespace. */
    private static QName wsntName(String localName) {
        return new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), localName);
    }

    /**
     * Serializes an optional element, logging {@code failureMessage} and returning {@code null}
     * when serialization fails.
     */
    private static String serializeOrNull(OMElement element, String failureMessage) {
        try {
            return element.toStringWithConsume();
        } catch (XMLStreamException e) {
            logger.error(failureMessage, e);
            return null;
        }
    }

    /**
     * Serializes a notification payload.
     *
     * @throws AxisFault wrapping the {@link XMLStreamException} if serialization fails
     */
    private static String serializeMessage(OMElement messageEl) throws AxisFault {
        try {
            return messageEl.toStringWithConsume();
        } catch (XMLStreamException e) {
            logger.error("unable to serialize the message", e);
            throw new AxisFault("unable to serialize the message", e);
        }
    }

    /** Escapes the characters that are not allowed verbatim in XML character data. */
    private static String escapeXmlText(String text) {
        StringBuilder escaped = new StringBuilder(text.length());
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '&':
                    escaped.append("&amp;");
                    break;
                case '<':
                    escaped.append("&lt;");
                    break;
                case '>':
                    escaped.append("&gt;");
                    break;
                default:
                    escaped.append(c);
            }
        }
        return escaped.toString();
    }

    /**
     * Collects the consumers matched by every configured matcher and queues the notification
     * for them. Runtime failures are logged and not propagated, so a failing matcher or queue
     * does not fail the publish request.
     */
    private void matchAndSave(String notificationMessage, String topicLocalString, AdditionalMessageContent additionalMessageContent) {
        LinkedList<ConsumerInfo> matchedConsumers = new LinkedList<ConsumerInfo>();
        try {
            for (AbstractMessageMatcher matcher : this.wsmgConfigContext.getMessageMatchers()) {
                matcher.populateMatches(null, additionalMessageContent, notificationMessage, topicLocalString, matchedConsumers);
            }
            this.save(matchedConsumers, notificationMessage, additionalMessageContent);
        } catch (RuntimeException e) {
            logger.error("Caught RuntimeException", e);
        }
    }

    /**
     * Stores a notification in the outgoing queue for the given consumers. Does nothing when
     * there are no consumers.
     *
     * @param consumerInfoList         consumers the notification is destined for; {@code null} is
     *                                 treated like an empty list
     * @param message                  serialized notification payload
     * @param additionalMessageContent metadata stored alongside the payload
     */
    public void save(List<ConsumerInfo> consumerInfoList, String message, AdditionalMessageContent additionalMessageContent) {
        if (consumerInfoList == null || consumerInfoList.isEmpty()) {
            return;
        }
        RunTimeStatistics.addNewNotificationMessageSize(message.length());
        OutGoingMessage outGoingMessage = new OutGoingMessage();
        outGoingMessage.setTextMessage(message);
        outGoingMessage.setConsumerInfoList(consumerInfoList);
        outGoingMessage.setAdditionalMessageContent(additionalMessageContent);
        this.outgoingQueue.storeNotification(outGoingMessage, this.getNextMsgId());
    }

    /** Forwards the request to the AMQP notification processor. */
    private void handleExtendedNotifications(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
        this.amqpNotificationProcessor.notify(ctx, protocolNs);
    }
}

