/*
 * Decompiled with CFR 0.152.
 */
package hkube.algo.wrapper;

import hkube.algo.ICommandSender;
import hkube.communication.ICommConfig;
import hkube.communication.streaming.Flow;
import hkube.communication.streaming.IListener;
import hkube.communication.streaming.IMessageListener;
import hkube.communication.streaming.IProducer;
import hkube.communication.streaming.IStatisticsListener;
import hkube.communication.streaming.IStreamingManagerMsgListener;
import hkube.communication.streaming.MessageListener;
import hkube.communication.streaming.MessageProducer;
import hkube.communication.streaming.zmq.Listener;
import hkube.communication.streaming.zmq.Producer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Owns the streaming endpoints of one node: a single {@link MessageProducer} for outgoing
 * messages and one {@link MessageListener} per parent address for incoming messages.
 *
 * <p>Incoming messages are delivered to every listener registered through
 * {@link #registerInputListener(IStreamingManagerMsgListener)}. While such a delivery is in
 * progress the flow of the incoming message is kept in a thread-local so that a
 * {@link #sendMessage(Object, String)} issued from inside the callback, on the same thread and
 * without an explicit flow name, continues that flow.
 *
 * <p>Every access to {@code messageListeners} is guarded by the map's own monitor.
 * {@code listeningToMessages} is volatile because it is written by callers and read by the
 * polling thread started in {@link #startMessageListening()}.
 */
public class StreamingManager
implements IMessageListener {
    /** Pause between two polling rounds over the message listeners, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 2L;

    ICommandSender errorHandler;
    MessageProducer messageProducer;
    Map<String, MessageListener> messageListeners = new HashMap<>();
    List<IStreamingManagerMsgListener> registeredListeners = new ArrayList<>();
    // volatile: the polling thread must observe the stop request made by another thread.
    volatile boolean listeningToMessages = false;
    Map<String, List> parsedFlows = new HashMap<>();
    String defaultFlow;
    ICommConfig commConfig;
    /** Flow of the message currently being dispatched on this thread, if any. */
    ThreadLocal<Flow> local = new ThreadLocal<>();

    /**
     * @param errorHandler handed to every zmq producer/listener this manager creates
     * @param commConfig   source of ports, encoding type and buffer size
     */
    StreamingManager(ICommandSender errorHandler, ICommConfig commConfig) {
        this.commConfig = commConfig;
        this.errorHandler = errorHandler;
    }

    /**
     * @return {@code true} between {@link #startMessageListening()} and a subsequent
     *         {@link #stopStreaming(boolean)}
     */
    public boolean isListeningToMessages() {
        return this.listeningToMessages;
    }

    /**
     * Replaces the flow table used by {@link #sendMessage(Object, String)}.
     *
     * @param flows       parsed flows keyed by flow name
     * @param defaultFlow name looked up when {@code sendMessage} needs a flow but got no name
     */
    void setParsedFlows(Map<String, List> flows, String defaultFlow) {
        this.parsedFlows = flows;
        this.defaultFlow = defaultFlow;
    }

    /**
     * Creates the outgoing producer and starts it when there is at least one next node.
     *
     * @param onStatistics listener registered on the new producer
     * @param nextNodes    downstream nodes passed to the producer
     * @param me           passed to the zmq producer as its first argument
     */
    void setupStreamingProducer(IStatisticsListener onStatistics, List nextNodes, String me) {
        // The configured buffer size is multiplied by 1024 * 1024 before being handed over.
        double maxBufferSize = (double)this.commConfig.getStreamMaxBufferSize().intValue() * 1024.0 * 1024.0;
        Producer zmqProducer = new Producer(me, this.commConfig.getStreamListeningPort(), nextNodes, this.commConfig.getEncodingType(), maxBufferSize, this.errorHandler);
        MessageProducer producer = new MessageProducer((IProducer)zmqProducer, this.commConfig, nextNodes);
        producer.registerStatisticsListener(onStatistics);
        this.messageProducer = producer;
        if (!nextNodes.isEmpty()) {
            producer.start();
        }
    }

    /**
     * Applies a list of parent changes. Each entry is a map with an {@code "address"} map
     * ({@code "host"}, {@code "port"}), a {@code "type"} and a {@code "nodeName"}. Type
     * {@code "Add"} creates a listener for the address, {@code "Del"} removes it; entries with
     * any other (or no) type are ignored.
     *
     * @param parents  change entries to apply, in order
     * @param nodeName passed to every zmq listener that is created
     */
    void setupStreamingListeners(List<Map> parents, String nodeName) {
        synchronized (this.messageListeners) {
            for (Map predecessor : parents) {
                Map<?, ?> address = (Map<?, ?>)predecessor.get("address");
                String host = (String)address.get("host");
                String port = String.valueOf(address.get("port"));
                String type = (String)predecessor.get("type");
                String key = host + port;
                // Constant-first equals: an entry without a type is skipped like any unknown type.
                if ("Add".equals(type)) {
                    this.addListener(key, host, port, nodeName, (String)predecessor.get("nodeName"));
                } else if ("Del".equals(type)) {
                    this.removeListener(key);
                }
            }
        }
    }

    /** Creates, registers and (when already listening) starts the listener for one address. */
    private void addListener(String key, String host, String port, String nodeName, String originNodeName) {
        Listener zmqListener = new Listener(host, port, this.commConfig.getEncodingType(), nodeName, this.errorHandler);
        MessageListener listener = new MessageListener(this.commConfig, (IListener)zmqListener, originNodeName);
        listener.register((IMessageListener)this);
        if (this.listeningToMessages) {
            listener.start();
        }
        this.messageListeners.put(key, listener);
    }

    /** Drops the listener stored under {@code key}, closing it first when listening is active. */
    private void removeListener(String key) {
        MessageListener listener = this.messageListeners.remove(key);
        if (listener != null && this.listeningToMessages) {
            listener.close(false);
        }
    }

    /**
     * Adds a callback that receives every incoming message.
     *
     * @param onMessage callback to add
     */
    public void registerInputListener(IStreamingManagerMsgListener onMessage) {
        this.registeredListeners.add(onMessage);
    }

    /**
     * Forwards an incoming message to every registered input listener, exposing {@code flow}
     * to {@link #sendMessage(Object, String)} calls made on this thread for the duration.
     *
     * @param msg    message payload
     * @param flow   flow the message arrived on
     * @param origin passed through to the input listeners
     */
    public void onMessage(Object msg, Flow flow, String origin) {
        this.local.set(flow);
        try {
            for (IStreamingManagerMsgListener listener : this.registeredListeners) {
                listener.onMessage(msg, origin);
            }
        } finally {
            // Always clear, even when a listener throws, so a stale flow cannot leak into a
            // later sendMessage on this thread.
            this.local.remove();
        }
    }

    /**
     * Marks the manager as listening, starts every known listener and spawns a thread that
     * repeatedly calls {@code fetch()} on all listeners until listening is switched off.
     */
    public void startMessageListening() {
        this.listeningToMessages = true;
        synchronized (this.messageListeners) {
            for (MessageListener listener : this.messageListeners.values()) {
                listener.start();
            }
        }
        new Thread(this::pollListeners).start();
    }

    /** Polling loop body: fetch from a snapshot of the listeners, pause, repeat. */
    private void pollListeners() {
        while (this.listeningToMessages) {
            List<MessageListener> snapshot;
            // Copy under the same lock the writers hold; fetch outside it so that adding,
            // removing or closing listeners is never blocked by a fetch in progress.
            synchronized (this.messageListeners) {
                snapshot = new ArrayList<>(this.messageListeners.values());
            }
            for (MessageListener listener : snapshot) {
                listener.fetch();
            }
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
            catch (InterruptedException e) {
                // Restore the interrupt status and stop polling instead of swallowing it.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Produces {@code msg} on the outgoing producer; does nothing when the producer has no
     * consumers.
     *
     * <p>The flow of the message currently being dispatched on this thread is reused only when
     * {@code flowName} is {@code null}. Otherwise a new flow is built from the parsed flow
     * stored under {@code flowName}, or under the default flow name when {@code flowName} is
     * {@code null} and no message is being dispatched.
     *
     * @param msg      message payload
     * @param flowName name of the flow to send on, or {@code null}
     * @throws RuntimeException if no producer is set (never set up, or already stopped)
     */
    public void sendMessage(Object msg, String flowName) {
        // Read the field once: stopStreaming may null it concurrently.
        MessageProducer producer = this.messageProducer;
        if (producer == null) {
            throw new RuntimeException("Trying to send a message from a none stream pipeline or after close had been applied on algorithm");
        }
        if (producer.getConsumers().size() > 0) {
            Flow flow = this.local.get();
            if (flow == null || flowName != null) {
                String effectiveName = flowName != null ? flowName : this.defaultFlow;
                // NOTE(review): an unknown flow name yields null here and is passed to Flow
                // as-is, exactly as before - confirm Flow tolerates that.
                List parsedFlow = this.parsedFlows.get(effectiveName);
                flow = new Flow(parsedFlow);
            }
            producer.produce(flow, msg);
        }
    }

    /**
     * Stops streaming in both directions. When listening is active, every listener is closed
     * and forgotten, listening is switched off and all input listeners are dropped. The
     * producer, if any, is closed and cleared regardless.
     *
     * @param force passed to the {@code close} call of every listener and of the producer
     */
    public void stopStreaming(boolean force) {
        if (this.listeningToMessages) {
            synchronized (this.messageListeners) {
                for (MessageListener listener : this.messageListeners.values()) {
                    listener.close(force);
                }
                this.clearListeners();
                this.listeningToMessages = false;
            }
            this.registeredListeners = new ArrayList<>();
        }
        MessageProducer producer = this.messageProducer;
        if (producer != null) {
            producer.close(force);
        }
        this.messageProducer = null;
    }

    /** Forgets every message listener without closing it. */
    public void clearListeners() {
        synchronized (this.messageListeners) {
            this.messageListeners.clear();
        }
    }
}

