/*
 * Decompiled with CFR 0.152.
 */
package ch.bind.philib.msg.vm;

import ch.bind.philib.lang.ExceptionUtil;
import ch.bind.philib.msg.MessageHandler;
import ch.bind.philib.msg.PubSub;
import ch.bind.philib.msg.Subscription;
import ch.bind.philib.util.CowSet;
import ch.bind.philib.validation.Validation;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PubSubVM
implements PubSub {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubVM.class);
    private final ConcurrentMap<String, Channel> chans = new ConcurrentHashMap<String, Channel>();
    private final ExecutorService executorService;
    private final Object subscriberLock = new Object();
    private static final AtomicIntegerFieldUpdater<Sub> SUB_ACTIVE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Sub.class, "active");

    public PubSubVM(ExecutorService executorService) {
        Validation.notNull(executorService);
        this.executorService = executorService;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Subscription subscribe(String channelName, MessageHandler handler) {
        Validation.notNull(handler);
        Object object = this.subscriberLock;
        synchronized (object) {
            Channel chan = this.getOrCreateChan(channelName);
            Sub sub = chan.subscribe(handler);
            if (sub == null) {
                throw new IllegalArgumentException("double registration for channel='" + channelName + "' and handler: " + handler);
            }
            return sub;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unsubscribe(Channel channel, Sub sub) {
        if (!channel.subs.remove(sub)) {
            return;
        }
        if (channel.subs.isEmpty()) {
            Object object = this.subscriberLock;
            synchronized (object) {
                if (channel.subs.isEmpty()) {
                    this.chans.remove(channel.name);
                }
            }
        }
    }

    @Override
    public void publish(String channelName, Object message) {
        Validation.notNull(message);
        Channel chan = this.getChan(channelName);
        if (chan != null) {
            AsyncPublisher pub = new AsyncPublisher(chan, message);
            this.executorService.execute(pub);
        }
    }

    private Channel getChan(String channelName) {
        Validation.notNullOrEmpty(channelName);
        return (Channel)this.chans.get(channelName);
    }

    private Channel getOrCreateChan(String channelName) {
        Channel chan = this.getChan(channelName);
        if (chan != null) {
            return chan;
        }
        chan = new Channel(channelName);
        Channel other = this.chans.putIfAbsent(channelName, chan);
        return other == null ? chan : other;
    }

    @Override
    public Map<String, Integer> activeChannels() {
        HashMap<String, Integer> rv = null;
        for (Map.Entry e : this.chans.entrySet()) {
            String name = (String)e.getKey();
            Channel c = (Channel)e.getValue();
            int num = c.subs.size();
            if (num <= 0) continue;
            if (rv == null) {
                rv = new HashMap<String, Integer>();
            }
            rv.put(name, num);
        }
        return rv == null ? Collections.emptyMap() : rv;
    }

    private static void publishMessage(Channel channel, Object message) {
        Sub[] subs = (Sub[])channel.subs.getView();
        String channelName = channel.name;
        for (Sub sub : subs) {
            if (!sub.isActive()) continue;
            try {
                sub.handler.handleMessage(channelName, message);
            }
            catch (Exception e) {
                LOG.error("MessageHandler failed: " + ExceptionUtil.buildMessageChain(e));
            }
        }
    }

    private final class AsyncPublisher
    implements Runnable {
        private final Channel channel;
        private final Object message;

        public AsyncPublisher(Channel channel, Object message) {
            this.channel = channel;
            this.message = message;
        }

        @Override
        public void run() {
            PubSubVM.publishMessage(this.channel, this.message);
        }
    }

    private final class Sub
    implements Subscription {
        private final Channel channel;
        private final MessageHandler handler;
        volatile int active = 1;

        public Sub(Channel channel, MessageHandler handler) {
            this.channel = channel;
            this.handler = handler;
        }

        @Override
        public String getChannelName() {
            return this.channel.name;
        }

        @Override
        public void cancel() {
            if (SUB_ACTIVE_UPDATER.compareAndSet(this, 1, 0)) {
                PubSubVM.this.unsubscribe(this.channel, this);
            }
        }

        @Override
        public boolean isActive() {
            return this.active == 1;
        }

        public String toString() {
            return "Subscription[active=" + this.isActive() + ", channel=" + this.getChannelName() + "]";
        }

        public int hashCode() {
            return System.identityHashCode(this.handler);
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj instanceof Sub) {
                Sub o = (Sub)obj;
                return this.handler == o.handler;
            }
            return false;
        }
    }

    private final class Channel {
        private final String name;
        private final CowSet<Sub> subs = new CowSet<Sub>(Sub.class);

        Channel(String name) {
            this.name = name;
        }

        Sub subscribe(MessageHandler handler) {
            Sub sub = new Sub(this, handler);
            if (this.subs.add(sub)) {
                return sub;
            }
            return null;
        }
    }
}

