/*
 * Decompiled with CFR 0.152.
 */
package pw.avvero.test.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import pw.avvero.test.kafka.ContainerTestUtils;

public class KafkaSupport {
    private static final Logger log = LoggerFactory.getLogger(KafkaSupport.class);
    public static int OFFSET_COMMIT_WAIT_ATTEMPTS_MAX = 200;
    public static int OFFSET_COMMIT_WAIT_TIME = 50;

    public static void waitForPartitionAssignment(ApplicationContext applicationContext) throws Exception {
        KafkaSupport.detectMultipleContainersForSameTopicWithinSameGroup(applicationContext);
        KafkaListenerEndpointRegistry registry = (KafkaListenerEndpointRegistry)applicationContext.getBean(KafkaListenerEndpointRegistry.class);
        log.debug("[KTS] Waiting for partition assignment is requested");
        for (MessageListenerContainer messageListenerContainer : registry.getListenerContainers()) {
            long startTime = System.currentTimeMillis();
            log.debug("[KTS] Waiting for partition assignment started for {}", (Object)messageListenerContainer.getListenerId());
            int partitions = ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);
            long gauge = System.currentTimeMillis() - startTime;
            if (partitions > 0) {
                String topics = Objects.requireNonNull(messageListenerContainer.getAssignedPartitions()).stream().map(TopicPartition::topic).collect(Collectors.joining(", "));
                log.debug("[KTS] Waiting for partition assignment for {} is succeeded in {} ms, topics: {}", new Object[]{messageListenerContainer.getListenerId(), gauge, topics});
                continue;
            }
            log.error("[KTS] Waiting for partition assignment for {} is failed in {} ms", (Object)messageListenerContainer.getListenerId(), (Object)gauge);
        }
        log.debug("[KTS] At least one partition is assigned for every container");
    }

    private static void detectMultipleContainersForSameTopicWithinSameGroup(ApplicationContext applicationContext) {
        KafkaListenerEndpointRegistry registry = (KafkaListenerEndpointRegistry)applicationContext.getBean(KafkaListenerEndpointRegistry.class);
        HashMap<String, List> containersPerTopicInSameGroup = new HashMap<String, List>();
        for (MessageListenerContainer container : registry.getListenerContainers()) {
            ContainerProperties containerProperties = container.getContainerProperties();
            if (containerProperties.getTopics() == null) continue;
            for (String topic : containerProperties.getTopics()) {
                containersPerTopicInSameGroup.computeIfAbsent(containerProperties.getGroupId() + " : " + topic, k -> new ArrayList()).add(container);
            }
        }
        containersPerTopicInSameGroup.forEach((key, list) -> {
            if (list.size() > 1) {
                String[] parts = key.split(" : ");
                String groupId = parts[0];
                String topic = parts[1];
                String containerNames = list.stream().map(MessageListenerContainer::getListenerId).collect(Collectors.joining(", "));
                throw new RuntimeException(String.format("Detected multiple Kafka listener containers (%s) configured to listen to topic '%s' within the same group '%s'. This configuration may lead to unexpected behavior or message duplication. Please ensure each topic is consumed by a unique group or container.", containerNames, topic, groupId));
            }
        });
    }

    public static void waitForPartitionOffsetCommit(ApplicationContext applicationContext) throws ExecutionException, InterruptedException {
        List bootstrapServers = ((KafkaConnectionDetails)applicationContext.getBean(KafkaConnectionDetails.class)).getBootstrapServers();
        KafkaSupport.waitForPartitionOffsetCommit(bootstrapServers);
    }

    public static void waitForPartitionOffsetCommit(List<String> bootstrapServers) throws InterruptedException, ExecutionException {
        log.debug("[KTS] Waiting for offset commit is requested");
        long startTime = System.currentTimeMillis();
        try (AdminClient adminClient = AdminClient.create(Collections.singletonMap("bootstrap.servers", bootstrapServers));){
            Set<String> consumerGroups = ((Collection)adminClient.listConsumerGroups().all().get()).stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
            Set<String> topics = ((Map)adminClient.listTopics().namesToListings().get()).keySet();
            Map<TopicPartition, Long> topicsOffsets = KafkaSupport.getOffsetsForTopics(adminClient, topics);
            LinkedList<TopicPartition> topicQueue = new LinkedList<TopicPartition>(topicsOffsets.keySet());
            int attempt = 0;
            while (!topicQueue.isEmpty()) {
                TopicPartition tp = (TopicPartition)topicQueue.remove();
                long topicOffset = topicsOffsets.get(tp);
                if (++attempt > OFFSET_COMMIT_WAIT_ATTEMPTS_MAX) {
                    throw new RuntimeException("Exceeded maximum attempts (" + OFFSET_COMMIT_WAIT_ATTEMPTS_MAX + ") waiting for offset commit for partition " + tp + ".");
                }
                Map<String, Long> consumerGroupsOffsets = KafkaSupport.getOffsetsForConsumerGroups(adminClient, consumerGroups, tp);
                for (String consumerGroup : consumerGroups) {
                    Long consumerGroupOffset = consumerGroupsOffsets.get(consumerGroup);
                    if (consumerGroupOffset == null) {
                        log.trace("[KTS] Waiting for offset commit for topic {} in group {}: topic is not under capture", (Object)tp.topic(), (Object)consumerGroup);
                    } else {
                        log.trace("[KTS] Waiting for offset commit for topic {} in group {}: [topic offset: {} != group offset: {}]", new Object[]{tp.topic(), consumerGroup, consumerGroupOffset, topicOffset});
                    }
                    if (consumerGroupOffset == null || consumerGroupOffset == topicOffset) continue;
                    try {
                        Thread.sleep(OFFSET_COMMIT_WAIT_TIME);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    log.warn("[KTS] Consumer group {} offset for topic '{}' is {}, which is not equal to the topic offset {}. Waiting for further message processing before proceeding. Refreshing end offsets and reevaluating.", new Object[]{consumerGroup, tp.topic(), consumerGroupOffset, topicOffset});
                    topicsOffsets = KafkaSupport.getOffsetsForTopics(adminClient, topics);
                    List sortedTopicPartitions = topicsOffsets.keySet().stream().sorted((a, b) -> a.topic().equals(tp.topic()) ? -1 : (b.topic().equals(tp.topic()) ? 1 : 0)).toList();
                    topicQueue.clear();
                    topicQueue.addAll(sortedTopicPartitions);
                }
            }
        }
        log.debug("[KTS] Waiting for offset commit is finished in {} ms", (Object)(System.currentTimeMillis() - startTime));
    }

    public static Map<TopicPartition, Long> getOffsetsForTopics(AdminClient adminClient, Set<String> topics) throws ExecutionException, InterruptedException {
        HashMap<TopicPartition, OffsetSpec> topicPartitions = new HashMap<TopicPartition, OffsetSpec>();
        for (String topic : topics) {
            DescribeTopicsResult topicInfo = adminClient.describeTopics(Collections.singletonList(topic));
            int partitions = ((TopicDescription)((KafkaFuture)topicInfo.topicNameValues().get(topic)).get()).partitions().size();
            for (int i = 0; i < partitions; ++i) {
                topicPartitions.put(new TopicPartition(topic, i), OffsetSpec.latest());
            }
        }
        HashMap<TopicPartition, Long> endOffsets = new HashMap<TopicPartition, Long>();
        ((Map)adminClient.listOffsets(topicPartitions).all().get()).forEach((tp, info) -> endOffsets.put((TopicPartition)tp, info.offset()));
        return endOffsets;
    }

    public static Map<String, Long> getOffsetsForConsumerGroups(AdminClient adminClient, Set<String> consumerGroups, TopicPartition topicPartition) throws ExecutionException, InterruptedException {
        HashMap<String, ListConsumerGroupOffsetsSpec> groupSpecs = new HashMap<String, ListConsumerGroupOffsetsSpec>();
        for (String consumerGroup : consumerGroups) {
            ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec();
            spec.topicPartitions(List.of(topicPartition));
            groupSpecs.put(consumerGroup, spec);
        }
        ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(groupSpecs);
        HashMap<String, Long> currentOffsets = new HashMap<String, Long>();
        for (String consumerGroup : consumerGroups) {
            ((Map)offsetsResult.partitionsToOffsetAndMetadata(consumerGroup).get()).forEach((tp, oam) -> {
                if (!topicPartition.equals(tp)) {
                    throw new RuntimeException("Wrong topic partition in response");
                }
                currentOffsets.put(consumerGroup, oam != null ? Long.valueOf(oam.offset()) : null);
            });
        }
        return currentOffsets;
    }
}

