package org.pragmaticminds.crunch.runtime.sort;

import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.pragmaticminds.crunch.api.records.MRecord;
import org.pragmaticminds.crunch.execution.TimestampSortFunction;

/* loaded from: input_file:org/pragmaticminds/crunch/runtime/sort/SortFunction.class */
public class SortFunction extends ProcessFunction<MRecord, MRecord> {
    private int capacity;
    private transient ValueState<TimestampSortFunction<MRecord>> valueState;

    public SortFunction() {
        this.capacity = 100;
    }

    public SortFunction(int i) {
        this.capacity = 100;
        this.capacity = i;
    }

    public void open(Configuration configuration) {
        this.valueState = getRuntimeContext().getState(new ValueStateDescriptor("sorted-events", TypeInformation.of(new TypeHint<TimestampSortFunction<MRecord>>() { // from class: org.pragmaticminds.crunch.runtime.sort.SortFunction.1
        })));
    }

    public void processElement(MRecord mRecord, ProcessFunction<MRecord, MRecord>.Context context, Collector<MRecord> collector) throws IOException {
        TimerService timerService = context.timerService();
        TimestampSortFunction timestampSortFunction = (TimestampSortFunction) this.valueState.value();
        if (timestampSortFunction == null) {
            timestampSortFunction = new TimestampSortFunction(this.capacity);
        }
        timestampSortFunction.process(Long.valueOf(mRecord.getTimestamp()), Long.valueOf(timerService.currentWatermark()), mRecord);
        timerService.registerEventTimeTimer(mRecord.getTimestamp());
        this.valueState.update(timestampSortFunction);
    }

    public void onTimer(long j, ProcessFunction<MRecord, MRecord>.OnTimerContext onTimerContext, Collector<MRecord> collector) throws IOException {
        Long valueOf = Long.valueOf(onTimerContext.timerService().currentWatermark());
        TimestampSortFunction timestampSortFunction = (TimestampSortFunction) this.valueState.value();
        if (timestampSortFunction == null) {
            timestampSortFunction = new TimestampSortFunction(this.capacity);
        }
        Iterator it = timestampSortFunction.onTimer(valueOf).iterator();
        while (it.hasNext()) {
            collector.collect((MRecord) it.next());
        }
        this.valueState.update(timestampSortFunction);
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
        processElement((MRecord) obj, (ProcessFunction<MRecord, MRecord>.Context) context, (Collector<MRecord>) collector);
    }
}
