/*
 * Decompiled with CFR 0.152.
 */
package ch.bind.philib.msg.vm;

import ch.bind.philib.lang.ExceptionUtil;
import ch.bind.philib.msg.MessageHandler;
import ch.bind.philib.msg.Subscription;
import ch.bind.philib.msg.vm.PubSub;
import ch.bind.philib.util.CowSet;
import ch.bind.philib.validation.Validation;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PubSubVM
implements PubSub {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubVM.class);
    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
    private final Lock rlock = this.rwlock.readLock();
    private final Lock wlock = this.rwlock.writeLock();
    private final Map<String, Channel> channels = new HashMap<String, Channel>();
    private final ExecutorService executorService;
    private static final AtomicIntegerFieldUpdater<Sub> SUB_ACTIVE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Sub.class, "active");

    public PubSubVM(ExecutorService executorService) {
        Validation.notNull(executorService);
        this.executorService = executorService;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Subscription subscribe(String channelName, MessageHandler handler) {
        Validation.notNull(handler);
        this.wlock.lock();
        try {
            Sub sub;
            Channel chan = this.getChannel(channelName);
            if (chan == null) {
                chan = new Channel(channelName);
                this.channels.put(channelName, chan);
            }
            if ((sub = chan.subscribe(handler)) == null) {
                throw new IllegalArgumentException("double registration for channel='" + channelName + "' and handler: " + handler);
            }
            Sub sub2 = sub;
            return sub2;
        }
        finally {
            this.wlock.unlock();
        }
    }

    @Override
    public Subscription forward(String fromChannelName, String toChannelName) {
        return this.forward(this, fromChannelName, toChannelName);
    }

    @Override
    public Subscription forward(PubSub pubsub, String fromChannelName, String toChannelName) {
        Validation.notNullOrEmpty(fromChannelName);
        Validation.notNullOrEmpty(toChannelName);
        Forwarder handler = new Forwarder(toChannelName, pubsub);
        return this.subscribe(fromChannelName, handler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unsubscribe(Channel channel, Sub sub) {
        if (!channel.subs.remove(sub)) {
            return;
        }
        if (channel.subs.isEmpty()) {
            this.wlock.lock();
            try {
                if (channel.subs.isEmpty()) {
                    this.channels.remove(channel.name);
                }
            }
            finally {
                this.wlock.unlock();
            }
        }
    }

    @Override
    public void publishSync(String channelName, Object message) {
        Validation.notNull(message);
        Channel chan = this.rlockedGetChannel(channelName);
        if (chan != null) {
            chan.publishSync(message);
        }
    }

    @Override
    public void publishAsync(String channelName, Object message) {
        Validation.notNull(message);
        Channel chan = this.rlockedGetChannel(channelName);
        if (chan != null) {
            chan.publishAsync(message);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Channel rlockedGetChannel(String channelName) {
        this.rlock.lock();
        try {
            Channel channel = this.getChannel(channelName);
            return channel;
        }
        finally {
            this.rlock.unlock();
        }
    }

    private Channel getChannel(String channelName) {
        Channel chan = this.channels.get(channelName);
        if (chan == null) {
            Validation.notNullOrEmpty(channelName);
        }
        return chan;
    }

    @Override
    public Map<String, Integer> activeChannels() {
        HashMap<String, Integer> rv = null;
        for (Map.Entry<String, Channel> e : this.channels.entrySet()) {
            String name = e.getKey();
            Channel c = e.getValue();
            int num = c.subs.size();
            if (num <= 0) continue;
            if (rv == null) {
                rv = new HashMap<String, Integer>();
            }
            rv.put(name, num);
        }
        return rv == null ? Collections.emptyMap() : rv;
    }

    private static void publishMessage(Channel channel, Object message) {
        Sub[] subs = (Sub[])channel.subs.getView();
        String channelName = channel.name;
        for (Sub sub : subs) {
            if (!sub.isActive()) continue;
            try {
                sub.handler.handleMessage(channelName, message);
            }
            catch (Exception e) {
                LOG.error("MessageHandler failed: " + ExceptionUtil.buildMessageChain(e));
            }
        }
    }

    private static final class Forwarder
    implements MessageHandler {
        private final String to;
        private final PubSub pubsub;

        Forwarder(String to, PubSub pubsub) {
            this.to = to;
            this.pubsub = pubsub;
        }

        @Override
        public void handleMessage(String channelName, Object message) {
            this.pubsub.publishSync(this.to, message);
        }
    }

    private final class AsyncPublisher
    implements Runnable {
        private final Channel channel;
        private final Object message;

        public AsyncPublisher(Channel channel, Object message) {
            this.channel = channel;
            this.message = message;
        }

        @Override
        public void run() {
            PubSubVM.publishMessage(this.channel, this.message);
        }
    }

    private final class Sub
    implements Subscription {
        private final Channel channel;
        private final MessageHandler handler;
        volatile int active = 1;

        public Sub(Channel channel, MessageHandler handler) {
            this.channel = channel;
            this.handler = handler;
        }

        @Override
        public String getChannelName() {
            return this.channel.name;
        }

        @Override
        public void cancel() {
            if (SUB_ACTIVE_UPDATER.compareAndSet(this, 1, 0)) {
                PubSubVM.this.unsubscribe(this.channel, this);
            }
        }

        @Override
        public boolean isActive() {
            return this.active == 1;
        }

        public String toString() {
            return "Subscription[active=" + this.isActive() + ", channel=" + this.getChannelName() + "]";
        }

        public int hashCode() {
            return System.identityHashCode(this.handler);
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj instanceof Sub) {
                Sub o = (Sub)obj;
                return this.handler == o.handler;
            }
            return false;
        }
    }

    private final class Channel {
        private final String name;
        private final CowSet<Sub> subs = new CowSet<Sub>(Sub.class);

        Channel(String name) {
            this.name = name;
        }

        Sub subscribe(MessageHandler handler) {
            Sub sub = new Sub(this, handler);
            if (this.subs.add(sub)) {
                return sub;
            }
            return null;
        }

        void publishSync(Object message) {
            PubSubVM.publishMessage(this, message);
        }

        void publishAsync(Object message) {
            AsyncPublisher pub = new AsyncPublisher(this, message);
            PubSubVM.this.executorService.execute(pub);
        }
    }
}

