/*
 * Decompiled with CFR 0.152.
 */
package org.aerogear.kafka.impl;

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import javax.enterprise.context.spi.Contextual;
import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import org.aerogear.kafka.cdi.annotation.KafkaStream;
import org.aerogear.kafka.serialization.CafdiSerdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds and starts a Kafka Streams topology around a method annotated with
 * {@link KafkaStream}.
 *
 * <p>The annotated method is invoked once, reflectively, with a {@link KStream}
 * reading from the annotation's input topic. Whatever {@link KStream} or
 * {@link KTable} it returns is written to the annotation's output topic.
 */
public class DelegationStreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(DelegationStreamProcessor.class);

    /** Kafka Streams configuration; filled in by {@link #init}. */
    final Properties properties = new Properties();
    private AnnotatedMethod annotatedProcessorMethod;
    private KafkaStreams streams;

    /**
     * Configures the streams client, lets the annotated method build its part of the
     * topology, wires the result to the output topic and starts the client.
     *
     * <p>Key and value serdes for the input are derived from the two type arguments of the
     * method's first parameter; serdes for the output are derived from the two type arguments
     * of its return type. Both are expected to be parameterized types.
     *
     * <p>If the annotated method cannot be invoked, the failure is logged and no streams
     * client is created: a topology without the processor's logic would only consume the
     * input topic and drop every record.
     *
     * @param bootstrapServers value stored under {@code bootstrap.servers}
     * @param annotatedMethod  the CDI view of the method carrying {@link KafkaStream}
     * @param beanManager      used to obtain a contextual reference of the method's declaring class
     */
    // Raw types are unavoidable here: the key/value classes are only known at runtime.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void init(String bootstrapServers, AnnotatedMethod annotatedMethod, BeanManager beanManager) {
        this.annotatedProcessorMethod = annotatedMethod;
        final KafkaStream streamAnnotation = (KafkaStream) annotatedMethod.getAnnotation(KafkaStream.class);
        final Method processorMethod = annotatedMethod.getJavaMember();

        final Type inputType = processorMethod.getGenericParameterTypes()[0];
        final Class keyType = typeArgument(inputType, 0);
        final Class valType = typeArgument(inputType, 1);

        // A random suffix gives every processor its own application id.
        this.properties.put("application.id", "org-aerogear-kafka-cdi-" + UUID.randomUUID());
        this.properties.put("bootstrap.servers", bootstrapServers);
        this.properties.put("default.key.serde", CafdiSerdes.serdeFrom(keyType).getClass());
        this.properties.put("default.value.serde", CafdiSerdes.serdeFrom(valType).getClass());
        this.properties.put("commit.interval.ms", streamAnnotation.commitInterval());
        final StreamsConfig cfg = new StreamsConfig(this.properties);

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream source = builder.stream(streamAnnotation.input());
        final Object processorInstance = lookupProcessorInstance(beanManager, processorMethod.getDeclaringClass());

        final Object sink;
        try {
            sink = processorMethod.invoke(processorInstance, source);
        } catch (IllegalAccessException | InvocationTargetException e) {
            LOGGER.error("Could not invoke stream processor method {}", processorMethod, e);
            // Do not start a client for a topology the processor never got to define.
            return;
        }

        connectSink(sink, processorMethod, streamAnnotation.output());
        start(builder, cfg);
    }

    /** Returns the {@code index}-th actual type argument of a parameterized type as a class. */
    @SuppressWarnings("rawtypes")
    private static Class typeArgument(Type genericType, int index) {
        return (Class) ((ParameterizedType) genericType).getActualTypeArguments()[index];
    }

    /** Resolves the bean for {@code processorType} and returns a contextual reference to it. */
    private static Object lookupProcessorInstance(BeanManager beanManager, Class<?> processorType) {
        final Set<Bean<?>> beans = beanManager.getBeans(processorType);
        final Bean<?> processorBean = beanManager.resolve(beans);
        final CreationalContext<?> creationalContext = beanManager.createCreationalContext(processorBean);
        return beanManager.getReference(processorBean, processorType, creationalContext);
    }

    /** Writes the stream or table returned by the processor method to {@code outputTopic}. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void connectSink(Object sink, Method processorMethod, String outputTopic) {
        final Type returnType = processorMethod.getGenericReturnType();
        final Class retKeyType = typeArgument(returnType, 0);
        final Class retValType = typeArgument(returnType, 1);
        final Produced produced =
                Produced.with(CafdiSerdes.serdeFrom(retKeyType), CafdiSerdes.serdeFrom(retValType));

        if (sink instanceof KStream) {
            // to() is the terminal write; through() would additionally re-consume the topic
            // into a stream nobody uses.
            ((KStream) sink).to(outputTopic, produced);
        } else if (sink instanceof KTable) {
            ((KTable) sink).toStream().to(outputTopic, produced);
        } else {
            LOGGER.warn("Stream processor method {} returned {}, which is neither a KStream nor a KTable;"
                    + " nothing will be written to {}", processorMethod, sink, outputTopic);
        }
    }

    /** Creates the streams client for the built topology and starts it, logging any failure. */
    private void start(StreamsBuilder builder, StreamsConfig cfg) {
        try {
            this.streams = new KafkaStreams(builder.build(), cfg);
            this.streams.setStateListener((newState, oldState) -> {
                LOGGER.trace("OLD STATE {}", oldState);
                LOGGER.trace("NEW STATE {}", newState);
            });
            LOGGER.trace("Starting the Streaming context");
            this.streams.start();
        } catch (Exception e) {
            LOGGER.error("Could not start Kafka streaming client", e);
        }
    }
}

