package org.axonframework.boot.autoconfig;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.kafka.eventhandling.consumer.AsyncFetcher;
import org.axonframework.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.kafka.eventhandling.consumer.KafkaMessageSource;
import org.axonframework.kafka.eventhandling.consumer.SortedKafkaMessageBuffer;
import org.axonframework.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.kafka.eventhandling.producer.DefaultProducerFactory;
import org.axonframework.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.kafka.eventhandling.producer.KafkaPublisherConfiguration;
import org.axonframework.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.serialization.Serializer;
import org.axonframework.spring.config.AxonConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableConfigurationProperties({KafkaProperties.class})
@Configuration
@ConditionalOnClass({KafkaPublisher.class})
@AutoConfigureAfter({AxonAutoConfiguration.class})
/* loaded from: input_file:org/axonframework/boot/autoconfig/KafkaAutoConfiguration.class */
public class KafkaAutoConfiguration {
    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties kafkaProperties) {
        this.properties = kafkaProperties;
    }

    @ConditionalOnMissingBean
    @ConditionalOnProperty({"axon.kafka.producer.transaction-id-prefix"})
    @Bean
    public ProducerFactory<String, byte[]> kafkaProducerFactory() {
        Map<String, Object> buildProducerProperties = this.properties.buildProducerProperties();
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix == null) {
            throw new IllegalStateException("transactionalIdPrefix cannot be empty");
        }
        return DefaultProducerFactory.builder(buildProducerProperties).withConfirmationMode(ConfirmationMode.TRANSACTIONAL).withTransactionalIdPrefix(transactionIdPrefix).build();
    }

    @ConditionalOnMissingBean
    @ConditionalOnProperty({"axon.kafka.consumer.group-id"})
    @Bean
    public ConsumerFactory<String, byte[]> kafkaConsumerFactory() {
        return new DefaultConsumerFactory(this.properties.buildConsumerProperties());
    }

    @ConditionalOnMissingBean
    @Bean
    public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(@Qualifier("eventSerializer") Serializer serializer) {
        return new DefaultKafkaMessageConverter(serializer);
    }

    @ConditionalOnMissingBean
    @ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class})
    @Bean(initMethod = "start", destroyMethod = "shutDown")
    public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byte[]> producerFactory, EventBus eventBus, KafkaMessageConverter<String, byte[]> kafkaMessageConverter, AxonConfiguration axonConfiguration) {
        return new KafkaPublisher<>(KafkaPublisherConfiguration.builder().withTopic(this.properties.getDefaultTopic()).withMessageConverter(kafkaMessageConverter).withProducerFactory(producerFactory).withMessageSource(eventBus).withMessageMonitor(axonConfiguration.messageMonitor(KafkaPublisher.class, "kafkaPublisher")).build());
    }

    @ConditionalOnMissingBean
    @ConditionalOnBean({ConsumerFactory.class, KafkaMessageConverter.class})
    @Bean(destroyMethod = "shutdown")
    public Fetcher kafkaFetcher(ConsumerFactory<String, byte[]> consumerFactory, KafkaMessageConverter<String, byte[]> kafkaMessageConverter) {
        return AsyncFetcher.builder(consumerFactory).withTopic(this.properties.getDefaultTopic()).withPollTimeout(this.properties.getFetcher().getPollTimeout(), TimeUnit.MILLISECONDS).withMessageConverter(kafkaMessageConverter).withBufferFactory(() -> {
            return new SortedKafkaMessageBuffer(this.properties.getFetcher().getBufferSize());
        }).build();
    }

    @ConditionalOnMissingBean
    @ConditionalOnBean({ConsumerFactory.class})
    @Bean
    public KafkaMessageSource kafkaMessageSource(Fetcher fetcher) {
        return new KafkaMessageSource(fetcher);
    }
}
