/*
 * Decompiled with CFR 0.152.
 */
package org.apache.airavata.wsmg.messenger.strategy.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
import org.apache.airavata.wsmg.messenger.Deliverable;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
import org.apache.airavata.wsmg.messenger.strategy.impl.ConsumerHandler;
import org.apache.airavata.wsmg.messenger.strategy.impl.LightweightMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SendingStrategy} that delivers outgoing messages through a fixed-size thread pool,
 * keeping one {@link ConsumerHandler} per consumer endpoint string.
 *
 * <p>Messages are queued on the handler of their consumer. A background thread started by
 * {@link #init()} periodically submits every handler that is not currently submitted to the
 * pool. Each handler run drains at most {@code batchsize} messages, sends them, and then either
 * unregisters itself (queue empty) or marks itself as eligible for resubmission.
 *
 * <p>Both bookkeeping maps are guarded by the monitor of {@code activeConsumerHandlers}.
 */
public class FixedParallelSender implements SendingStrategy {
    private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);

    /** Maximum time, in seconds, {@link #shutdown()} waits for the thread pool to terminate. */
    private static final long TIME_TO_WAIT_FOR_SHUTDOWN_SECOND = 30L;

    /** Consumer endpoint string to handler; its monitor also guards {@link #submittedConsumerHandlers}. */
    private final HashMap<String, ConsumerHandler> activeConsumerHandlers = new HashMap<String, ConsumerHandler>();

    /** Consumer endpoint string to "currently submitted to the pool" flag. */
    private final HashMap<String, Boolean> submittedConsumerHandlers = new HashMap<String, Boolean>();

    private final int batchSize;
    private final ExecutorService threadPool;

    /**
     * Set by {@link #shutdown()} and polled by the scheduling thread; volatile so the write made
     * by the caller of {@code shutdown()} is visible to that thread.
     */
    private volatile boolean stop;

    /** Scheduling thread; {@code null} until {@link #init()} has been called. */
    private Thread t;

    /**
     * @param poolsize  number of worker threads in the fixed thread pool
     * @param batchsize maximum number of messages a handler drains from its queue per run
     */
    public FixedParallelSender(int poolsize, int batchsize) {
        this.threadPool = Executors.newFixedThreadPool(poolsize);
        this.batchSize = batchsize;
    }

    /** Starts the background thread that submits pending consumer handlers to the pool. */
    @Override
    public void init() {
        this.t = new Thread(new ChooseHandlerToSubmit());
        this.t.start();
    }

    /**
     * Queues the message once for every consumer listed in {@code outMessage}.
     *
     * @param outMessage  message to deliver, carrying its list of target consumers
     * @param deliverable passed to the handler when a new one is created for a consumer
     */
    @Override
    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
        List<ConsumerInfo> consumerInfoList = outMessage.getConsumerInfoList();
        for (ConsumerInfo consumer : consumerInfoList) {
            this.sendToConsumerHandler(consumer, outMessage, deliverable);
        }
    }

    /**
     * Requests the scheduling thread to stop, waits for it to finish (it only finishes once no
     * handler is registered any more), then shuts the pool down and waits up to
     * {@link #TIME_TO_WAIT_FOR_SHUTDOWN_SECOND} seconds for it to terminate.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining shutdown steps are
     * still performed and the interrupt status is restored before returning.
     */
    @Override
    public void shutdown() {
        log.debug("Shutting down");
        this.stop = true;
        boolean interrupted = false;
        try {
            // init() may never have been called, in which case there is no thread to join.
            if (this.t != null) {
                try {
                    this.t.join();
                } catch (InterruptedException ie) {
                    interrupted = true;
                    log.error("Wait for ChooseHandlerToSubmit thread to finish (join) is interrupted");
                }
            }
            this.threadPool.shutdown();
            try {
                boolean terminated =
                        this.threadPool.awaitTermination(TIME_TO_WAIT_FOR_SHUTDOWN_SECOND, TimeUnit.SECONDS);
                if (!terminated) {
                    log.warn("Thread pool did not terminate within {} seconds", TIME_TO_WAIT_FOR_SHUTDOWN_SECOND);
                }
            } catch (InterruptedException ie) {
                interrupted = true;
                log.error("Interrupted while waiting thread pool to shutdown");
            }
        } finally {
            // Restore the flag only at the very end so the pool wait above is not cut short
            // by an interrupt that arrived during join().
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        log.debug("Shut down");
    }

    /**
     * Enqueues a lightweight copy of {@code message} on the handler registered for the consumer,
     * creating and registering the handler (as "not submitted") if none exists yet.
     */
    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
        String consumerUrl = consumer.getConsumerEprStr();
        LightweightMsg lwm =
                new LightweightMsg(consumer, message.getTextMessage(), message.getAdditionalMessageContent());
        synchronized (this.activeConsumerHandlers) {
            ConsumerHandler handler = this.activeConsumerHandlers.get(consumerUrl);
            if (handler == null) {
                handler = new FixedParallelConsumerHandler(consumerUrl, deliverable);
                this.activeConsumerHandlers.put(consumerUrl, handler);
                this.submittedConsumerHandlers.put(consumerUrl, Boolean.FALSE);
            }
            // Submitted under the lock so the handler cannot unregister itself between lookup and enqueue.
            handler.submitMessage(lwm);
        }
    }

    /**
     * Handler whose {@link #run()} sends a single batch of at most {@code batchSize} messages
     * and then updates the enclosing sender's bookkeeping.
     */
    class FixedParallelConsumerHandler extends ConsumerHandler {
        public FixedParallelConsumerHandler(String url, Deliverable deliverable) {
            super(url, deliverable);
        }

        /**
         * Drains and sends one batch, then marks this handler as not submitted; if its queue is
         * empty it is removed from both bookkeeping maps.
         */
        @Override
        public void run() {
            String url = this.getConsumerUrl();
            log.debug("FixedParallelConsumerHandler starting: {}", url);
            try {
                ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
                this.queue.drainTo(localList, FixedParallelSender.this.batchSize);
                this.send(localList);
                localList.clear();
            } finally {
                // Always release the "submitted" mark, even if sending threw; otherwise this
                // handler would never be rescheduled nor removed and shutdown() would wait forever.
                synchronized (FixedParallelSender.this.activeConsumerHandlers) {
                    FixedParallelSender.this.submittedConsumerHandlers.put(url, Boolean.FALSE);
                    if (this.queue.size() == 0) {
                        FixedParallelSender.this.submittedConsumerHandlers.remove(url);
                        FixedParallelSender.this.activeConsumerHandlers.remove(url);
                        log.debug("Consumer handler is already removed: {}", url);
                    }
                }
            }
            log.debug("FixedParallelConsumerHandler done: {},", url);
        }
    }

    /**
     * Scheduling loop: once per {@link #SLEEP_TIME_SECONDS} it submits every registered handler
     * that is not currently submitted. It ends when a stop has been requested and no handler
     * remains registered.
     */
    class ChooseHandlerToSubmit implements Runnable {
        private static final int SLEEP_TIME_SECONDS = 1;

        ChooseHandlerToSubmit() {
        }

        @Override
        public void run() {
            while (true) {
                synchronized (FixedParallelSender.this.activeConsumerHandlers) {
                    // The exit condition is evaluated under the lock: the map is a plain HashMap
                    // that other threads mutate while holding this same monitor.
                    if (FixedParallelSender.this.stop && FixedParallelSender.this.activeConsumerHandlers.isEmpty()) {
                        return;
                    }
                    for (String key : FixedParallelSender.this.activeConsumerHandlers.keySet()) {
                        if (Boolean.TRUE.equals(FixedParallelSender.this.submittedConsumerHandlers.get(key))) {
                            continue;
                        }
                        ConsumerHandler handler = FixedParallelSender.this.activeConsumerHandlers.get(key);
                        FixedParallelSender.this.threadPool.submit((Runnable) handler);
                        FixedParallelSender.this.submittedConsumerHandlers.put(key, Boolean.TRUE);
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
                } catch (InterruptedException e) {
                    // Deliberately not re-interrupting: the loop must keep running until all
                    // handlers are gone, and a set interrupt flag would make every later sleep
                    // fail immediately, turning this into a busy loop.
                    log.error("interrupted while waiting", e);
                }
            }
        }
    }
}

