/*
 * Decompiled with CFR 0.152.
 */
package io.nextop.client;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import io.nextop.Id;
import io.nextop.Message;
import io.nextop.Route;
import io.nextop.client.AbstractMessageControlNode;
import io.nextop.client.MessageControl;
import io.nextop.client.MessageControlChannel;
import io.nextop.client.MessageControlMetrics;
import io.nextop.client.MessageControlNode;
import io.nextop.client.MessageControlState;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;

public class SubjectNode
extends AbstractMessageControlNode {
    private MessageControlNode downstream;
    private ListMultimap<Route, Subscriber> receivers = ArrayListMultimap.create();
    private List<Subscriber> defaultReceivers = new ArrayList<Subscriber>();

    public SubjectNode(MessageControlNode downstream) {
        this.downstream = downstream;
    }

    public void send(Message message) {
        this.onMessageControl(new MessageControl(MessageControl.Type.SEND, message));
    }

    public Observable<Message> receive(final Route route) {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<Message>(){

            public void call(final Subscriber<? super Message> subscriber) {
                boolean s = SubjectNode.this.receivers.put((Object)route, subscriber);
                assert (s);
                BooleanSubscription subscription = BooleanSubscription.create((Action0)new Action0(){

                    public void call() {
                        boolean s = SubjectNode.this.receivers.remove((Object)route, (Object)subscriber);
                        assert (s);
                        SubjectNode.this.downstream.post(new Runnable(){

                            @Override
                            public void run() {
                                SubjectNode.this.onMessageControl(new MessageControl(MessageControl.Type.UNSUBSCRIBE, Message.newBuilder().setRoute(route).build()));
                            }
                        });
                    }
                });
                subscriber.add((Subscription)subscription);
                assert (!subscription.isUnsubscribed());
                SubjectNode.this.downstream.post(new Runnable(){

                    @Override
                    public void run() {
                        SubjectNode.this.onMessageControl(new MessageControl(MessageControl.Type.SUBSCRIBE, Message.newBuilder().setRoute(route).build()));
                    }
                });
            }
        });
    }

    public Observable<Message> defaultReceive() {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<Message>(){

            public void call(final Subscriber<? super Message> subscriber) {
                boolean s = SubjectNode.this.defaultReceivers.add(subscriber);
                assert (s);
                BooleanSubscription subscription = BooleanSubscription.create((Action0)new Action0(){

                    public void call() {
                        boolean s = SubjectNode.this.defaultReceivers.remove(subscriber);
                        assert (s);
                    }
                });
                subscriber.add((Subscription)subscription);
                assert (!subscription.isUnsubscribed());
            }
        });
    }

    public void cancelSend(final Id id) {
        this.downstream.post(new Runnable(){

            @Override
            public void run() {
                SubjectNode.this.onMessageControl(new MessageControl(MessageControl.Type.SEND_NACK, Message.newBuilder().setRoute(Message.outboxRoute(id)).build()));
            }
        });
    }

    @Override
    protected void initDownstream() {
        this.downstream.init(new MessageControlChannel(){

            @Override
            public void onActive(boolean active, MessageControlMetrics metrics) {
                SubjectNode.this.upstream.onActive(active, metrics);
            }

            @Override
            public void onTransfer(MessageControlState mcs) {
                SubjectNode.this.upstream.onTransfer(mcs);
            }

            @Override
            public void onMessageControl(MessageControl mc) {
                switch (mc.type) {
                    case RECEIVE: {
                        Subscriber firstSubscriber = (Subscriber)Iterables.getFirst((Iterable)Iterables.concat((Iterable)SubjectNode.this.receivers.get((Object)mc.message.route), (Iterable)SubjectNode.this.defaultReceivers), null);
                        if (null == firstSubscriber) break;
                        try {
                            firstSubscriber.onNext((Object)mc.message);
                            this.onMessageControl(new MessageControl(MessageControl.Type.RECEIVE_ACK, Message.newBuilder().setRoute(Message.outboxRoute(mc.message.id)).build()));
                        }
                        catch (Throwable t) {
                            this.onMessageControl(new MessageControl(MessageControl.Type.RECEIVE_NACK, Message.newBuilder().setRoute(Message.outboxRoute(mc.message.id)).build()));
                        }
                        break;
                    }
                    case RECEIVE_ERROR: {
                        Subscriber firstSubscriber = (Subscriber)Iterables.getFirst((Iterable)Iterables.concat((Iterable)SubjectNode.this.receivers.get((Object)mc.message.route), (Iterable)SubjectNode.this.defaultReceivers), null);
                        if (null == firstSubscriber) break;
                        try {
                            firstSubscriber.onError((Throwable)new ReceiveException(mc.message));
                            this.onMessageControl(new MessageControl(MessageControl.Type.RECEIVE_ACK, Message.newBuilder().setRoute(Message.outboxRoute(mc.message.id)).build()));
                        }
                        catch (Throwable t) {
                            this.onMessageControl(new MessageControl(MessageControl.Type.RECEIVE_NACK, Message.newBuilder().setRoute(Message.outboxRoute(mc.message.id)).build()));
                        }
                        break;
                    }
                }
            }

            @Override
            public void post(Runnable r) {
                SubjectNode.this.upstream.post(r);
            }

            @Override
            public void postDelayed(Runnable r, int delayMs) {
                SubjectNode.this.upstream.postDelayed(r, delayMs);
            }
        });
    }

    @Override
    protected void startDownstream() {
        this.downstream.start();
    }

    @Override
    protected void stopDownstream() {
        this.downstream.stop();
    }

    @Override
    public void onActive(boolean active, MessageControlMetrics metrics) {
        this.downstream.onActive(active, metrics);
    }

    @Override
    public void onTransfer(MessageControlState mcs) {
        this.downstream.onTransfer(mcs);
    }

    @Override
    public void onMessageControl(MessageControl mc) {
        this.downstream.onMessageControl(mc);
    }

    public static final class ReceiveException
    extends Exception {
        public final Message message;

        private ReceiveException(Message message) {
            this.message = message;
        }
    }
}

