package org.pragmaticminds.crunch.runtime.eval;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.pragmaticminds.crunch.api.EvalFunction;
import org.pragmaticminds.crunch.api.EvalFunctionCall;
import org.pragmaticminds.crunch.api.events.GenericEventHandler;
import org.pragmaticminds.crunch.api.records.DataType;
import org.pragmaticminds.crunch.api.records.MRecord;
import org.pragmaticminds.crunch.api.values.dates.Value;
import org.pragmaticminds.crunch.events.GenericEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/pragmaticminds/crunch/runtime/eval/EvalFunctionWrapper.class */
public class EvalFunctionWrapper extends ProcessFunction<MRecord, GenericEvent> implements GenericEventHandler, Serializable {
    private static final Logger logger = LoggerFactory.getLogger(EvalFunctionWrapper.class);
    private final HashMap<String, DataType> channelsAndTypes;
    private EvalFunctionCall call;
    private LinkedBlockingQueue<GenericEvent> eventBuffer = new LinkedBlockingQueue<>();
    private transient ValueState<EvalFunction> valueState;

    public EvalFunctionWrapper(EvalFunctionCall evalFunctionCall) {
        this.call = evalFunctionCall;
        this.channelsAndTypes = new HashMap<>(evalFunctionCall.getChannelsAndTypes());
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.valueState = getRuntimeContext().getState(new ValueStateDescriptor("evalFunction-state", TypeInformation.of(EvalFunction.class)));
    }

    public void processElement(MRecord mRecord, ProcessFunction<MRecord, GenericEvent>.Context context, Collector<GenericEvent> collector) throws Exception {
        Map<String, Value> createTypedChannelMap = createTypedChannelMap(mRecord);
        if (createTypedChannelMap.isEmpty()) {
            return;
        }
        Map map = (Map) this.call.getChannelNamesMapping().entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return (Value) createTypedChannelMap.get(entry.getValue());
        }));
        if (this.valueState.value() == null) {
            EvalFunction evalFunction = this.call.getEvalFunction();
            evalFunction.setup(this.call.getLiterals(), this);
            this.valueState.update(evalFunction);
        }
        EvalFunction evalFunction2 = (EvalFunction) this.valueState.value();
        evalFunction2.setEventHandler(this);
        try {
            evalFunction2.eval(mRecord.getTimestamp(), map);
        } catch (Exception e) {
            logger.warn("Problem during Evaluation of evalfunction " + evalFunction2 + " with map " + map, e);
        }
        this.valueState.update(evalFunction2);
        while (!this.eventBuffer.isEmpty()) {
            GenericEvent poll = this.eventBuffer.poll();
            poll.setEventSource(mRecord.getSource());
            collector.collect(poll);
        }
    }

    Map<String, Value> createTypedChannelMap(MRecord mRecord) {
        List list = (List) this.channelsAndTypes.entrySet().stream().filter(entry -> {
            return !mRecord.getChannels().contains(entry.getKey());
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return (Map) this.channelsAndTypes.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry2 -> {
                return mRecord.getValue((String) entry2.getKey());
            }));
        }
        logger.trace("No key matching SPS parameter found for evaluation {}, the missing channels are {}", this.call.getEvalFunction().getFunctionDef().getSignature().getName(), list);
        return Collections.emptyMap();
    }

    public void fire(GenericEvent genericEvent) {
        this.eventBuffer.add(genericEvent);
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
        processElement((MRecord) obj, (ProcessFunction<MRecord, GenericEvent>.Context) context, (Collector<GenericEvent>) collector);
    }
}
