/*
 * Decompiled with CFR 0.152.
 */
package io.gitee.mrxangel.consumer;

import io.gitee.mrxangel.config.PulsarProperties;
import io.gitee.mrxangel.consumer.PulsarConsumerListener;
import io.gitee.mrxangel.enums.SubscriptionInitialPositionEnum;
import io.gitee.mrxangel.enums.SubscriptionTypeEnum;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Component
@Configuration
public class PulsarConsumerHandler
implements BeanPostProcessor {
    private PulsarClient pulsarClient;
    @Resource
    private PulsarProperties pulsarProperties;

    public PulsarConsumerHandler(PulsarClient pulsarClient) {
        this.pulsarClient = pulsarClient;
    }

    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Arrays.stream(bean.getClass().getDeclaredMethods()).filter(v -> v.isAnnotationPresent(PulsarConsumerListener.class)).forEach(method -> {
            PulsarConsumerListener annotation = method.getAnnotation(PulsarConsumerListener.class);
            this.initConsumer(bean, (Method)method, annotation);
        });
        return bean;
    }

    public void initConsumer(Object bean, Method method, PulsarConsumerListener annotation) {
        try {
            this.pulsarClient.newConsumer().topic(new String[]{annotation.destination()}).subscriptionName(StringUtils.isAllEmpty((CharSequence[])new CharSequence[]{annotation.subscriptionName()}) ? this.pulsarProperties.getSubscription() : annotation.subscriptionName()).subscriptionType(SubscriptionTypeEnum.of(this.pulsarProperties.getSubscriptionType())).subscriptionInitialPosition(SubscriptionInitialPositionEnum.of(this.pulsarProperties.getSubscriptionInitialPosition())).negativeAckRedeliveryDelay((long)this.pulsarProperties.getNegativeAckRedeliveryDelay().intValue(), TimeUnit.SECONDS).messageListener((MessageListener & Serializable)(consumer, msg) -> {
                try {
                    method.setAccessible(true);
                    method.invoke(bean, msg);
                    consumer.acknowledge(msg);
                }
                catch (Exception e) {
                    consumer.negativeAcknowledge(msg);
                    throw new RuntimeException("TODO Custom Exception!", e);
                }
            }).subscribe();
        }
        catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }

    public Object postProcessAfterInitialization(Object bean, String beanName) {
        return bean;
    }
}

