/*
 * Decompiled with CFR 0.152.
 */
package org.apache.airavata.wsmg.messenger.strategy.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
import org.apache.airavata.wsmg.messenger.Deliverable;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
import org.apache.airavata.wsmg.messenger.strategy.impl.ConsumerHandler;
import org.apache.airavata.wsmg.messenger.strategy.impl.LightweightMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelSender
implements SendingStrategy {
    private static final Logger log = LoggerFactory.getLogger(ParallelSender.class);
    private static final long TIME_TO_WAIT_FOR_SHUTDOWN_SECOND = 30L;
    private HashMap<String, ConsumerHandler> activeConsumerHandlers = new HashMap();
    private ExecutorService threadPool;

    @Override
    public void init() {
        this.threadPool = Executors.newCachedThreadPool();
    }

    @Override
    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
        List<ConsumerInfo> consumerInfoList = outMessage.getConsumerInfoList();
        for (ConsumerInfo consumer : consumerInfoList) {
            this.sendToConsumerHandler(consumer, outMessage, deliverable);
        }
    }

    @Override
    public void shutdown() {
        log.debug("Shutting down");
        this.threadPool.shutdown();
        try {
            this.threadPool.awaitTermination(30L, TimeUnit.SECONDS);
        }
        catch (InterruptedException ie) {
            log.error("Interrupted while waiting thread pool to shutdown");
        }
        log.debug("Shut down");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
        String consumerUrl = consumer.getConsumerEprStr();
        LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(), message.getAdditionalMessageContent());
        HashMap<String, ConsumerHandler> hashMap = this.activeConsumerHandlers;
        synchronized (hashMap) {
            ConsumerHandler handler = this.activeConsumerHandlers.get(consumerUrl);
            if (handler == null) {
                handler = new ParallelConsumerHandler(consumerUrl, deliverable);
                this.activeConsumerHandlers.put(consumerUrl, handler);
                handler.submitMessage(lwm);
                this.threadPool.submit(handler);
            } else {
                handler.submitMessage(lwm);
            }
        }
    }

    class ParallelConsumerHandler
    extends ConsumerHandler {
        private static final int MAX_UNSUCCESSFUL_DRAINS = 3;
        private static final int SLEEP_TIME_SECONDS = 1;
        private int numberOfUnsuccessfulDrain;

        public ParallelConsumerHandler(String url, Deliverable deliverable) {
            super(url, deliverable);
            this.numberOfUnsuccessfulDrain = 0;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            log.debug(String.format("ParallelConsumerHandler starting: %s", this.getConsumerUrl()));
            ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
            while (true) {
                this.numberOfUnsuccessfulDrain = this.queue.drainTo(localList) <= 0 ? ++this.numberOfUnsuccessfulDrain : 0;
                if (this.numberOfUnsuccessfulDrain >= 3) {
                    HashMap hashMap = ParallelSender.this.activeConsumerHandlers;
                    synchronized (hashMap) {
                        if (this.queue.size() == 0) {
                            if (ParallelSender.this.activeConsumerHandlers.remove(this.getConsumerUrl()) != null) {
                                log.debug(String.format("Consumer handler is already removed: %s", this.getConsumerUrl()));
                            }
                            log.debug(String.format("ParallelConsumerHandler done: %s,", this.getConsumerUrl()));
                            break;
                        }
                    }
                }
                this.send(localList);
                localList.clear();
                if (this.numberOfUnsuccessfulDrain <= 0) continue;
                this.waitForMessages();
            }
        }

        private void waitForMessages() {
            try {
                TimeUnit.SECONDS.sleep(1L);
                log.debug("finished - waiting for messages");
            }
            catch (InterruptedException e) {
                log.error("interrupted while waiting for messages", (Throwable)e);
            }
        }
    }
}

