/*
 * Decompiled with CFR 0.152.
 */
package smartthings.brave.kafka.consumers;

import brave.Span;
import brave.Tracing;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import smartthings.brave.kafka.consumers.DefaultSpanNameProvider;
import smartthings.brave.kafka.consumers.ExtractException;
import smartthings.brave.kafka.consumers.SpanNameProvider;
import smartthings.brave.kafka.consumers.TracedConsumerRecord;
import zipkin.Endpoint;

public abstract class BaseTracingConsumerInterceptor<K>
implements ConsumerInterceptor<K, byte[]> {
    private Tracing tracing;
    private SpanNameProvider<K> nameProvider;
    private Endpoint kafkaEndpoint;
    private static final Logger logger = Logger.getLogger(BaseTracingConsumerInterceptor.class.getName());

    protected abstract TracedConsumerRecord<K, byte[]> getTracedConsumerRecord(ConsumerRecord<K, byte[]> var1) throws ExtractException;

    public ConsumerRecords<K, byte[]> onConsume(ConsumerRecords<K, byte[]> records) {
        RecordsAccumulator<K> tracedRecords = new RecordsAccumulator<K>(records.count());
        for (ConsumerRecord record : records) {
            try {
                TracedConsumerRecord<K, byte[]> tracedConsumerRecord = this.getTracedConsumerRecord(record);
                TraceContextOrSamplingFlags traceContextOrSamplingFlags = tracedConsumerRecord.traceContextOrSamplingFlags;
                TraceContext ctx = traceContextOrSamplingFlags.context();
                Span span = ctx != null ? this.tracing.tracer().joinSpan(ctx) : this.tracing.tracer().newTrace(traceContextOrSamplingFlags.samplingFlags());
                span.kind(Span.Kind.SERVER).name(this.nameProvider.spanName(record)).remoteEndpoint(this.kafkaEndpoint).tag("kafka.partition", String.valueOf(record.partition())).start().flush();
                tracedRecords.addRecord(tracedConsumerRecord);
            }
            catch (ExtractException e) {
                logger.log(Level.WARNING, "extract exception, returning unmodified record - " + record, e);
                tracedRecords.addRecord(record);
            }
        }
        return new ConsumerRecords(tracedRecords.getRecords());
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    public void close() {
    }

    public void configure(Map<String, ?> configs) {
        if (configs.get("brave.tracing") == null || !(configs.get("brave.tracing") instanceof Tracing)) {
            throw new ConfigException("brave.tracing", configs.get("brave.tracing"), "Must an be instance of brave.Tracing");
        }
        this.tracing = (Tracing)configs.get("brave.tracing");
        this.nameProvider = configs.get("brave.span.name.provider") != null && configs.get("brave.span.name.provider") instanceof SpanNameProvider ? (SpanNameProvider)configs.get("brave.span.name.provider") : new DefaultSpanNameProvider();
        this.kafkaEndpoint = configs.get("brave.span.remote.endpoint") != null && configs.get("brave.span.remote.endpoint") instanceof Endpoint ? (Endpoint)configs.get("brave.span.remote.endpoint") : Endpoint.builder().serviceName("Kafka").build();
    }

    private static class RecordsAccumulator<K> {
        private final Map<TopicPartition, List<ConsumerRecord<K, byte[]>>> records;

        RecordsAccumulator(int size) {
            this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, byte[]>>>(size);
        }

        void addRecord(ConsumerRecord<K, byte[]> record) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            if (this.records.containsKey(tp)) {
                this.records.get(tp).add(record);
            } else {
                LinkedList<ConsumerRecord<K, byte[]>> tracedList = new LinkedList<ConsumerRecord<K, byte[]>>();
                tracedList.add(record);
                this.records.put(tp, tracedList);
            }
        }

        Map<TopicPartition, List<ConsumerRecord<K, byte[]>>> getRecords() {
            return ImmutableMap.copyOf(this.records);
        }
    }
}

