/*
 * Decompiled with CFR 0.152.
 */
package org.apache.airavata.wsmg.broker;

import java.lang.reflect.Constructor;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.Properties;
import org.apache.airavata.common.utils.ServiceUtils;
import org.apache.airavata.registry.api.AbstractRegistryUpdaterThread;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.api.util.RegistryUtils;
import org.apache.airavata.wsmg.broker.NotificationProcessor;
import org.apache.airavata.wsmg.broker.handler.PublishedMessageHandler;
import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
import org.apache.airavata.wsmg.commons.storage.WsmgInMemoryStorage;
import org.apache.airavata.wsmg.commons.storage.WsmgPersistantStorage;
import org.apache.airavata.wsmg.commons.util.Axis2Utils;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
import org.apache.airavata.wsmg.messenger.DeliveryProcessor;
import org.apache.airavata.wsmg.messenger.SenderUtils;
import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
import org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender;
import org.apache.airavata.wsmg.messenger.strategy.impl.ParallelSender;
import org.apache.airavata.wsmg.messenger.strategy.impl.SerialSender;
import org.apache.airavata.wsmg.util.RunTimeStatistics;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.dispatchers.AddressingBasedDispatcher;
import org.apache.axis2.engine.ServiceLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerServiceLifeCycle
implements ServiceLifeCycle {
    private static final Logger log = LoggerFactory.getLogger(BrokerServiceLifeCycle.class);
    public static final String REPOSITORY_PROPERTIES = "airavata-server.properties";
    public static final int GFAC_URL_UPDATE_INTERVAL = 10800000;
    public static final int JCR_AVAIALABILITY_WAIT_INTERVAL = 10000;
    public static final String JCR_CLASS = "jcr.class";
    public static final String JCR_USER = "jcr.user";
    public static final String JCR_PASS = "jcr.pass";
    public static final String ORG_APACHE_JACKRABBIT_REPOSITORY_URI = "org.apache.jackrabbit.repository.uri";
    private static final String MESSAGE_BROKER_SERVICE_NAME = "EventingService";
    private static final String SERVICE_URL = "message_broker_service_url";
    private static final String JCR_REGISTRY = "registry";
    private Thread thread;
    private static final long DEFAULT_SOCKET_TIME_OUT = 20000L;
    private DeliveryProcessor proc;
    private ConsumerUrlManager urlManager;
    private static Boolean initialized = false;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutDown(ConfigurationContext configurationcontext, AxisService service) {
        log.info("broker shutting down");
        if (this.proc != null) {
            this.proc.stop();
            this.proc = null;
        }
        if (this.urlManager != null) {
            this.urlManager.stop();
            this.urlManager = null;
        }
        Boolean bl = initialized;
        synchronized (bl) {
            if (initialized.booleanValue()) {
                initialized = false;
                AiravataRegistry2 registry = (AiravataRegistry2)configurationcontext.getProperty(JCR_REGISTRY);
                if (registry != null && this.thread != null) {
                    registry.unsetEventingURI();
                    this.thread.interrupt();
                    try {
                        this.thread.join();
                    }
                    catch (InterruptedException e) {
                        log.info("Message box url update thread is interrupted");
                    }
                }
            }
        }
        log.info("broker shut down");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startUp(ConfigurationContext configContext, AxisService axisService) {
        Boolean inited = (Boolean)configContext.getProperty("broker.inited");
        if (inited == null || !inited.booleanValue()) {
            log.info("starting broker");
            Axis2Utils.overrideAddressingPhaseHander((ConfigurationContext)configContext, (AddressingBasedDispatcher)new PublishedMessageHandler());
            WsmgConfigurationContext brokerConext = this.initConfigurations(configContext, axisService);
            this.initQueue(brokerConext);
            this.initDeliveryMethod(brokerConext.getConfigurationManager());
            inited = true;
            configContext.setProperty("broker.inited", (Object)inited);
        } else {
            log.info("init was already done by another webservice");
        }
        final ConfigurationContext context = configContext;
        Boolean bl = initialized;
        synchronized (bl) {
            if (!initialized.booleanValue()) {
                initialized = true;
                new Thread(){

                    @Override
                    public void run() {
                        Properties properties = new Properties();
                        try {
                            URL url = this.getClass().getClassLoader().getResource(BrokerServiceLifeCycle.REPOSITORY_PROPERTIES);
                            properties.load(url.openStream());
                            HashMap<Object, Object> map = new HashMap<Object, Object>(properties);
                            try {
                                Thread.sleep(10000L);
                            }
                            catch (InterruptedException e1) {
                                e1.printStackTrace();
                            }
                            AiravataRegistry2 registry = RegistryUtils.getRegistryFromConfig((URL)url);
                            String localAddress = ServiceUtils.generateServiceURLFromConfigurationContext((ConfigurationContext)context, (String)BrokerServiceLifeCycle.MESSAGE_BROKER_SERVICE_NAME);
                            log.debug("MESSAGE BOX SERVICE_ADDRESS:" + localAddress);
                            context.setProperty(BrokerServiceLifeCycle.SERVICE_URL, (Object)new URI(localAddress));
                            context.setProperty(BrokerServiceLifeCycle.JCR_REGISTRY, (Object)registry);
                            BrokerServiceLifeCycle.this.thread = (Thread)((Object)new MsgBrokerURLRegisterThread(registry, context));
                            BrokerServiceLifeCycle.this.thread.start();
                        }
                        catch (Exception e) {
                            log.error(e.getMessage(), (Throwable)e);
                        }
                    }
                }.start();
            }
        }
    }

    private WsmgConfigurationContext initConfigurations(ConfigurationContext configContext, AxisService axisService) {
        WsmgConfigurationContext wsmgConfig = new WsmgConfigurationContext();
        configContext.setProperty("broker.wsmgconfig", (Object)wsmgConfig);
        ConfigurationManager configMan = new ConfigurationManager(REPOSITORY_PROPERTIES);
        wsmgConfig.setConfigurationManager(configMan);
        String type = configMan.getConfig("broker.storage.type", "persistent");
        if ("memory".equalsIgnoreCase(type)) {
            WsmgInMemoryStorage inmem = new WsmgInMemoryStorage();
            wsmgConfig.setStorage(inmem);
            wsmgConfig.setQueue(inmem);
            wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, inmem));
        } else {
            String jdbcUrl = configMan.getConfig("broker.jdbc.url");
            String jdbcDriver = configMan.getConfig("broker.jdbc.driver");
            WsmgPersistantStorage persis = new WsmgPersistantStorage(jdbcUrl, jdbcDriver);
            wsmgConfig.setStorage(persis);
            wsmgConfig.setQueue(persis);
            wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, persis));
        }
        NotificationProcessor notificatonProcessor = new NotificationProcessor(wsmgConfig);
        wsmgConfig.setNotificationProcessor(notificatonProcessor);
        return wsmgConfig;
    }

    private void initQueue(WsmgConfigurationContext context) {
        log.info("setting up queue");
        WSMGParameter.OUT_GOING_QUEUE = context.getQueue();
        RunTimeStatistics.setStartUpTime();
    }

    private void initDeliveryMethod(ConfigurationManager configMan) {
        DeliveryProtocol protocol;
        String shouldStart = configMan.getConfig("broker.start.delivery.thread");
        if (!Boolean.parseBoolean(shouldStart)) {
            if (configMan.getConfig("broker.storage.type", "persistent").equalsIgnoreCase("memory")) {
                log.error("conflicting configuration detected, using in memory queue without starting delivery thread will result memory growth.");
            }
            return;
        }
        String protocolClass = configMan.getConfig("broker.delivery.protocol", Axis2Protocol.class.getName());
        try {
            Class<?> cl = Class.forName(protocolClass);
            Constructor<?> co = cl.getConstructor(null);
            protocol = (DeliveryProtocol)co.newInstance(null);
        }
        catch (Exception e) {
            log.error("Cannot initial protocol sender", (Throwable)e);
            return;
        }
        protocol.setTimeout(configMan.getConfig("broker.socket.timeout", 20000L));
        SendingStrategy method = null;
        String initedmethod = null;
        String deliveryMethod = configMan.getConfig("broker.delivery.method", "serial");
        if ("parallel".equalsIgnoreCase(deliveryMethod)) {
            method = new ParallelSender();
            initedmethod = "parallel";
        } else if ("pcrew".equalsIgnoreCase(deliveryMethod)) {
            int poolsize = configMan.getConfig("sending.thread.pool.size", 4);
            int batchsize = configMan.getConfig("sending.batch.size", 10);
            method = new FixedParallelSender(poolsize, batchsize);
            initedmethod = "pcrew";
        } else {
            method = new SerialSender();
            initedmethod = "serial";
        }
        this.urlManager = new ConsumerUrlManager(configMan);
        SenderUtils senderUtils = new SenderUtils(this.urlManager);
        senderUtils.setProtocol(protocol);
        this.proc = new DeliveryProcessor(senderUtils, method);
        this.proc.start();
        log.info(initedmethod + " sending method inited");
    }

    class MsgBrokerURLRegisterThread
    extends AbstractRegistryUpdaterThread {
        private ConfigurationContext context;

        public MsgBrokerURLRegisterThread(AiravataRegistry2 registry, ConfigurationContext context) {
            super(registry);
            this.context = null;
            this.context = context;
        }

        protected void updateRegistry(AiravataRegistry2 registry) {
            URI localAddress = (URI)this.context.getProperty(BrokerServiceLifeCycle.SERVICE_URL);
            registry.setEventingURI(localAddress);
            log.info("Updated Workflow Interpreter service URL in to Repository");
        }
    }
}

