/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.MetricsReporterList;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetryBuilder;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaConsumerTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaConsumerTelemetrySupplier;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaProducerTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaProducerTelemetrySupplier;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryConsumerInterceptor;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryProducerInterceptor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Entry point for instrumenting Kafka producers and consumers with OpenTelemetry.
 *
 * <p>An instance can wrap existing {@link Producer} / {@link Consumer} objects in dynamic proxies
 * that route {@code send} and {@code poll} through the telemetry helpers, and it can produce
 * configuration property maps that register the OpenTelemetry metrics reporter and the
 * producer/consumer interceptors.
 */
public final class KafkaTelemetry {
    private final OpenTelemetry openTelemetry;
    private final KafkaProducerTelemetry producerTelemetry;
    private final KafkaConsumerTelemetry consumerTelemetry;

    /**
     * Creates the telemetry facade. Package-private: instances are obtained through
     * {@link #create(OpenTelemetry)} or {@link #builder(OpenTelemetry)}.
     *
     * @param openTelemetry the OpenTelemetry instance; its text map propagator is handed to the
     *     producer telemetry, and the instance itself is exposed via
     *     {@link #metricConfigProperties()}
     * @param producerInstrumenter instrumenter passed to the producer telemetry
     * @param consumerReceiveInstrumenter receive instrumenter passed to the consumer telemetry
     * @param consumerProcessInstrumenter process instrumenter passed to the consumer telemetry
     * @param producerPropagationEnabled propagation flag passed to the producer telemetry
     */
    KafkaTelemetry(
            OpenTelemetry openTelemetry,
            Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter,
            Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter,
            Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter,
            boolean producerPropagationEnabled) {
        this.openTelemetry = openTelemetry;
        this.producerTelemetry =
                new KafkaProducerTelemetry(
                        openTelemetry.getPropagators().getTextMapPropagator(),
                        producerInstrumenter,
                        producerPropagationEnabled);
        this.consumerTelemetry =
                new KafkaConsumerTelemetry(consumerReceiveInstrumenter, consumerProcessInstrumenter);
    }

    /**
     * Returns a {@link KafkaTelemetry} built from a fresh builder for the given
     * {@link OpenTelemetry}, without any further builder customization.
     */
    public static KafkaTelemetry create(OpenTelemetry openTelemetry) {
        return KafkaTelemetry.builder(openTelemetry).build();
    }

    /** Returns a new {@link KafkaTelemetryBuilder} for the given {@link OpenTelemetry}. */
    public static KafkaTelemetryBuilder builder(OpenTelemetry openTelemetry) {
        return new KafkaTelemetryBuilder(openTelemetry);
    }

    /** Returns the producer telemetry helper backing this instance. */
    KafkaProducerTelemetry getProducerTelemetry() {
        return this.producerTelemetry;
    }

    /** Returns the consumer telemetry helper backing this instance. */
    KafkaConsumerTelemetry getConsumerTelemetry() {
        return this.consumerTelemetry;
    }

    /**
     * Returns a dynamic proxy implementing {@link Producer} that delegates to {@code producer}.
     *
     * <p>Calls to a {@code send} method whose first parameter is a {@link ProducerRecord} are
     * routed through {@link KafkaProducerTelemetry#buildAndInjectSpan}; the {@link Callback} is
     * forwarded when the two-argument overload is used and is {@code null} otherwise. Every other
     * method is invoked reflectively on the delegate, and an exception thrown by the delegate is
     * rethrown unwrapped.
     *
     * @param producer the producer to delegate to
     * @return a proxy around {@code producer}
     */
    // The proxy only implements Producer, and the record argument of send() is typed by the
    // caller against Producer<K, V>, so the unchecked casts below are safe.
    @SuppressWarnings("unchecked")
    public <K, V> Producer<K, V> wrap(Producer<K, V> producer) {
        return (Producer<K, V>) Proxy.newProxyInstance(
                KafkaTelemetry.class.getClassLoader(),
                new Class<?>[] {Producer.class},
                (proxy, method, args) -> {
                    // Matches send(ProducerRecord) and send(ProducerRecord, Callback).
                    if ("send".equals(method.getName())
                            && method.getParameterCount() >= 1
                            && method.getParameterTypes()[0] == ProducerRecord.class) {
                        ProducerRecord<K, V> record = (ProducerRecord<K, V>) args[0];
                        Callback callback =
                                method.getParameterCount() >= 2
                                                && method.getParameterTypes()[1] == Callback.class
                                        ? (Callback) args[1]
                                        : null;
                        return this.producerTelemetry.buildAndInjectSpan(
                                record, producer, callback, producer::send);
                    }
                    try {
                        return method.invoke(producer, args);
                    } catch (InvocationTargetException exception) {
                        // Surface the delegate's own exception rather than the reflection wrapper.
                        throw exception.getCause();
                    }
                });
    }

    /**
     * Returns a dynamic proxy implementing {@link Consumer} that delegates to {@code consumer}.
     *
     * <p>Every method is invoked reflectively on the delegate, and an exception thrown by the
     * delegate is rethrown unwrapped. For a method named {@code poll} a {@link Timer} is started
     * before the call; when the call returns a {@link ConsumerRecords}, the records, the consumer
     * and the timer are passed to {@link KafkaConsumerTelemetry#buildAndFinishSpan}, and the
     * records are then replaced by the result of {@link KafkaConsumerTelemetry#addTracing}. If
     * {@code buildAndFinishSpan} returns {@code null}, {@link Context#current()} is used as the
     * receive context.
     *
     * @param consumer the consumer to delegate to
     * @return a proxy around {@code consumer}
     */
    // The proxy only implements Consumer, and poll() on a Consumer<K, V> yields
    // ConsumerRecords<K, V>, so the unchecked casts below are safe.
    @SuppressWarnings("unchecked")
    public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
        return (Consumer<K, V>) Proxy.newProxyInstance(
                KafkaTelemetry.class.getClassLoader(),
                new Class<?>[] {Consumer.class},
                (proxy, method, args) -> {
                    boolean isPoll = "poll".equals(method.getName());
                    Timer timer = isPoll ? Timer.start() : null;

                    // Must be Object: most Consumer methods do not return ConsumerRecords
                    // (many return void, i.e. null, or unrelated types).
                    Object result;
                    try {
                        result = method.invoke(consumer, args);
                    } catch (InvocationTargetException exception) {
                        // Surface the delegate's own exception rather than the reflection wrapper.
                        throw exception.getCause();
                    }

                    if (isPoll && result instanceof ConsumerRecords) {
                        ConsumerRecords<K, V> consumerRecords = (ConsumerRecords<K, V>) result;
                        Context receiveContext =
                                this.consumerTelemetry.buildAndFinishSpan(consumerRecords, consumer, timer);
                        if (receiveContext == null) {
                            receiveContext = Context.current();
                        }
                        KafkaConsumerContext consumerContext =
                                KafkaConsumerContextUtil.create(receiveContext, consumer);
                        result = this.consumerTelemetry.addTracing(consumerRecords, consumerContext);
                    }
                    return result;
                });
    }

    /**
     * Returns an unmodifiable map of configuration properties that registers
     * {@link OpenTelemetryMetricsReporter} under {@code metric.reporters} and supplies this
     * instance's {@link OpenTelemetry} and the instrumentation name to it.
     */
    public Map<String, ?> metricConfigProperties() {
        Map<String, Object> config = new HashMap<>();
        config.put("metric.reporters", MetricsReporterList.singletonList(OpenTelemetryMetricsReporter.class));
        config.put("opentelemetry.supplier", new OpenTelemetrySupplier(this.openTelemetry));
        config.put("opentelemetry.instrumentation_name", "io.opentelemetry.kafka-clients-2.6");
        return Collections.unmodifiableMap(config);
    }

    /**
     * Returns an unmodifiable map of configuration properties that registers
     * {@link OpenTelemetryProducerInterceptor} under {@code interceptor.classes} and supplies this
     * instance's producer telemetry to it.
     */
    public Map<String, ?> producerInterceptorConfigProperties() {
        Map<String, Object> config = new HashMap<>();
        config.put("interceptor.classes", OpenTelemetryProducerInterceptor.class.getName());
        config.put(
                "opentelemetry.kafka-producer-telemetry.supplier",
                new KafkaProducerTelemetrySupplier(this.producerTelemetry));
        return Collections.unmodifiableMap(config);
    }

    /**
     * Returns an unmodifiable map of configuration properties that registers
     * {@link OpenTelemetryConsumerInterceptor} under {@code interceptor.classes} and supplies this
     * instance's consumer telemetry to it.
     */
    public Map<String, ?> consumerInterceptorConfigProperties() {
        Map<String, Object> config = new HashMap<>();
        config.put("interceptor.classes", OpenTelemetryConsumerInterceptor.class.getName());
        config.put(
                "opentelemetry.kafka-consumer-telemetry.supplier",
                new KafkaConsumerTelemetrySupplier(this.consumerTelemetry));
        return Collections.unmodifiableMap(config);
    }
}

