/*
 * Decompiled with CFR 0.152.
 */
package org.aerogear.kafka.cdi.extension;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import javax.enterprise.context.spi.Contextual;
import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.spi.AfterDeploymentValidation;
import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.AnnotatedType;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.BeforeShutdown;
import javax.enterprise.inject.spi.Extension;
import javax.enterprise.inject.spi.InjectionPoint;
import javax.enterprise.inject.spi.InjectionTarget;
import javax.enterprise.inject.spi.ProcessAnnotatedType;
import javax.enterprise.inject.spi.ProcessInjectionTarget;
import javax.enterprise.inject.spi.WithAnnotations;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.aerogear.kafka.ExtendedKafkaProducer;
import org.aerogear.kafka.SimpleKafkaProducer;
import org.aerogear.kafka.cdi.annotation.Consumer;
import org.aerogear.kafka.cdi.annotation.KafkaConfig;
import org.aerogear.kafka.cdi.annotation.KafkaStream;
import org.aerogear.kafka.cdi.extension.VerySimpleEnvironmentResolver;
import org.aerogear.kafka.impl.DelegationKafkaConsumer;
import org.aerogear.kafka.impl.DelegationStreamProcessor;
import org.aerogear.kafka.impl.InjectedKafkaProducer;
import org.aerogear.kafka.serialization.CafdiSerdes;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * CDI portable extension that connects annotated bean members to Kafka clients.
 *
 * <p>During container bootstrap it:
 * <ul>
 *   <li>reads the bootstrap servers and producer defaults from the first type carrying
 *       {@link KafkaConfig};</li>
 *   <li>collects methods annotated with {@link Consumer} or {@link KafkaStream} and, after
 *       deployment validation, hands each one to a {@link DelegationKafkaConsumer} or
 *       {@link DelegationStreamProcessor};</li>
 *   <li>wraps every {@link InjectionTarget} so that declared fields annotated with
 *       {@code org.aerogear.kafka.cdi.annotation.Producer} receive a Kafka producer.</li>
 * </ul>
 * Consumers, producers and the fallback thread pool created here are released on
 * {@link BeforeShutdown}.
 *
 * @param <X> type parameter of the observed {@link ProcessAnnotatedType} events
 */
public class KafkaExtension<X> implements Extension {
    /** JNDI name under which a container-managed executor is looked up. */
    private static final String MANAGED_EXECUTOR_JNDI_NAME = "java:comp/DefaultManagedExecutorService";
    /** Minimum number of threads of the fallback pool used when no managed executor is available. */
    private static final int FALLBACK_POOL_SIZE = 16;

    private final Logger logger = LoggerFactory.getLogger(KafkaExtension.class);
    private final Set<AnnotatedMethod<?>> listenerMethods = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final Set<AnnotatedMethod<?>> streamProcessorMethods = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final Set<DelegationKafkaConsumer> managedConsumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final Set<Producer> managedProducers = Collections.newSetFromMap(new ConcurrentHashMap<>());

    /** Resolved {@code bootstrap.servers} value; {@code null} until a {@link KafkaConfig} was adopted. */
    private String bootstrapServers = null;
    /** The adopted configuration; always set together with {@link #bootstrapServers}. */
    private KafkaConfig kafkaConfig;
    /** Thread pool created by this extension when the JNDI lookup fails; {@code null} otherwise. */
    private ExecutorService fallbackExecutor;

    /**
     * Adopts the {@link KafkaConfig} of the observed type as the extension-wide configuration.
     *
     * <p>Only the first usable configuration is adopted. The annotation and the resolved
     * bootstrap servers are stored together, so they always originate from the same type.
     *
     * @param pat event for a type carrying {@link KafkaConfig}
     */
    public void kafkaConfig(@Observes @WithAnnotations(value={KafkaConfig.class}) ProcessAnnotatedType<X> pat) {
        final AnnotatedType<X> annotatedType = pat.getAnnotatedType();
        this.logger.trace("Kafka config scanning type: {}", annotatedType.getJavaClass().getName());

        final KafkaConfig candidate = annotatedType.getAnnotation(KafkaConfig.class);
        if (candidate == null || this.bootstrapServers != null) {
            // Nothing on the type itself, or a configuration was already adopted: keep the current one.
            return;
        }
        this.logger.info("setting bootstrap.servers IP for, {}", (Object)candidate.bootstrapServers());
        this.kafkaConfig = candidate;
        this.bootstrapServers = VerySimpleEnvironmentResolver.resolveVariables(candidate.bootstrapServers());
    }

    /**
     * Records every method of the observed type that is annotated with {@link Consumer} or
     * {@link KafkaStream}. A method carrying both annotations is treated as a consumer only.
     *
     * @param pat event for a type that uses {@link Consumer} or {@link KafkaStream}
     */
    public void registerListeners(@Observes @WithAnnotations(value={Consumer.class, KafkaStream.class}) ProcessAnnotatedType<X> pat) {
        final AnnotatedType<X> annotatedType = pat.getAnnotatedType();
        this.logger.trace("scanning type: {}", annotatedType.getJavaClass().getName());
        for (AnnotatedMethod<? super X> method : annotatedType.getMethods()) {
            if (method.isAnnotationPresent(Consumer.class)) {
                this.logger.debug("found annotated listener method, adding for further processing");
                this.listenerMethods.add(method);
            } else if (method.isAnnotationPresent(KafkaStream.class)) {
                this.logger.debug("found annotated stream method, adding for further processing");
                this.streamProcessorMethods.add(method);
            }
        }
    }

    /**
     * Creates one framework consumer per recorded listener method and one stream processor per
     * recorded stream method. Consumers are tracked for shutdown and submitted to an executor.
     *
     * @param adv the observed lifecycle event (unused)
     * @param bm  bean manager used to obtain the framework beans
     */
    public void afterDeploymentValidation(@Observes AfterDeploymentValidation adv, BeanManager bm) {
        this.logger.debug("wiring annotated methods to internal Kafka Util clazzes");
        for (AnnotatedMethod<?> consumerMethod : this.listenerMethods) {
            final DelegationKafkaConsumer frameworkConsumer = this.frameworkReference(bm, DelegationKafkaConsumer.class);
            frameworkConsumer.initialize(this.bootstrapServers, (AnnotatedMethod)consumerMethod, bm, this.kafkaConfig);
            this.managedConsumers.add(frameworkConsumer);
            this.submitToExecutor(frameworkConsumer);
        }
        for (AnnotatedMethod<?> streamMethod : this.streamProcessorMethods) {
            final DelegationStreamProcessor frameworkProcessor = this.frameworkReference(bm, DelegationStreamProcessor.class);
            frameworkProcessor.init(this.bootstrapServers, (AnnotatedMethod)streamMethod, bm);
        }
    }

    /**
     * Shuts down all tracked consumers, closes all tracked producers and finally stops the
     * fallback thread pool, if this extension had to create one.
     *
     * <p>A failure while releasing one resource is logged and does not prevent the remaining
     * resources from being released.
     *
     * @param bs the observed lifecycle event (unused)
     */
    public void beforeShutdown(@Observes BeforeShutdown bs) {
        for (DelegationKafkaConsumer consumer : this.managedConsumers) {
            try {
                consumer.shutdown();
            }
            catch (RuntimeException e) {
                this.logger.warn("could not shut down Kafka consumer", (Throwable)e);
            }
        }
        for (Producer producer : this.managedProducers) {
            try {
                producer.close();
            }
            catch (RuntimeException e) {
                this.logger.warn("could not close Kafka producer", (Throwable)e);
            }
        }
        if (this.fallbackExecutor != null) {
            // Consumers were asked to stop above; shutdown() lets their tasks finish and frees the threads.
            this.fallbackExecutor.shutdown();
        }
    }

    /**
     * Replaces the injection target of every bean with a wrapper that, after the regular
     * injection, populates the bean's {@code @Producer} fields. All other operations are
     * delegated unchanged.
     *
     * @param pit event exposing the injection target to wrap
     * @param <T> bean type of the injection target
     */
    public <T> void processInjectionTarget(@Observes ProcessInjectionTarget<T> pit) {
        final InjectionTarget<T> delegate = pit.getInjectionTarget();
        final AnnotatedType<T> annotatedType = pit.getAnnotatedType();
        pit.setInjectionTarget(new InjectionTarget<T>(){

            @Override
            public void inject(T instance, CreationalContext<T> ctx) {
                delegate.inject(instance, ctx);
                KafkaExtension.this.injectProducers(annotatedType.getJavaClass(), instance);
            }

            @Override
            public void postConstruct(T instance) {
                delegate.postConstruct(instance);
            }

            @Override
            public void preDestroy(T instance) {
                delegate.preDestroy(instance);
            }

            @Override
            public void dispose(T instance) {
                delegate.dispose(instance);
            }

            @Override
            public Set<InjectionPoint> getInjectionPoints() {
                return delegate.getInjectionPoints();
            }

            @Override
            public T produce(CreationalContext<T> ctx) {
                return delegate.produce(ctx);
            }
        });
    }

    /** Obtains a contextual reference to the first bean the container reports for {@code type}. */
    private <T> T frameworkReference(BeanManager bm, Class<T> type) {
        final Bean<?> bean = bm.getBeans(type).iterator().next();
        final CreationalContext<?> ctx = bm.createCreationalContext(bean);
        return type.cast(bm.getReference(bean, type, ctx));
    }

    /**
     * Assigns a new Kafka producer to every field declared directly on {@code beanClass} that is
     * annotated with {@code @Producer} and whose type can hold a {@link SimpleKafkaProducer} or
     * an {@link ExtendedKafkaProducer}. Inherited fields are not inspected.
     */
    private void injectProducers(Class<?> beanClass, Object instance) {
        for (Field field : beanClass.getDeclaredFields()) {
            final org.aerogear.kafka.cdi.annotation.Producer annotation = field.getAnnotation(org.aerogear.kafka.cdi.annotation.Producer.class);
            if (annotation == null || !KafkaExtension.acceptsProducer(field)) {
                continue;
            }
            final Class<?>[] keyAndValue = KafkaExtension.keyAndValueTypes(field);
            final Serializer<?> keySerializer = CafdiSerdes.serdeFrom(keyAndValue[0]).serializer();
            final Serializer<?> valSerializer = CafdiSerdes.serdeFrom(keyAndValue[1]).serializer();
            final Producer producer = this.createInjectionProducer(this.bootstrapServers, keySerializer.getClass(), valSerializer.getClass(), keySerializer, valSerializer, annotation);
            // Tracked before assignment so the producer is closed on shutdown even if the assignment fails.
            this.managedProducers.add(producer);
            try {
                field.setAccessible(true);
                field.set(instance, producer);
            }
            catch (IllegalAccessException | IllegalArgumentException e) {
                this.logger.error("could not inject producer", (Throwable)e);
            }
        }
    }

    /** Tells whether the field's declared type is a supertype of (or equal to) one of the producer types. */
    private static boolean acceptsProducer(Field field) {
        final Class<?> fieldType = field.getType();
        return fieldType.isAssignableFrom(SimpleKafkaProducer.class) || fieldType.isAssignableFrom(ExtendedKafkaProducer.class);
    }

    /**
     * Extracts the key and value classes from the field's generic declaration.
     *
     * @return a two-element array: key class, value class
     * @throws IllegalStateException if the field is not declared with two concrete class arguments
     */
    private static Class<?>[] keyAndValueTypes(Field field) {
        if (field.getGenericType() instanceof ParameterizedType) {
            final Object[] typeArguments = ((ParameterizedType)field.getGenericType()).getActualTypeArguments();
            if (typeArguments.length >= 2 && typeArguments[0] instanceof Class && typeArguments[1] instanceof Class) {
                return new Class<?>[]{(Class<?>)typeArguments[0], (Class<?>)typeArguments[1]};
            }
        }
        throw new IllegalStateException("@Producer field must declare concrete key and value type arguments: " + field);
    }

    /**
     * Runs the consumer on the container's managed executor, or on a shared fallback pool when
     * the JNDI lookup fails. The fallback pool is created once and reused for every consumer.
     */
    private void submitToExecutor(DelegationKafkaConsumer delegationKafkaConsumer) {
        this.consumerExecutor().execute(delegationKafkaConsumer);
    }

    /** Returns the executor for consumer tasks, creating the fallback pool on first lookup failure. */
    private ExecutorService consumerExecutor() {
        if (this.fallbackExecutor != null) {
            return this.fallbackExecutor;
        }
        try {
            return (ExecutorService)InitialContext.doLookup(MANAGED_EXECUTOR_JNDI_NAME);
        }
        catch (NamingException e) {
            this.logger.warn("Could not find a managed ExecutorService, creating one manually");
            // Consumer tasks occupy their thread until shut down, so the pool must offer one thread per consumer.
            final int poolSize = Math.max(FALLBACK_POOL_SIZE, this.listenerMethods.size());
            this.fallbackExecutor = new ThreadPoolExecutor(poolSize, poolSize, 10L, TimeUnit.MINUTES, new LinkedBlockingDeque<Runnable>());
            return this.fallbackExecutor;
        }
    }

    /**
     * Builds the producer that gets injected into a {@code @Producer} field.
     *
     * <p>For {@code linger.ms}, {@code retries} and {@code request.timeout.ms} the value on the
     * field annotation is used when it is non-negative, otherwise the non-negative default from
     * {@link KafkaConfig}; if both are negative the property is left unset.
     *
     * @throws IllegalStateException if no {@link KafkaConfig} has been adopted
     */
    private Producer createInjectionProducer(String bootstrapServers, Class<?> keySerializerClass, Class<?> valSerializerClass, Serializer<?> keySerializer, Serializer<?> valSerializer, org.aerogear.kafka.cdi.annotation.Producer annotation) {
        final KafkaConfig defaults = this.kafkaConfig;
        if (bootstrapServers == null || defaults == null) {
            // Properties rejects null values; fail with an actionable message instead of a bare NPE.
            throw new IllegalStateException("Cannot create Kafka producer: no bootstrap servers configured, is a @KafkaConfig annotated class part of the deployment?");
        }
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", keySerializerClass);
        properties.put("value.serializer", valSerializerClass);
        KafkaExtension.putFirstNonNegative(properties, "linger.ms", annotation.lingerMs(), defaults.defaultLingerMs());
        KafkaExtension.putFirstNonNegative(properties, "retries", annotation.retries(), defaults.defaultProducerRetries());
        KafkaExtension.putFirstNonNegative(properties, "request.timeout.ms", annotation.requestTimeoutMs(), defaults.defaultRequestTimeoutMs());
        return new InjectedKafkaProducer(properties, keySerializer, valSerializer);
    }

    /** Stores the first non-negative candidate under {@code key}; stores nothing if all are negative. */
    private static void putFirstNonNegative(Properties properties, String key, int... candidates) {
        IntStream.of(candidates).filter(value -> value >= 0).findFirst().ifPresent(value -> properties.put(key, (Object)value));
    }
}

