package org.apache.accumulo.tracer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.tracer.thrift.Annotation;
import org.apache.accumulo.tracer.thrift.RemoteSpan;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
import org.apache.htrace.TimelineAnnotation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/tracer/AsyncSpanReceiver.class */
/**
 * Base class for span receivers that buffer incoming spans in memory and deliver them
 * asynchronously from a background timer.
 *
 * <p>{@link #receiveSpan(Span)} converts each accepted span into a {@link RemoteSpan} and appends
 * it to {@link #sendQueue}. A daemon {@link Timer} named {@code SpanSender} periodically calls
 * {@link #sendSpans()}, which drains the queue by handing each span to the destination selected by
 * {@link #getSpanKey(Map)}. Destinations are created lazily through
 * {@link #createDestination(Object)} and cached per key until a send fails.
 *
 * @param <SpanKey> key identifying where a span should be delivered, derived from the span's
 *        key/value annotations
 * @param <Destination> handle used to deliver spans for a given key
 */
public abstract class AsyncSpanReceiver<SpanKey, Destination> implements SpanReceiver {
    private static final Logger log = LoggerFactory.getLogger(AsyncSpanReceiver.class);

    /** Configuration key: delay and period, in milliseconds, of the background send task (default 1000). */
    public static final String SEND_TIMER_MILLIS = "tracer.send.timer.millis";

    /** Configuration key: queue length above which newly received spans are dropped (default 5000). */
    public static final String QUEUE_SIZE = "tracer.queue.size";

    /** Configuration key: minimum span duration, in milliseconds, for a span to be queued (default 1). */
    public static final String SPAN_MIN_MS = "tracer.span.min.ms";

    /** Minimum time between two "spans are being dropped" warnings. */
    private static final long DROPPED_SPAN_WARNING_INTERVAL_MS = 60000L;

    /** Cache of destinations by key; within this class it is only touched from {@link #sendSpans()}. */
    private final Map<SpanKey, Destination> clients = new HashMap<>();

    /** Host name reported in every {@link RemoteSpan}. */
    protected String host = null;

    /** Service name reported in every {@link RemoteSpan}; when {@code null} the span's process id is used. */
    protected String service = null;

    /** Daemon timer that drives {@link #sendSpans()}. */
    Timer timer = new Timer("SpanSender", true);

    /** Spans waiting for delivery; also used as the monitor that {@link #close()} waits on. */
    protected final AbstractQueue<RemoteSpan> sendQueue = new ConcurrentLinkedQueue<>();

    /** Element count of {@link #sendQueue}, tracked separately from the queue itself. */
    protected final AtomicInteger sendQueueSize = new AtomicInteger(0);

    /** Queue length above which {@link #receiveSpan(Span)} drops spans. */
    int maxQueueSize = 5000;

    /** Wall-clock time (ms) of the last "spans are being dropped" warning. */
    long lastNotificationOfDroppedSpans = 0L;

    /** Spans shorter than this many milliseconds are ignored. */
    int minSpanSize = 1;

    /**
     * Creates the destination used to deliver spans for the given key.
     *
     * @param spankey key returned by {@link #getSpanKey(Map)}
     * @return the destination to pass to {@link #send(Object, RemoteSpan)}
     * @throws Exception if the destination cannot be created; the span stays queued and is retried
     */
    protected abstract Destination createDestination(SpanKey spankey) throws Exception;

    /**
     * Delivers a single span to the given destination.
     *
     * @param destination destination obtained from {@link #createDestination(Object)}
     * @param remoteSpan span to deliver
     * @throws Exception if delivery fails; the cached destination is discarded and the span stays queued
     */
    protected abstract void send(Destination destination, RemoteSpan remoteSpan) throws Exception;

    /**
     * Derives the delivery key from a span's key/value annotations.
     *
     * @param map the span's annotations as strings; {@link #receiveSpan(Span)} passes {@code null}
     *        when the span has no annotation map
     * @return the delivery key; {@link #receiveSpan(Span)} discards the span when this is {@code null}
     */
    protected abstract SpanKey getSpanKey(Map<String, String> map);

    /**
     * Creates a receiver with default settings. No send task is scheduled by this constructor.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally package-private.
     */
    public AsyncSpanReceiver() {
    }

    /**
     * Creates a receiver configured from {@code hTraceConfiguration} and schedules the periodic
     * send task.
     *
     * <p>Reads {@code trace.host} (falling back to the local canonical host name, or
     * {@code "unknown"} if that cannot be resolved), {@code trace.service}, {@link #QUEUE_SIZE},
     * {@link #SPAN_MIN_MS} and {@link #SEND_TIMER_MILLIS}.
     *
     * @param hTraceConfiguration configuration source
     */
    public AsyncSpanReceiver(HTraceConfiguration hTraceConfiguration) {
        this.host = hTraceConfiguration.get("trace.host", this.host);
        if (this.host == null) {
            try {
                this.host = InetAddress.getLocalHost().getCanonicalHostName();
            } catch (UnknownHostException e) {
                this.host = "unknown";
            }
        }
        this.service = hTraceConfiguration.get("trace.service", this.service);
        this.maxQueueSize = hTraceConfiguration.getInt(QUEUE_SIZE, this.maxQueueSize);
        this.minSpanSize = hTraceConfiguration.getInt(SPAN_MIN_MS, this.minSpanSize);
        int sendPeriodMillis = hTraceConfiguration.getInt(SEND_TIMER_MILLIS, 1000);
        this.timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    AsyncSpanReceiver.this.sendSpans();
                } catch (Exception e) {
                    // Never let an exception escape: that would terminate the timer thread.
                    AsyncSpanReceiver.log.warn("Exception sending spans to destination", e);
                }
            }
        }, sendPeriodMillis, sendPeriodMillis);
    }

    /**
     * Sends queued spans in order until the queue is empty or a span cannot be delivered.
     *
     * <p>A span is removed from the queue only after {@link #send(Object, RemoteSpan)} returns
     * normally. If a destination cannot be created or a send fails, the method returns and leaves
     * the span at the head of the queue for the next invocation. A newly created destination is
     * used immediately rather than on the following invocation.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally protected.
     */
    public void sendSpans() {
        RemoteSpan span;
        // Loop on peek() itself so an emptied queue can never produce a null dereference.
        while ((span = this.sendQueue.peek()) != null) {
            SpanKey spanKey = getSpanKey(span.data);
            Destination destination = this.clients.get(spanKey);
            if (destination == null) {
                try {
                    destination = createDestination(spanKey);
                } catch (Exception e) {
                    log.warn("Exception creating connection to span receiver", e);
                    return;
                }
                if (destination == null) {
                    // Nothing usable was created; retry on the next invocation.
                    return;
                }
                this.clients.put(spanKey, destination);
            }
            try {
                send(destination, span);
                synchronized (this.sendQueue) {
                    this.sendQueue.remove();
                    // Wake up close(), which waits on this monitor for the queue to drain.
                    this.sendQueue.notifyAll();
                    this.sendQueueSize.decrementAndGet();
                }
            } catch (Exception e) {
                log.warn("Got error sending to " + spanKey + ", refreshing client", e);
                // Drop the cached destination so a fresh one is created on the next attempt.
                this.clients.remove(spanKey);
                return;
            }
        }
    }

    /**
     * Decodes a map of UTF-8 encoded keys and values into a map of strings.
     *
     * @param map map of UTF-8 byte arrays, may be {@code null}
     * @return a new map with decoded keys and values, or {@code null} if {@code map} is {@code null}
     */
    public static Map<String, String> convertToStrings(Map<byte[], byte[]> map) {
        if (map == null) {
            return null;
        }
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
            result.put(new String(entry.getKey(), StandardCharsets.UTF_8), new String(entry.getValue(), StandardCharsets.UTF_8));
        }
        return result;
    }

    /**
     * Converts HTrace timeline annotations into thrift {@link Annotation}s, preserving order.
     *
     * @param list timeline annotations, may be {@code null}
     * @return a new list of converted annotations, or {@code null} if {@code list} is {@code null}
     */
    public static List<Annotation> convertToAnnotations(List<TimelineAnnotation> list) {
        if (list == null) {
            return null;
        }
        List<Annotation> result = new ArrayList<>(list.size());
        for (TimelineAnnotation timelineAnnotation : list) {
            result.add(new Annotation(timelineAnnotation.getTime(), timelineAnnotation.getMessage()));
        }
        return result;
    }

    /**
     * Queues a finished span for asynchronous delivery.
     *
     * <p>The span is ignored when its duration is below {@link #minSpanSize} milliseconds or when
     * {@link #getSpanKey(Map)} returns {@code null} for its annotations. When the queue already
     * holds more than {@link #maxQueueSize} spans the span is dropped, and a warning is logged at
     * most once per minute.
     *
     * @param span the span to record
     */
    public void receiveSpan(Span span) {
        if (span.getStopTimeMillis() - span.getStartTimeMillis() < this.minSpanSize) {
            return;
        }
        Map<String, String> data = convertToStrings(span.getKVAnnotations());
        if (getSpanKey(data) == null) {
            return;
        }
        if (this.sendQueueSize.get() <= this.maxQueueSize) {
            String serviceName = this.service == null ? span.getProcessId() : this.service;
            // Annotations are converted only here, so dropped spans do not pay for the conversion.
            this.sendQueue.add(new RemoteSpan(this.host, serviceName, span.getTraceId(), span.getSpanId(), span.getParentId(), span.getStartTimeMillis(), span.getStopTimeMillis(), span.getDescription(), data, convertToAnnotations(span.getTimelineAnnotations())));
            this.sendQueueSize.incrementAndGet();
            return;
        }
        long now = System.currentTimeMillis();
        if (now - this.lastNotificationOfDroppedSpans > DROPPED_SPAN_WARNING_INTERVAL_MS) {
            log.warn("Tracing spans are being dropped because there are already {} spans queued for delivery.\nThis does not affect performance, security or data integrity, but distributed tracing information is being lost.", this.maxQueueSize);
            this.lastNotificationOfDroppedSpans = now;
        }
    }

    /**
     * Blocks until every queued span has been sent, or until the calling thread is interrupted.
     *
     * <p>On interruption the thread's interrupt status is restored and the method returns even if
     * spans are still queued.
     */
    public void close() {
        synchronized (this.sendQueue) {
            while (!this.sendQueue.isEmpty()) {
                try {
                    this.sendQueue.wait();
                } catch (InterruptedException e) {
                    log.warn("flush interrupted");
                    // Preserve the interrupt for the caller and stop waiting instead of looping.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
