/*
 * Decompiled with CFR 0.152.
 */
package org.apache.airavata.wsmg.broker;

import java.io.File;
import java.lang.reflect.Constructor;
import org.apache.airavata.wsmg.broker.NotificationProcessor;
import org.apache.airavata.wsmg.broker.handler.PublishedMessageHandler;
import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
import org.apache.airavata.wsmg.commons.storage.WsmgInMemoryStorage;
import org.apache.airavata.wsmg.commons.storage.WsmgPersistantStorage;
import org.apache.airavata.wsmg.commons.util.Axis2Utils;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
import org.apache.airavata.wsmg.messenger.DeliveryProcessor;
import org.apache.airavata.wsmg.messenger.SenderUtils;
import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
import org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender;
import org.apache.airavata.wsmg.messenger.strategy.impl.ParallelSender;
import org.apache.airavata.wsmg.messenger.strategy.impl.SerialSender;
import org.apache.airavata.wsmg.util.RunTimeStatistics;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.dispatchers.AddressingBasedDispatcher;
import org.apache.axis2.engine.ServiceLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerServiceLifeCycle
implements ServiceLifeCycle {
    private static final Logger log = LoggerFactory.getLogger(BrokerServiceLifeCycle.class);
    private static final long DEFAULT_SOCKET_TIME_OUT = 20000L;
    private DeliveryProcessor proc;
    private ConsumerUrlManager urlManager;

    public void shutDown(ConfigurationContext arg, AxisService service) {
        log.info("broker shutting down");
        if (this.proc != null) {
            this.proc.stop();
            this.proc = null;
        }
        if (this.urlManager != null) {
            this.urlManager.stop();
            this.urlManager = null;
        }
        log.info("broker shut down");
    }

    public void startUp(ConfigurationContext configContext, AxisService axisService) {
        Boolean inited = (Boolean)configContext.getProperty("broker.inited");
        if (inited == null || !inited.booleanValue()) {
            log.info("starting broker");
            Axis2Utils.overrideAddressingPhaseHander((ConfigurationContext)configContext, (AddressingBasedDispatcher)new PublishedMessageHandler());
            WsmgConfigurationContext brokerConext = this.initConfigurations(configContext, axisService);
            this.initQueue(brokerConext);
            this.initDeliveryMethod(brokerConext.getConfigurationManager());
            inited = true;
            configContext.setProperty("broker.inited", (Object)inited);
        } else {
            log.info("init was already done by another webservice");
        }
    }

    private WsmgConfigurationContext initConfigurations(ConfigurationContext configContext, AxisService axisService) {
        WsmgConfigurationContext wsmgConfig = new WsmgConfigurationContext();
        configContext.setProperty("broker.wsmgconfig", (Object)wsmgConfig);
        ConfigurationManager configMan = new ConfigurationManager("conf" + File.separator + "msgBroker.properties");
        wsmgConfig.setConfigurationManager(configMan);
        String type = configMan.getConfig("broker.storage.type", "persistent");
        if ("memory".equalsIgnoreCase(type)) {
            WsmgInMemoryStorage inmem = new WsmgInMemoryStorage();
            wsmgConfig.setStorage(inmem);
            wsmgConfig.setQueue(inmem);
            wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, inmem));
        } else {
            String jdbcUrl = configMan.getConfig("broker.jdbc.url");
            String jdbcDriver = configMan.getConfig("broker.jdbc.driver");
            WsmgPersistantStorage persis = new WsmgPersistantStorage(jdbcUrl, jdbcDriver);
            wsmgConfig.setStorage(persis);
            wsmgConfig.setQueue(persis);
            wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, persis));
        }
        NotificationProcessor notificatonProcessor = new NotificationProcessor(wsmgConfig);
        wsmgConfig.setNotificationProcessor(notificatonProcessor);
        return wsmgConfig;
    }

    private void initQueue(WsmgConfigurationContext context) {
        log.info("setting up queue");
        WSMGParameter.OUT_GOING_QUEUE = context.getQueue();
        RunTimeStatistics.setStartUpTime();
    }

    private void initDeliveryMethod(ConfigurationManager configMan) {
        DeliveryProtocol protocol;
        String shouldStart = configMan.getConfig("broker.start.delivery.thread");
        if (!Boolean.parseBoolean(shouldStart)) {
            if (configMan.getConfig("broker.storage.type", "persistent").equalsIgnoreCase("memory")) {
                log.error("conflicting configuration detected, using in memory queue without starting delivery thread will result memory growth.");
            }
            return;
        }
        String protocolClass = configMan.getConfig("broker.delivery.protocol", Axis2Protocol.class.getName());
        try {
            Class<?> cl = Class.forName(protocolClass);
            Constructor<?> co = cl.getConstructor(null);
            protocol = (DeliveryProtocol)co.newInstance(null);
        }
        catch (Exception e) {
            log.error("Cannot initial protocol sender", (Throwable)e);
            return;
        }
        protocol.setTimeout(configMan.getConfig("broker.socket.timeout", 20000L));
        SendingStrategy method = null;
        String initedmethod = null;
        String deliveryMethod = configMan.getConfig("broker.delivery.method", "serial");
        if ("parallel".equalsIgnoreCase(deliveryMethod)) {
            method = new ParallelSender();
            initedmethod = "parallel";
        } else if ("pcrew".equalsIgnoreCase(deliveryMethod)) {
            int poolsize = configMan.getConfig("sending.thread.pool.size", 4);
            int batchsize = configMan.getConfig("sending.batch.size", 10);
            method = new FixedParallelSender(poolsize, batchsize);
            initedmethod = "pcrew";
        } else {
            method = new SerialSender();
            initedmethod = "serial";
        }
        this.urlManager = new ConsumerUrlManager(configMan);
        SenderUtils senderUtils = new SenderUtils(this.urlManager);
        senderUtils.setProtocol(protocol);
        this.proc = new DeliveryProcessor(senderUtils, method);
        this.proc.start();
        log.info(initedmethod + " sending method inited");
    }
}

