/*
 * Decompiled with CFR 0.152.
 */
package io.elastic.sailor.impl;

import com.google.inject.Inject;
import io.elastic.api.EventEmitter;
import io.elastic.api.ExecutionParameters;
import io.elastic.api.Function;
import io.elastic.api.Message;
import io.elastic.sailor.CountingCallback;
import io.elastic.sailor.EmitterCallbackFactory;
import io.elastic.sailor.ExecutionContext;
import io.elastic.sailor.ExecutionStats;
import io.elastic.sailor.MessageProcessor;
import io.elastic.sailor.Step;
import javax.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageProcessorImpl
implements MessageProcessor {
    private static final Logger logger = LoggerFactory.getLogger(MessageProcessorImpl.class);
    private final EmitterCallbackFactory emitterCallbackFactory;

    @Inject
    public MessageProcessorImpl(EmitterCallbackFactory emitterCallbackFactory) {
        this.emitterCallbackFactory = emitterCallbackFactory;
    }

    @Override
    public ExecutionStats processMessage(ExecutionContext executionContext, Function function) {
        Message incomingMessage = executionContext.getMessage();
        Step step = executionContext.getStep();
        JsonObject cfg = step.getCfg();
        JsonObject snapshot = step.getSnapshot();
        CountingCallback dataCallback = this.emitterCallbackFactory.createDataCallback(executionContext);
        CountingCallback errorCallback = this.emitterCallbackFactory.createErrorCallback(executionContext);
        CountingCallback reboundCallback = this.emitterCallbackFactory.createReboundCallback(executionContext);
        CountingCallback snapshotCallback = this.emitterCallbackFactory.createSnapshotCallback(executionContext);
        EventEmitter.Callback updateKeysCallback = this.emitterCallbackFactory.createUpdateKeysCallback(executionContext);
        EventEmitter.Callback httpReplyCallback = this.emitterCallbackFactory.createHttpReplyCallback(executionContext);
        EventEmitter eventEmitter = new EventEmitter.Builder().onData(dataCallback).onError(errorCallback).onRebound(reboundCallback).onSnapshot(snapshotCallback).onUpdateKeys(updateKeysCallback).onHttpReplyCallback(httpReplyCallback).build();
        ExecutionParameters params = new ExecutionParameters.Builder(incomingMessage, eventEmitter).configuration(cfg).snapshot(snapshot).build();
        try {
            function.execute(params);
        }
        catch (RuntimeException e) {
            logger.error("Component execution failed", (Throwable)e);
            eventEmitter.emitException(e);
        }
        return new ExecutionStats(dataCallback.getCount(), errorCallback.getCount(), reboundCallback.getCount());
    }
}

