/*
 * Decompiled with CFR 0.152.
 */
package pw.avvero.test.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pw.avvero.test.kafka.RecordCaptorAccess;
import pw.avvero.test.kafka.RecordSnapshot;

/**
 * Collects {@link RecordSnapshot}s grouped first by topic and then by record key.
 *
 * <p>Storage is a {@link ConcurrentHashMap} of {@link ConcurrentHashMap}s holding
 * {@link CopyOnWriteArrayList}s, so records may be captured and read from different threads.
 * Because {@link ConcurrentHashMap} rejects {@code null} keys, capturing a snapshot whose topic
 * or key is {@code null} throws {@link NullPointerException}.
 */
public class RecordCaptor
implements RecordCaptorAccess {
    private static final Logger log = LoggerFactory.getLogger(RecordCaptor.class);

    /** Delay between two consecutive size checks performed by {@link #awaitAtMost(int, long)}. */
    private static final long POLL_INTERVAL_MILLIS = 50L;

    /** topic -> (key -> snapshots in capture order). */
    private final Map<String, Map<Object, List<RecordSnapshot>>> topicKeyRecords = new ConcurrentHashMap<>();

    /**
     * Stores the given snapshot under its topic and key and logs it at debug level.
     *
     * @param recordSnapshot the snapshot to store
     */
    public void capture(RecordSnapshot recordSnapshot) {
        log.debug("[KTS] Record captured for topic {} for key {}\n    Headers: {}\n    Value: {}",
                recordSnapshot.getTopic(), recordSnapshot.getKey(), recordSnapshot.getHeaders(), recordSnapshot.getValue());
        this.topicKeyRecords
                .computeIfAbsent(recordSnapshot.getTopic(), topic -> new ConcurrentHashMap<>())
                .computeIfAbsent(recordSnapshot.getKey(), key -> new CopyOnWriteArrayList<>())
                .add(recordSnapshot);
    }

    /**
     * Returns the snapshots captured so far for the given topic and key.
     *
     * @param topic topic to look up
     * @param key   record key to look up
     * @return the stored list for that topic/key, or an empty list when nothing was captured
     */
    @Override
    public List<RecordSnapshot> getRecords(String topic, Object key) {
        return this.topicKeyRecords
                .getOrDefault(topic, Collections.emptyMap())
                .getOrDefault(key, Collections.emptyList());
    }

    /**
     * Returns the snapshots captured so far for the given topic, across all keys.
     *
     * @param topic topic to look up
     * @return a new list with the snapshots of every key of that topic; empty when the topic is unknown
     */
    public List<RecordSnapshot> getRecords(String topic) {
        return this.topicKeyRecords
                .getOrDefault(topic, Collections.emptyMap())
                .values()
                .stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    /**
     * Creates an accessor whose lookup blocks until exactly {@code numberOrRecords} snapshots have
     * been captured for the requested topic and key, checking every
     * {@value #POLL_INTERVAL_MILLIS} ms for at most {@code millis} milliseconds.
     *
     * <p>The wait is performed by Awaitility; if the expected count is not observed in time,
     * whatever Awaitility raises on timeout propagates to the caller of the returned accessor.
     *
     * @param numberOrRecords the number of records to wait for
     * @param millis          maximum time to wait, in milliseconds
     * @return an accessor that waits before returning the captured records
     */
    public RecordCaptorAccess awaitAtMost(int numberOrRecords, long millis) {
        // Inside the lambda, getRecords(...) resolves to this captor's own non-waiting lookup.
        return (topic, key) -> {
            Awaitility.await()
                    .atMost(millis, TimeUnit.MILLISECONDS)
                    .pollInterval(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
                    // Keep polling until the expected count IS reached (the condition was inverted before).
                    .until(() -> getRecords(topic, key).size() == numberOrRecords);
            return getRecords(topic, key);
        };
    }
}

