/*
 * Decompiled with CFR 0.152.
 */
package orbit.server.pipeline;

import java.util.concurrent.CancellationException;
import kotlin.Metadata;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.coroutines.jvm.internal.ContinuationImpl;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.InlineMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.Reflection;
import kotlin.reflect.KDeclarationContainer;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.channels.ChannelIterator;
import kotlinx.coroutines.channels.ReceiveChannel;
import mu.KLogger;
import mu.KotlinLogging;
import orbit.server.OrbitServerConfig;
import orbit.server.auth.AuthInfo;
import orbit.server.concurrent.RuntimeScopes;
import orbit.server.mesh.LocalNodeInfo;
import orbit.server.net.MessageContainer;
import orbit.server.net.MessageDirection;
import orbit.server.net.MessageMetadata;
import orbit.server.pipeline.Pipeline;
import orbit.server.pipeline.PipelineContext;
import orbit.server.pipeline.PipelineException;
import orbit.server.pipeline.PipelineSteps;
import orbit.shared.exception.CapacityExceededException;
import orbit.shared.exception.ExceptionKt;
import orbit.shared.net.Message;
import orbit.shared.net.MessageContent;
import orbit.shared.net.MessageTarget;
import orbit.util.concurrent.RailWorker;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@Metadata(mv={1, 1, 15}, bv={1, 0, 3}, k=1, d1={"\u0000^\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0004\u0018\u00002\u00020\u0001B%\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u00a2\u0006\u0002\u0010\nJ\u0016\u0010\u0017\u001a\u00020\u00182\f\u0010\u0019\u001a\b\u0012\u0004\u0012\u00020\u00130\u001aH\u0002J\u0019\u0010\u001b\u001a\u00020\u001c2\u0006\u0010\u0016\u001a\u00020\u0013H\u0082@\u00f8\u0001\u0000\u00a2\u0006\u0002\u0010\u001dJ\u001a\u0010\u001e\u001a\u00020\u001c2\u0006\u0010\u001f\u001a\u00020 2\n\b\u0002\u0010!\u001a\u0004\u0018\u00010\fJ\u0006\u0010\"\u001a\u00020\u001cJ\u0006\u0010#\u001a\u00020\u001cR\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0014\u0010\u000b\u001a\u00020\f8BX\u0082\u0004\u00a2\u0006\u0006\u001a\u0004\b\r\u0010\u000eR\u000e\u0010\b\u001a\u00020\tX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u0004\u00a2\u0006\u0002\n\u0000R#\u0010\u0011\u001a\u0017\u0012\u0013\u0012\u00110\u0013\u00a2\u0006\f\b\u0014\u0012\b\b\u0015\u0012\u0004\b\b(\u00160\u0012X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u0082\u0002\u0004\n\u0002\b\u0019\u00a8\u0006$"}, d2={"Lorbit/server/pipeline/Pipeline;", "", "config", "Lorbit/server/OrbitServerConfig;", "runtimeScopes", "Lorbit/server/concurrent/RuntimeScopes;", "pipelineSteps", 
"Lorbit/server/pipeline/PipelineSteps;", "localNodeInfo", "Lorbit/server/mesh/LocalNodeInfo;", "(Lorbit/server/OrbitServerConfig;Lorbit/server/concurrent/RuntimeScopes;Lorbit/server/pipeline/PipelineSteps;Lorbit/server/mesh/LocalNodeInfo;)V", "localMeta", "Lorbit/server/net/MessageMetadata;", "getLocalMeta", "()Lorbit/server/net/MessageMetadata;", "logger", "Lmu/KLogger;", "pipelineRails", "Lorbit/util/concurrent/RailWorker;", "Lorbit/server/net/MessageContainer;", "Lkotlin/ParameterName;", "name", "container", "launchRail", "Lkotlinx/coroutines/Job;", "receiveChannel", "Lkotlinx/coroutines/channels/ReceiveChannel;", "onMessage", "", "(Lorbit/server/net/MessageContainer;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "pushMessage", "msg", "Lorbit/shared/net/Message;", "meta", "start", "stop", "orbit-server"})
public final class Pipeline {
    private final KLogger logger;
    private final RailWorker<MessageContainer> pipelineRails;
    private final OrbitServerConfig config;
    private final RuntimeScopes runtimeScopes;
    private final PipelineSteps pipelineSteps;
    private final LocalNodeInfo localNodeInfo;

    /**
     * Starts the pipeline by asking the underlying rail worker to start its workers.
     */
    public final void start() {
        pipelineRails.startWorkers();
    }

    /**
     * Stops the pipeline by asking the underlying rail worker to stop its workers.
     */
    public final void stop() {
        pipelineRails.stopWorkers();
    }

    /**
     * Wraps a message in a {@link MessageContainer} and offers it to the pipeline rails.
     *
     * <p>This class is decompiled Kotlin (see the {@code @Metadata} annotation on the class);
     * Kotlin has no checked exceptions, which is why a plain {@link Exception} can leave this
     * method even though the signature carries no {@code throws} clause.
     *
     * @param msg  the message to enqueue; must not be {@code null}
     * @param meta metadata to attach to the message, or {@code null} to fall back to
     *             the metadata produced by {@code getLocalMeta()}
     * @throws IllegalStateException     if the rail worker reports that it is not initialized
     * @throws CapacityExceededException if the rail worker rejects the offer
     * @throws Exception                 wrapping the cause, if offering fails for any other reason
     */
    public final void pushMessage(@NotNull Message msg, @Nullable MessageMetadata meta) {
        Intrinsics.checkParameterIsNotNull(msg, "msg");
        if (!this.pipelineRails.isInitialized()) {
            throw new IllegalStateException("The Orbit pipeline is not in a state to receive messages. Did you start the Orbit stage?");
        }

        final MessageMetadata effectiveMeta = meta != null ? meta : this.getLocalMeta();
        final MessageContainer container = new MessageContainer(msg, effectiveMeta);

        // The log text is supplied lazily so the container is only stringified
        // if the logger actually invokes the supplier.
        this.logger.trace(new Function0<String>() {
            @Override
            @NotNull
            public String invoke() {
                return "Writing message to pipeline channel: " + container;
            }
        });

        try {
            if (!this.pipelineRails.offer(container)) {
                final String errMsg = "The Orbit pipeline channel is full. >" + this.config.getPipelineBufferCount() + " buffered messages.";
                this.logger.error(errMsg);
                throw new CapacityExceededException(errMsg);
            }
        } catch (CapacityExceededException e) {
            // Let the capacity signal through untouched so it is not wrapped by the generic handler below.
            throw e;
        } catch (Throwable t) {
            throw new Exception("Error offering to pipeline", t);
        }
    }

    /**
     * Synthetic bridge for calling {@link #pushMessage(Message, MessageMetadata)} with its
     * optional metadata argument omitted.
     *
     * @param pipeline        the instance to invoke {@code pushMessage} on
     * @param message         the message to push
     * @param messageMetadata the metadata supplied by the caller; ignored when bit 1 of {@code n} is set
     * @param n               bit mask; when {@code (n & 2) != 0} the metadata is replaced by {@code null}
     * @param object          not read by this method
     */
    public static /* synthetic */ void pushMessage$default(Pipeline pipeline, Message message, MessageMetadata messageMetadata, int n, Object object) {
        final boolean metaOmitted = (n & 2) != 0;
        pipeline.pushMessage(message, metaOmitted ? null : messageMetadata);
    }

    /**
     * Builds a fresh {@link MessageMetadata} describing a message that originates from this node.
     *
     * <p>The metadata carries an {@link AuthInfo} built from the local node's id, the
     * {@link MessageDirection#OUTBOUND} direction, and {@code true} as its third constructor
     * argument (presumably the "respond on error" flag read elsewhere in this class via
     * {@code getRespondOnError()} - TODO confirm against {@code MessageMetadata}).
     *
     * @return a newly constructed metadata instance on every call
     */
    private final MessageMetadata getLocalMeta() {
        // NOTE(review): the meaning of the leading `true` passed to AuthInfo is not visible here.
        final AuthInfo localAuth = new AuthInfo(true, this.localNodeInfo.getInfo().getId());
        final MessageDirection direction = MessageDirection.OUTBOUND;
        return new MessageMetadata(localAuth, direction, true);
    }

    /**
     * Launches a coroutine on the CPU scope that drains {@code receiveChannel} and hands every
     * received {@link MessageContainer} to {@code onMessage}.
     *
     * <p>The body below is the decompiler's rendering of a compiled Kotlin suspend lambda: a
     * state machine keyed on {@code label}. It is not valid Java ({@code ** GOTO} lines are
     * decompiler pseudo-statements) and is kept for reference only.
     *
     * <p>NOTE(review): no call site for this private method is visible in this class.
     *
     * @param receiveChannel the channel whose elements are processed one by one
     * @return the {@link Job} returned by {@code BuildersKt.launch$default}
     */
    private final Job launchRail(ReceiveChannel<MessageContainer> receiveChannel) {
        // The trailing `3, null` is presumably the default-argument mask of launch$default
        // (i.e. the two `null` arguments stand for defaulted parameters) - TODO confirm.
        return BuildersKt.launch$default((CoroutineScope)this.runtimeScopes.getCpuScope(), null, null, (Function2)((Function2)new Function2<CoroutineScope, Continuation<? super Unit>, Object>(this, receiveChannel, null){
            // Scope handed to the lambda; assigned in create().
            private CoroutineScope p$;
            // Spill slots used to save locals before a call that may suspend.
            Object L$0;
            Object L$1;
            Object L$2;
            // Current state: 0 = fresh, 1 = resumed after hasNext(), 2 = resumed after onMessage().
            int label;
            final /* synthetic */ Pipeline this$0;
            final /* synthetic */ ReceiveChannel $receiveChannel;

            /*
             * Unable to fully structure code
             */
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
                // Sentinel that a callee returns when it has suspended instead of completing.
                var5_2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch (this.label) {
                    case 0: {
                        // First entry: rethrow a failed start result, then obtain the channel iterator.
                        ResultKt.throwOnFailure((Object)$result);
                        $this$launch = this.p$;
                        var4_4 = this.$receiveChannel.iterator();
lbl7:
                        // 2 sources

                        while (true) {
                            // Save state and ask the iterator whether another element is available.
                            this.L$0 = $this$launch;
                            this.L$1 = var4_4;
                            this.label = 1;
                            v0 = var4_4.hasNext((Continuation)this);
                            if (v0 == var5_2) {
                                // hasNext() suspended; this method is re-entered later with label == 1.
                                return var5_2;
                            }
                            ** GOTO lbl20
                            break;
                        }
                    }
                    case 1: {
                        // Resumed after hasNext(): restore saved locals; $result carries the hasNext() outcome.
                        var4_4 = (ChannelIterator)this.L$1;
                        $this$launch = (CoroutineScope)this.L$0;
                        ResultKt.throwOnFailure((Object)$result);
                        v0 = $result;
lbl20:
                        // 2 sources

                        // No more elements: leave the switch and return Unit.
                        if (!((Boolean)v0).booleanValue()) break;
                        msg = (MessageContainer)var4_4.next();
                        // Trace-log the received container; the text is built lazily inside the Function0.
                        Pipeline.access$getLogger$p(this.this$0).trace((Function0)new Function0<String>(msg){
                            final /* synthetic */ MessageContainer $msg;

                            @NotNull
                            public final String invoke() {
                                return "Pipeline rail received message: " + this.$msg;
                            }
                            {
                                this.$msg = messageContainer;
                                super(0);
                            }
                        });
                        // Save state and process the message.
                        this.L$0 = $this$launch;
                        this.L$1 = msg;
                        this.L$2 = var4_4;
                        this.label = 2;
                        v1 = this.this$0.onMessage(msg, (Continuation<? super Unit>)this);
                        if (v1 == var5_2) {
                            // onMessage() suspended; this method is re-entered later with label == 2.
                            return var5_2;
                        }
                        ** GOTO lbl37
                    }
                    case 2: {
                        // Resumed after onMessage(): restore saved locals and surface any failure.
                        var4_4 = (ChannelIterator)this.L$2;
                        msg = (MessageContainer)this.L$1;
                        $this$launch = (CoroutineScope)this.L$0;
                        ResultKt.throwOnFailure((Object)$result);
                        v1 = $result;
lbl37:
                        // 2 sources

                        // Pseudo-statement: jump back to the top of the receive loop (lbl7).
                        ** continue;
                    }
                }
                return Unit.INSTANCE;
                // Presumably the switch's default branch for an unexpected label; the decompiler
                // placed it after the return, where it is unreachable as written.
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            {
                this.this$0 = pipeline;
                this.$receiveChannel = receiveChannel;
                super(2, continuation);
            }

            // Creates a new instance of this lambda (the decompiler could not render the
            // constructor call) and stores `value` as its coroutine scope.
            @NotNull
            public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> completion) {
                Intrinsics.checkParameterIsNotNull(completion, (String)"completion");
                Function2<CoroutineScope, Continuation<? super Unit>, Object> function2 = new /* invalid duplicate definition of identical inner class */;
                CoroutineScope coroutineScope = function2.p$ = (CoroutineScope)value;
                return function2;
            }

            // Function2 entry point: builds the state machine via create() and runs its first step.
            public final Object invoke(Object object, Object object2) {
                return (this.create(object, (Continuation)object2)).invokeSuspend(Unit.INSTANCE);
            }
        }), (int)3, null);
    }

    /**
     * Runs one message through the pipeline steps and, when a {@link PipelineException} is
     * reported, either replies with an error message or rethrows the underlying reason.
     *
     * <p>Decompiler note: "Unable to fully structure code". The body is the rendering of a
     * compiled Kotlin suspend function, i.e. a state machine keyed on the continuation's
     * {@code label}; {@code **} lines are decompiler pseudo-statements, not Java.
     *
     * @param container   the message and its metadata; on re-entry from the continuation this is
     *                    {@code null} and the real value is restored from {@code L$1}
     * @param $completion the caller's continuation, or this method's own continuation on resume
     * @return {@code Unit.INSTANCE} on completion, or the {@code COROUTINE_SUSPENDED} sentinel
     *         when {@code context.next} suspended
     */
    @Nullable
    final /* synthetic */ Object onMessage(@NotNull MessageContainer container, @NotNull Continuation<? super Unit> $completion) {
        // Reuse the incoming continuation if it is this method's own (onMessage.1) and its
        // label has the sign bit set, which invokeSuspend() below sets before re-entering.
        if (!($completion instanceof onMessage.1)) ** GOTO lbl-1000
        var12_3 = $completion;
        if ((var12_3.label & -2147483648) != 0) {
            // Clear the sign bit so the switch below sees the plain state number.
            var12_3.label -= -2147483648;
        } else lbl-1000:
        // 2 sources

        {
            // Fresh call: wrap the caller's continuation in this method's own state holder.
            $continuation = new ContinuationImpl(this, $completion){
                /* synthetic */ Object result;
                int label;
                final /* synthetic */ Pipeline this$0;
                Object L$0;
                Object L$1;
                Object L$2;

                @Nullable
                public final Object invokeSuspend(@NotNull Object $result) {
                    // Record the resumed value, flag the label as a resume, and re-enter onMessage.
                    this.result = $result;
                    this.label |= Integer.MIN_VALUE;
                    return this.this$0.onMessage(null, (Continuation<? super Unit>)this);
                }
                {
                    this.this$0 = pipeline;
                    super(continuation);
                }
            };
        }
        $result = $continuation.result;
        // Sentinel that a callee returns when it has suspended instead of completing.
        var13_5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch ($continuation.label) {
            case 0: {
                // First entry: build a context over the configured steps and start processing.
                ResultKt.throwOnFailure((Object)$result);
                context = new PipelineContext(this.pipelineSteps.getSteps(), this, container.getMetadata());
                // Save locals so they can be restored if context.next() suspends.
                $continuation.L$0 = this;
                $continuation.L$1 = container;
                $continuation.L$2 = context;
                $continuation.label = 1;
                v0 = context.next(container.getMessage(), (Continuation<? super Unit>)$continuation);
                ** if (v0 != var13_5) goto lbl22
lbl21:
                // 1 sources

                // context.next() suspended; propagate the sentinel to the caller.
                return var13_5;
lbl22:
                // 1 sources

                ** GOTO lbl49
            }
            case 1: {
                // Resumed after context.next(): restore the saved locals.
                context = (PipelineContext)$continuation.L$2;
                container = (MessageContainer)$continuation.L$1;
                this = (Pipeline)$continuation.L$0;
                try {
                    // Rethrows the failure carried by $result, if any.
                    ResultKt.throwOnFailure((Object)$result);
                    v0 = $result;
                }
                catch (PipelineException t) {
                    this.logger.debug((Throwable)t, (Function0)onMessage.2.INSTANCE);
                    // When the metadata does not request an error reply, rethrow the reason (lbl46).
                    if (!container.getMetadata().getRespondOnError()) ** GOTO lbl46
                    // Reply target: the source of the last message state, falling back to the
                    // node id from the original message's auth info.
                    v1 = t.getLastMsgState().getSource();
                    if (v1 == null) {
                        v1 = container.getMetadata().getAuthInfo().getNodeId();
                    }
                    src = v1;
                    var7_11 = (MessageContent)ExceptionKt.toErrorContent((Throwable)t.getReason());
                    var8_12 = null;
                    var9_13 = (MessageTarget)new MessageTarget.Unicast(src);
                    // The reply reuses the message id of the message that failed.
                    var10_14 = container.getMessage().getMessageId();
                    // Trailing `4, null` is presumably a default-argument mask for the third
                    // Message constructor parameter (var8_12) - TODO confirm.
                    newMessage = new Message(var7_11, var10_14, var8_12, var9_13, 4, null);
                    // Copy of the local metadata with the third field set to false; presumably
                    // respondOnError, so a failing error reply is not answered again - TODO confirm.
                    newMeta = MessageMetadata.copy$default(this.getLocalMeta(), null, null, false, 3, null);
                    this.pushMessage(newMessage, newMeta);
                    ** GOTO lbl49
lbl46:
                    // 1 sources

                    throw t.getReason();
                }
                catch (CancellationException c) {
                    // Cancellation is passed through unchanged.
                    throw (Throwable)c;
                }
lbl49:
                // 3 sources

                return Unit.INSTANCE;
            }
        }
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    /**
     * Creates a pipeline and wires its rail worker to {@code onMessage}.
     *
     * <p>All four arguments are null-checked and stored. The constructor then creates the logger
     * and a {@link RailWorker} configured from {@code config.getPipelineBufferCount()} and
     * {@code config.getPipelineRailCount()}, running on {@code runtimeScopes.getCpuScope()}.
     * No workers are started here; see {@link #start()}.
     *
     * @param config         server configuration supplying the buffer and rail counts
     * @param runtimeScopes2 provider of the CPU coroutine scope
     * @param pipelineSteps  the steps later passed to each {@link PipelineContext}
     * @param localNodeInfo  source of the local node id used by {@code getLocalMeta()}
     */
    public Pipeline(@NotNull OrbitServerConfig config, @NotNull RuntimeScopes runtimeScopes2, @NotNull PipelineSteps pipelineSteps, @NotNull LocalNodeInfo localNodeInfo) {
        Intrinsics.checkParameterIsNotNull((Object)config, (String)"config");
        Intrinsics.checkParameterIsNotNull((Object)runtimeScopes2, (String)"runtimeScopes");
        Intrinsics.checkParameterIsNotNull((Object)pipelineSteps, (String)"pipelineSteps");
        Intrinsics.checkParameterIsNotNull((Object)localNodeInfo, (String)"localNodeInfo");
        this.config = config;
        this.runtimeScopes = runtimeScopes2;
        this.pipelineSteps = pipelineSteps;
        this.localNodeInfo = localNodeInfo;
        // `logger.1.INSTANCE` is the decompiler's name for a compiled lambda singleton; not valid Java.
        this.logger = KotlinLogging.INSTANCE.logger((Function0)logger.1.INSTANCE);
        // The trailing `16, null` is presumably a default-argument mask marking the fifth
        // argument (`false`) as a placeholder for a defaulted parameter - TODO confirm against RailWorker.
        this.pipelineRails = new RailWorker(this.runtimeScopes.getCpuScope(), this.config.getPipelineBufferCount(), this.config.getPipelineRailCount(), this.logger, false, (Function2)new Function2<MessageContainer, Continuation<? super Unit>, Object>(this){

            // Bound function reference to Pipeline.onMessage: forwards each container to the
            // receiver captured at construction time.
            @Nullable
            public final Object invoke(@NotNull MessageContainer p1, @NotNull Continuation<? super Unit> continuation) {
                Pipeline pipeline = (Pipeline)this.receiver;
                InlineMarker.mark((int)0);
                Object object = pipeline.onMessage(p1, continuation);
                InlineMarker.mark((int)2);
                InlineMarker.mark((int)1);
                return object;
            }

            // Reflection metadata describing the referenced function.
            public final KDeclarationContainer getOwner() {
                return Reflection.getOrCreateKotlinClass(Pipeline.class);
            }

            public final String getName() {
                return "onMessage";
            }

            public final String getSignature() {
                return "onMessage(Lorbit/server/net/MessageContainer;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;";
            }
        }, 16, null);
    }

    /**
     * Synthetic accessor that exposes the private {@code logger} field of the given pipeline,
     * used by the anonymous class inside {@code launchRail}.
     *
     * @param $this the pipeline whose logger is read
     * @return that pipeline's logger
     */
    public static final /* synthetic */ KLogger access$getLogger$p(Pipeline $this) {
        final KLogger pipelineLogger = $this.logger;
        return pipelineLogger;
    }
}

