/*
 * Decompiled with CFR 0.152.
 */
package pw.avvero.test.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import pw.avvero.test.kafka.ContainerTestUtils;
import pw.avvero.test.kafka.OffsetSnapshotFrame;

public class KafkaSupport {
    private static final Logger log = LoggerFactory.getLogger(KafkaSupport.class);
    public static int OFFSET_COMMIT_WAIT_ATTEMPTS_MAX = 200;
    public static int OFFSET_COMMIT_WAIT_TIME = 10;
    static ThreadLocal<Long> topicsOffsetsTotalInThread = new ThreadLocal();

    public static void waitForPartitionAssignment(ApplicationContext applicationContext) throws Exception {
        KafkaSupport.detectMultipleContainersForSameTopicWithinSameGroup(applicationContext);
        KafkaListenerEndpointRegistry registry = (KafkaListenerEndpointRegistry)applicationContext.getBean(KafkaListenerEndpointRegistry.class);
        log.debug("[KTS] Waiting for partition assignment is requested");
        long startTime = System.currentTimeMillis();
        for (MessageListenerContainer messageListenerContainer : registry.getListenerContainers()) {
            long partStartTime = System.currentTimeMillis();
            log.debug("[KTS] Waiting for partition assignment started for {}", (Object)messageListenerContainer.getListenerId());
            int partitions = ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);
            long partGauge = System.currentTimeMillis() - partStartTime;
            if (partitions > 0) {
                String topics = Objects.requireNonNull(messageListenerContainer.getAssignedPartitions()).stream().map(TopicPartition::topic).collect(Collectors.joining(", "));
                log.debug("[KTS] Waiting for partition assignment for {} is succeeded in {} ms, topics: {}", new Object[]{messageListenerContainer.getListenerId(), partGauge, topics});
                continue;
            }
            log.error("[KTS] Waiting for partition assignment for {} is failed in {} ms", (Object)messageListenerContainer.getListenerId(), (Object)partGauge);
        }
        long gauge = System.currentTimeMillis() - startTime;
        log.debug("[KTS] Waiting for partition assignment is finished in {} ms. At least one partition is assigned for every container", (Object)gauge);
    }

    private static void detectMultipleContainersForSameTopicWithinSameGroup(ApplicationContext applicationContext) {
        KafkaListenerEndpointRegistry registry = (KafkaListenerEndpointRegistry)applicationContext.getBean(KafkaListenerEndpointRegistry.class);
        HashMap<String, List> containersPerTopicInSameGroup = new HashMap<String, List>();
        for (MessageListenerContainer container : registry.getListenerContainers()) {
            ContainerProperties containerProperties = container.getContainerProperties();
            if (containerProperties.getTopics() == null) continue;
            for (String topic : containerProperties.getTopics()) {
                containersPerTopicInSameGroup.computeIfAbsent(containerProperties.getGroupId() + " : " + topic, k -> new ArrayList()).add(container);
            }
        }
        containersPerTopicInSameGroup.forEach((key, list) -> {
            if (list.size() > 1) {
                String[] parts = key.split(" : ");
                String groupId = parts[0];
                String topic = parts[1];
                String containerNames = list.stream().map(MessageListenerContainer::getListenerId).collect(Collectors.joining(", "));
                throw new RuntimeException(String.format("Detected multiple Kafka listener containers (%s) configured to listen to topic '%s' within the same group '%s'. This configuration may lead to unexpected behavior or message duplication. Please ensure each topic is consumed by a unique group or container.", containerNames, topic, groupId));
            }
        });
    }

    public static void waitForPartitionOffsetCommit(ApplicationContext applicationContext) throws ExecutionException, InterruptedException {
        List bootstrapServers = ((KafkaConnectionDetails)applicationContext.getBean(KafkaConnectionDetails.class)).getBootstrapServers();
        KafkaSupport.waitForPartitionOffsetCommit(bootstrapServers);
    }

    public static void waitForPartitionOffsetCommit(List<String> bootstrapServers) {
        try (AdminClient adminClient = AdminClient.create(Collections.singletonMap("bootstrap.servers", bootstrapServers));){
            try {
                KafkaSupport.waitForPartitionOffsetCommit(adminClient);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void waitForPartitionOffsetCommit(AdminClient adminClient) throws ExecutionException, InterruptedException {
        Set<String> topics = ((Map)adminClient.listTopics().namesToListings().get()).keySet();
        KafkaSupport.waitForPartitionOffsetCommitForTopics(adminClient, topics);
    }

    public static void waitForPartitionOffsetCommitForTopics(AdminClient adminClient, Set<String> topics) throws ExecutionException, InterruptedException {
        Set<TopicPartition> topicPartitions = KafkaSupport.getPartitions(adminClient, topics);
        KafkaSupport.waitForPartitionOffsetCommitForPartitions(adminClient, topicPartitions);
    }

    public static void waitForPartitionOffsetCommitForPartitions(AdminClient adminClient, Set<TopicPartition> topicPartitions) throws ExecutionException, InterruptedException {
        Set<String> consumerGroups = ((Collection)adminClient.listConsumerGroups().all().get()).stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
        KafkaSupport.waitForPartitionOffsetCommitForPartitions(adminClient, topicPartitions, consumerGroups);
    }

    public static void waitForPartitionOffsetCommitForPartitions(AdminClient adminClient, Set<TopicPartition> topicPartitions, Set<String> consumerGroups) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        int attempt = 0;
        boolean offsetCommitted = false;
        while (!offsetCommitted) {
            if (++attempt > OFFSET_COMMIT_WAIT_ATTEMPTS_MAX) {
                throw new RuntimeException("Exceeded maximum attempts (" + OFFSET_COMMIT_WAIT_ATTEMPTS_MAX + ") waiting for offset commit for partitions.");
            }
            log.debug("[KTS] Waiting for offset commit is requested, attempt {}", (Object)attempt);
            Map<TopicPartition, Long> topicsOffsets = KafkaSupport.getOffsetsForPartitions(adminClient, topicPartitions);
            Long topicsOffsetsTotal = topicsOffsets.values().stream().mapToLong(Long::longValue).sum();
            if (topicsOffsetsTotal.equals(topicsOffsetsTotalInThread.get())) {
                log.debug("[KTS] Topic offset is not changed; Waiting for offset commit is finished in {} ms", (Object)(System.currentTimeMillis() - startTime));
                return;
            }
            Set<TopicPartition> topicPartitionsToCheck = topicsOffsets.entrySet().stream().filter(e -> !((Long)e.getValue()).equals(0L)).map(Map.Entry::getKey).collect(Collectors.toSet());
            Map<String, Map<TopicPartition, Long>> consumerGroupsOffsets = KafkaSupport.getOffsetsForConsumerGroups(adminClient, consumerGroups, topicPartitionsToCheck);
            offsetCommitted = KafkaSupport.checkOffsetCommitted(topicPartitions, consumerGroups, topicsOffsets, consumerGroupsOffsets);
            if (offsetCommitted) {
                Long topicsOffsetsTotalFinish = KafkaSupport.getOffsetsForPartitions(adminClient, topicPartitions).values().stream().mapToLong(Long::longValue).sum();
                offsetCommitted = topicsOffsetsTotal.equals(topicsOffsetsTotalFinish);
            }
            if (offsetCommitted) {
                topicsOffsetsTotalInThread.set(topicsOffsetsTotal);
                continue;
            }
            log.warn("[KTS] Some offsets are not equal. Waiting for further message processing before proceeding. Refreshing end offsets and reevaluating.");
            try {
                Thread.sleep(OFFSET_COMMIT_WAIT_TIME);
            }
            catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
        }
        log.debug("[KTS] Waiting for offset commit is finished in {} ms", (Object)(System.currentTimeMillis() - startTime));
    }

    private static boolean checkOffsetCommitted(Set<TopicPartition> topicPartitions, Set<String> consumerGroups, Map<TopicPartition, Long> topicsOffsets, Map<String, Map<TopicPartition, Long>> consumerGroupsOffsets) {
        boolean result = true;
        OffsetSnapshotFrame offsetSnapshotFrame = new OffsetSnapshotFrame();
        for (String consumerGroup : consumerGroups) {
            for (TopicPartition topicPartition : topicPartitions) {
                Long consumerGroupOffset = (Long)consumerGroupsOffsets.getOrDefault(consumerGroup, Map.of()).get(topicPartition);
                if (consumerGroupOffset == null) continue;
                Long partitionOffset = topicsOffsets.get(topicPartition);
                boolean equal = partitionOffset == null || partitionOffset == 0L || partitionOffset.equals(consumerGroupOffset);
                result = result && equal;
                offsetSnapshotFrame.append(consumerGroup, topicPartition, consumerGroupOffset, partitionOffset);
            }
            offsetSnapshotFrame.split();
        }
        log.debug(offsetSnapshotFrame.toString());
        return result;
    }

    public static Set<TopicPartition> getPartitions(AdminClient adminClient, Set<String> topics) throws ExecutionException, InterruptedException {
        HashSet<TopicPartition> topicPartitions = new HashSet<TopicPartition>();
        DescribeTopicsResult topicInfo = adminClient.describeTopics(topics);
        for (String topic : topics) {
            int partitions = ((TopicDescription)((KafkaFuture)topicInfo.topicNameValues().get(topic)).get()).partitions().size();
            for (int i = 0; i < partitions; ++i) {
                topicPartitions.add(new TopicPartition(topic, i));
            }
        }
        return topicPartitions;
    }

    public static Map<TopicPartition, Long> getOffsetsForPartitions(AdminClient adminClient, Set<TopicPartition> topicPartitions) throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetSpec> topicPartitionsWithSpecs = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        return ((Map)adminClient.listOffsets(topicPartitionsWithSpecs).all().get()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> ((ListOffsetsResult.ListOffsetsResultInfo)entry.getValue()).offset()));
    }

    public static Map<String, Map<TopicPartition, Long>> getOffsetsForConsumerGroups(AdminClient adminClient, Set<String> consumerGroups, Set<TopicPartition> topicPartitions) throws ExecutionException, InterruptedException {
        HashMap<String, ListConsumerGroupOffsetsSpec> groupSpecs = new HashMap<String, ListConsumerGroupOffsetsSpec>();
        for (String consumerGroup : consumerGroups) {
            ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec();
            spec.topicPartitions(topicPartitions);
            groupSpecs.put(consumerGroup, spec);
        }
        ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(groupSpecs);
        HashMap<String, Map<TopicPartition, Long>> currentOffsets = new HashMap<String, Map<TopicPartition, Long>>();
        for (String consumerGroup : consumerGroups) {
            ((Map)offsetsResult.partitionsToOffsetAndMetadata(consumerGroup).get()).forEach((tp, oam) -> {
                if (oam == null) {
                    return;
                }
                currentOffsets.computeIfAbsent(consumerGroup, c -> new HashMap()).put(tp, oam.offset());
            });
        }
        return currentOffsets;
    }
}

