/*
 * Decompiled with CFR 0.152.
 */
package software.tnb.kafka.validation;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.tnb.common.validation.Validation;

public class KafkaValidation<T>
implements Validation {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaValidation.class);
    private final KafkaProducer<String, T> producer;
    private final KafkaConsumer<String, T> consumer;

    public KafkaValidation(KafkaProducer<String, T> producer, KafkaConsumer<String, T> consumer) {
        this.producer = producer;
        this.consumer = consumer;
    }

    public void closeProducer() {
        this.producer.close();
    }

    public void closeConsumer() {
        this.consumer.close();
    }

    public void produce(String topic, T message) {
        this.produce(topic, message, Collections.emptyList());
    }

    public void produce(String topic, T message, List<Header> headers) {
        StringBuilder log = new StringBuilder("Producing message \"").append(message).append("\"");
        if (headers != null && !headers.isEmpty()) {
            log.append(" with headers: ");
            log.append(headers.stream().map(h -> h.key() + "=" + new String(h.value())).collect(Collectors.joining(", ")));
        }
        log.append(" to topic \"").append(topic).append("\"");
        LOG.debug(log.toString());
        this.producer.send(new ProducerRecord(topic, null, null, message, headers));
    }

    public void produce(String topic, T message, Map<String, String> headers) {
        this.produce(topic, message, headers.entrySet().stream().map(e -> new RecordHeader((String)e.getKey(), ((String)e.getValue()).getBytes())).collect(Collectors.toList()));
    }

    public List<ConsumerRecord<String, T>> consume(String topic) {
        this.consumer.subscribe(Collections.singletonList(topic));
        this.consumer.seekToBeginning((Collection)this.consumer.assignment());
        return StreamSupport.stream(this.consumer.poll(Duration.ofSeconds(30L)).records(topic).spliterator(), false).collect(Collectors.toList());
    }
}

