/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.remote.galaxy;

import co.paralleluniverse.concurrent.util.MapUtil;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.galaxy.MessageListener;
import co.paralleluniverse.galaxy.cluster.NodeChangeListener;
import co.paralleluniverse.galaxy.quasar.Grid;
import co.paralleluniverse.io.serialization.Serialization;
import co.paralleluniverse.remote.galaxy.GlxRemoteChannel;
import co.paralleluniverse.strands.channels.SendPort;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteChannelReceiver<Message>
implements MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteChannelReceiver.class);
    private static final ConcurrentMap<SendPort<?>, RemoteChannelReceiver<?>> receivers = MapUtil.newConcurrentHashMap();
    private static final AtomicLong topicGen = new AtomicLong(1000L);
    private final SendPort<Message> channel;
    private final long topic;
    private volatile MessageFilter<Message> filter;
    private final Map<Short, Integer> references = new ConcurrentHashMap<Short, Integer>();

    public static <Message> RemoteChannelReceiver<Message> getReceiver(SendPort<Message> channel) {
        RemoteChannelReceiver<Message> receiver = (RemoteChannelReceiver<Message>)receivers.get(channel);
        if (receiver == null) {
            receiver = new RemoteChannelReceiver<Message>(channel);
            RemoteChannelReceiver<Message> tmp = receivers.putIfAbsent(channel, receiver);
            if (tmp == null) {
                super.subscribe();
            } else {
                receiver = tmp;
            }
        }
        return receiver;
    }

    void shutdown() {
        this.unsubscribe();
        receivers.remove(this.channel);
    }

    private RemoteChannelReceiver(SendPort<Message> channel) {
        this.channel = channel;
        this.topic = topicGen.incrementAndGet();
        try {
            Grid.getInstance().cluster().addNodeChangeListener(new NodeChangeListener(){

                public void nodeAdded(short id) {
                }

                public void nodeSwitched(short id) {
                }

                public void nodeRemoved(short id) {
                    LOG.debug("decrease RefCount for {} from node {}", (Object)this, (Object)id);
                    RemoteChannelReceiver.this.references.remove(id);
                    if (RemoteChannelReceiver.this.references.isEmpty()) {
                        LOG.debug("Shutting down receiver due to zero references" + this);
                        RemoteChannelReceiver.this.shutdown();
                    }
                }
            });
        }
        catch (InterruptedException ex) {
            LOG.error(ex.toString());
        }
    }

    public void setFilter(MessageFilter<Message> filter) {
        this.filter = filter;
    }

    public void messageReceived(short fromNode, byte[] message) {
        Object m1 = Serialization.getInstance().read(message);
        LOG.debug("Received: " + m1);
        if (m1 instanceof GlxRemoteChannel.CloseMessage) {
            Throwable t = ((GlxRemoteChannel.CloseMessage)m1).getException();
            if (t != null) {
                this.channel.close(t);
            } else {
                this.channel.close();
            }
            this.unsubscribe();
            return;
        }
        if (m1 instanceof GlxRemoteChannel.RefMessage) {
            this.handleRefMessage((GlxRemoteChannel.RefMessage)m1);
            return;
        }
        Object m = m1;
        if (this.filter == null || this.filter.shouldForwardMessage(m)) {
            try {
                this.channel.send(m);
            }
            catch (SuspendExecution e) {
                throw new AssertionError((Object)e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void subscribe() {
        GlxRemoteChannel.getMessenger().addMessageListener(this.topic, (MessageListener)this);
    }

    private void unsubscribe() {
        GlxRemoteChannel.getMessenger().removeMessageListener(this.topic, (MessageListener)this);
    }

    public long getTopic() {
        return this.topic;
    }

    void handleRefMessage(GlxRemoteChannel.RefMessage msg) throws RuntimeException {
        LOG.debug("handling: " + msg);
        if (msg.isAdd()) {
            Integer refCount = this.references.get(msg.getNodeId());
            if (refCount == null) {
                this.references.put(msg.getNodeId(), 1);
            } else {
                this.references.put(msg.getNodeId(), refCount + 1);
            }
        } else {
            Integer refCount = this.references.get(msg.getNodeId());
            if (refCount == null) {
                throw new RuntimeException("decrease reference counter message received for unknown cluster node");
            }
            if ((refCount = Integer.valueOf(refCount - 1)) > 0) {
                this.references.put(msg.getNodeId(), refCount);
            } else {
                this.references.remove(msg.getNodeId());
                if (this.references.isEmpty()) {
                    this.shutdown();
                }
            }
        }
    }

    public static interface MessageFilter<Message> {
        public boolean shouldForwardMessage(Message var1);
    }
}

