/*
 * Decompiled with CFR 0.152.
 */
package org.aerogear.kafka.impl;

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.enterprise.context.spi.Contextual;
import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.CDI;
import javax.enterprise.util.AnnotationLiteral;
import org.aerogear.kafka.DefaultConsumerRebalanceListener;
import org.aerogear.kafka.cdi.annotation.KafkaConfig;
import org.aerogear.kafka.cdi.extension.VerySimpleEnvironmentResolver;
import org.aerogear.kafka.impl.CdiRequestScopeUtils;
import org.aerogear.kafka.impl.KafkaCdiMetrics;
import org.aerogear.kafka.serialization.CafdiSerdes;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.jboss.weld.context.bound.BoundRequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DelegationKafkaConsumer
implements Runnable {
    private final Logger logger = LoggerFactory.getLogger(DelegationKafkaConsumer.class);
    private final AtomicBoolean running = new AtomicBoolean(Boolean.TRUE);
    private final Properties properties = new Properties();
    private Object consumerInstance;
    private KafkaConsumer<?, ?> consumer;
    private List<String> topics;
    private AnnotatedMethod annotatedListenerMethod;
    private ConsumerRebalanceListener consumerRebalanceListener;
    private int numberOfRetries;
    private long retryBackoffMs;
    private Class<?>[] parameterTypes;
    private Type[] genericParameterTypes;
    private ConsumerMode mode;
    private KafkaCdiMetrics metrics;
    private final AtomicBoolean started = new AtomicBoolean(false);

    private ConsumerRebalanceListener createConsumerRebalanceListener(Class<? extends ConsumerRebalanceListener> consumerRebalanceListenerClazz) {
        if (consumerRebalanceListenerClazz.equals(DefaultConsumerRebalanceListener.class)) {
            return new DefaultConsumerRebalanceListener((Consumer)this.consumer);
        }
        try {
            return consumerRebalanceListenerClazz.getDeclaredConstructor(Consumer.class).newInstance(this.consumer);
        }
        catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            this.logger.error("Could not create desired listener, using DefaultConsumerRebalanceListener", (Throwable)e);
            return new DefaultConsumerRebalanceListener((Consumer)this.consumer);
        }
    }

    private Class<?> consumerKeyType(Class<?> defaultKeyType) {
        if (this.parameterTypes.length >= 2) {
            return this.parameterTypes[0];
        }
        if (this.parameterTypes.length == 1 && ConsumerRecords.class.isAssignableFrom(this.parameterTypes[0])) {
            return (Class)((ParameterizedType)this.genericParameterTypes[0]).getActualTypeArguments()[0];
        }
        return defaultKeyType;
    }

    private Class<?> consumerValueType() {
        if (this.parameterTypes.length >= 2) {
            return this.parameterTypes[1];
        }
        if (this.parameterTypes.length == 1 && ConsumerRecords.class.isAssignableFrom(this.parameterTypes[0])) {
            return (Class)((ParameterizedType)this.genericParameterTypes[0]).getActualTypeArguments()[1];
        }
        return this.parameterTypes[0];
    }

    private <K, V> void createKafkaConsumer(Class<K> keyType, Class<V> valueType, Properties consumerProperties) {
        this.consumer = new KafkaConsumer(consumerProperties, CafdiSerdes.serdeFrom(keyType).deserializer(), CafdiSerdes.serdeFrom(valueType).deserializer());
    }

    public void initialize(String bootstrapServers, AnnotatedMethod annotatedMethod, BeanManager beanManager, KafkaConfig kafkaConfig) {
        org.aerogear.kafka.cdi.annotation.Consumer consumerAnnotation = (org.aerogear.kafka.cdi.annotation.Consumer)annotatedMethod.getAnnotation(org.aerogear.kafka.cdi.annotation.Consumer.class);
        this.topics = Arrays.stream(consumerAnnotation.topics()).map(VerySimpleEnvironmentResolver::resolveVariables).collect(Collectors.toList());
        this.numberOfRetries = IntStream.of(consumerAnnotation.retries(), kafkaConfig.defaultConsumerRetries()).filter(v -> v > 0).findFirst().orElse(0);
        this.retryBackoffMs = IntStream.of(consumerAnnotation.retryBackoffMs(), kafkaConfig.defaultConsumerRetryBackoffMs()).filter(v -> v > 0).findFirst().orElse(0);
        String groupId = VerySimpleEnvironmentResolver.resolveVariables(consumerAnnotation.groupId());
        Class<?> recordKeyType = consumerAnnotation.keyType();
        this.annotatedListenerMethod = annotatedMethod;
        this.parameterTypes = this.annotatedListenerMethod.getJavaMember().getParameterTypes();
        this.genericParameterTypes = this.annotatedListenerMethod.getJavaMember().getGenericParameterTypes();
        this.mode = this.getConsumerMode(this.parameterTypes);
        Class<?> keyTypeClass = this.consumerKeyType(recordKeyType);
        Class<?> valTypeClass = this.consumerValueType();
        this.properties.put("bootstrap.servers", bootstrapServers);
        this.properties.put("group.id", groupId);
        IntStream.of(consumerAnnotation.fetchMaxWaitMs(), kafkaConfig.defaultFetchMaxWaitMs()).filter(value -> value > 0).findFirst().ifPresent(value -> this.properties.put("fetch.max.wait.ms", (Object)value));
        IntStream.of(consumerAnnotation.requestTimeoutMs(), kafkaConfig.defaultRequestTimeoutMs()).filter(value -> value > 0).findFirst().ifPresent(value -> this.properties.put("request.timeout.ms", (Object)value));
        this.properties.put("auto.offset.reset", consumerAnnotation.offset());
        this.properties.put("key.deserializer", CafdiSerdes.serdeFrom(keyTypeClass).deserializer().getClass());
        this.properties.put("value.deserializer", CafdiSerdes.serdeFrom(valTypeClass).deserializer().getClass());
        this.createKafkaConsumer(keyTypeClass, valTypeClass, this.properties);
        this.consumerRebalanceListener = this.createConsumerRebalanceListener(consumerAnnotation.consumerRebalanceListener());
        Set beans = beanManager.getBeans(this.annotatedListenerMethod.getJavaMember().getDeclaringClass(), new Annotation[0]);
        Bean propertyResolverBean = beanManager.resolve(beans);
        CreationalContext creationalContext = beanManager.createCreationalContext((Contextual)propertyResolverBean);
        Class<?> consumerType = this.annotatedListenerMethod.getJavaMember().getDeclaringClass();
        this.consumerInstance = beanManager.getReference(propertyResolverBean, consumerType, creationalContext);
        Bean metricsBean = beanManager.resolve(beanManager.getBeans(KafkaCdiMetrics.class, new Annotation[]{new AnnotationLiteral<Any>(){}}));
        this.metrics = (KafkaCdiMetrics)beanManager.getReference(metricsBean, KafkaCdiMetrics.class, beanManager.createCreationalContext((Contextual)metricsBean));
        this.metrics.consumerCreated();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void run() {
        try {
            this.consumer.subscribe(this.topics, this.consumerRebalanceListener);
            this.logger.info("subscribed to {}", this.topics);
            block18: while (this.isRunning()) {
                long pollStart = System.currentTimeMillis();
                ConsumerRecords records = this.consumer.poll(Duration.ofMillis(100L));
                this.logSlowPoll(pollStart, System.currentTimeMillis());
                if (!this.started.getAndSet(true)) {
                    this.metrics.consumerStarted();
                }
                for (ConsumerRecord record : records) {
                    BeanManager beanManager = CDI.current().getBeanManager();
                    Bean bean = beanManager.resolve(beanManager.getBeans(BoundRequestContext.class, new Annotation[0]));
                    CreationalContext creationalContext = beanManager.createCreationalContext(null);
                    int retries = 0;
                    boolean success = false;
                    while (true) {
                        ConcurrentHashMap<String, Object> requestDataStore;
                        BoundRequestContext boundRequestContext;
                        block25: {
                            boundRequestContext = (BoundRequestContext)beanManager.getReference(bean, BoundRequestContext.class, creationalContext);
                            requestDataStore = new ConcurrentHashMap<String, Object>();
                            CdiRequestScopeUtils.start(boundRequestContext, requestDataStore);
                            if (this.mode == ConsumerMode.SINGLE) {
                                this.dispatchSinglePayload(record);
                                break block25;
                            }
                            this.dispatchCompletePayload(records);
                            CdiRequestScopeUtils.end(boundRequestContext, requestDataStore);
                            creationalContext.release();
                            continue block18;
                        }
                        try {
                            block26: {
                                success = true;
                                break block26;
                                {
                                    catch (Throwable throwable) {
                                        throw throwable;
                                    }
                                }
                                finally {
                                    CdiRequestScopeUtils.end(boundRequestContext, requestDataStore);
                                }
                            }
                            this.logger.trace("dispatched payload {} to consumer", record.value());
                            continue;
                        }
                        catch (IllegalAccessException e) {
                            this.logger.error("error dispatching received value to consumer", (Throwable)e);
                            break;
                        }
                        catch (SerializationException e) {
                            this.logger.error("failed to deserialize message, giving up", (Throwable)e);
                            break;
                        }
                        catch (InvocationTargetException e) {
                            if (retries == this.numberOfRetries) {
                                this.logger.error(String.format("error dispatching received value to consumer, giving up after run %d/%d", retries + 1, this.numberOfRetries), (Throwable)e);
                            } else {
                                this.logger.warn(String.format("failed on run %d/%d, will retry: %s", retries + 1, this.numberOfRetries, e.toString()));
                                this.sleepRetryBackoffMs();
                            }
                            ++retries;
                        }
                        if (!success && retries <= this.numberOfRetries) continue;
                        break;
                    }
                    creationalContext.release();
                    continue;
                    catch (Throwable throwable) {
                        creationalContext.release();
                        throw throwable;
                    }
                }
                continue;
                return;
            }
        }
        catch (WakeupException e) {
            if (!this.isRunning()) return;
            this.logger.trace("Unexpected WakeupException", (Throwable)e);
            throw e;
        }
        catch (Throwable t) {
            this.logger.error("Unexpected fatal error", t);
            return;
        }
        finally {
            this.logger.info("Close the consumer.");
            this.metrics.consumerClosed();
            this.consumer.close();
        }
    }

    private void logSlowPoll(long pollStart, long pollEnd) {
        long slowThreshold = 2000L;
        long diffMs = pollEnd - pollStart;
        if (diffMs > 2000L) {
            this.logger.warn("slow kafka poll() took {} ms - longer than warning threshold of {} ms", (Object)diffMs, (Object)2000L);
        }
    }

    private void sleepRetryBackoffMs() {
        try {
            Thread.sleep(this.retryBackoffMs);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private void dispatchSinglePayload(ConsumerRecord<?, ?> record) throws IllegalAccessException, InvocationTargetException {
        this.logger.trace("dispatching payload {} to consumer", record.value());
        if (this.parameterTypes.length == 3) {
            this.annotatedListenerMethod.getJavaMember().invoke(this.consumerInstance, record.key(), record.value(), record.headers());
        } else if (this.parameterTypes.length == 2) {
            this.annotatedListenerMethod.getJavaMember().invoke(this.consumerInstance, record.key(), record.value());
        } else {
            this.annotatedListenerMethod.getJavaMember().invoke(this.consumerInstance, record.value());
        }
    }

    private void dispatchCompletePayload(ConsumerRecords<?, ?> records) throws IllegalAccessException, InvocationTargetException {
        this.logger.trace("dispatching payload {} consumer records to consumer", (Object)records.count());
        this.annotatedListenerMethod.getJavaMember().invoke(this.consumerInstance, records);
    }

    private ConsumerMode getConsumerMode(Class<?>[] parameterTypes) {
        if (parameterTypes.length > 1) {
            return ConsumerMode.SINGLE;
        }
        if (parameterTypes.length == 1) {
            if (parameterTypes[0].isAssignableFrom(ConsumerRecords.class)) {
                return ConsumerMode.ALL;
            }
            return ConsumerMode.SINGLE;
        }
        throw new IllegalArgumentException("Consumer methods must have at least one parameter.");
    }

    private boolean isRunning() {
        return this.running.get();
    }

    public void shutdown() {
        this.logger.info("Shutting down the consumer.");
        this.running.set(Boolean.FALSE);
        this.consumer.wakeup();
    }

    private static enum ConsumerMode {
        SINGLE,
        ALL;

    }
}

