/*
 * Decompiled with CFR 0.152.
 */
package io.macronova.kafka.connect.jms.source;

import io.macronova.kafka.connect.jms.common.BaseConnectorConfig;
import io.macronova.kafka.connect.jms.common.JmsConnectionPool;
import io.macronova.kafka.connect.jms.common.JmsConverter;
import io.macronova.kafka.connect.jms.common.JmsSessionProvider;
import io.macronova.kafka.connect.jms.source.JmsSourceConnectorConfig;
import io.macronova.kafka.connect.jms.util.JmsUtils;
import io.macronova.kafka.connect.jms.util.StringUtils;
import io.macronova.kafka.connect.jms.util.Version;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Connect source task that receives messages from a JMS destination,
 * converts each one to a {@link SourceRecord} with the configured
 * {@link JmsConverter}, and acknowledges the most recently received JMS message
 * once every record handed out by {@link #poll()} has been committed.
 *
 * <p>{@link #poll()} and {@link #stop()} are synchronized on the task instance;
 * {@link #commitRecord(SourceRecord)} only takes that monitor to wake up a
 * waiting {@link #stop()}.
 *
 * <p>NOTE(review): {@link #commitRecord(SourceRecord)} acknowledges on the JMS
 * session without holding the task monitor, so it may run concurrently with a
 * {@link #poll()} that is using the same session. Confirm that the callers never
 * overlap these two calls, or that the JMS provider tolerates it.
 */
public class JmsSourceTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(JmsSourceTask.class);

    /** Empty source partition passed to the converter for every record. */
    private static final Map<String, ?> sourcePartition = new HashMap<String, Object>();
    /** Empty source offset passed to the converter for every record. */
    private static final Map<String, ?> sourceOffset = new HashMap<String, Object>();

    private Map<String, String> configProperties = null;
    private JmsSourceConnectorConfig config = null;
    private int maxPollRecords = -1;
    private long pollingTimeout = -1L;
    private String topic = null;
    private JmsConverter converter = null;
    private JmsSessionProvider provider = null;
    private Session session = null;
    private Destination destination = null;
    private MessageConsumer consumer = null;

    /** Records returned by {@link #poll()} that have not been committed yet, with their JMS messages. */
    private final Map<SourceRecord, Message> messagesToAcknowledge = new ConcurrentHashMap<SourceRecord, Message>();

    /**
     * Most recently received JMS message that still awaits acknowledgement.
     * Written by {@link #poll()} and read by {@link #commitRecord(SourceRecord)},
     * hence volatile.
     */
    private volatile Message lastMessage = null;
    private volatile boolean shuttingDown = false;

    /**
     * Returns the connector version.
     *
     * @return the value of {@link Version#getVersion()}
     */
    public String version() {
        return Version.getVersion();
    }

    /**
     * Reads the task configuration, instantiates and configures the message
     * converter, obtains a JMS session provider from the connection pool and
     * creates the message consumer.
     *
     * @param properties task configuration
     * @throws ConnectException if any of the start-up steps fails; resources
     *         created so far are released before the exception is thrown
     */
    public void start(Map<String, String> properties) {
        this.configProperties = properties;
        this.config = new JmsSourceConnectorConfig(properties);
        this.topic = this.config.getString("topics");
        this.maxPollRecords = this.config.getInt("max.poll.records");
        this.pollingTimeout = this.config.getLong("poll.timeout.ms");
        try {
            this.converter = (JmsConverter) Class.forName(this.config.getString("jms.message.converter")).newInstance();
            this.converter.configure(this.configProperties);
            this.provider = JmsConnectionPool.getOrCreateConnection(this.configProperties);
            this.createConsumer();
            this.shuttingDown = false;
        } catch (Exception e) {
            this.terminate();
            throw new ConnectException("Failed to start JMS source task: " + e.getMessage() + ".", e);
        }
    }

    /**
     * Creates the JMS session, resolves the destination and opens a consumer on it.
     *
     * <p>Queues and non-durable topic subscriptions use a plain consumer. Durable
     * topic subscriptions require {@code jms.topic.subscription.name} and use
     * either a durable subscriber or a shared durable consumer, depending on
     * {@code jms.topic.subscription.shared}.
     *
     * @throws ConnectException if a durable subscription is requested without a name
     * @throws Exception if the provider or the JMS API fails
     */
    private void createConsumer() throws Exception {
        // NOTE(review): the meaning of the 'false' flag is defined by JmsSessionProvider (presumably non-transacted) - confirm.
        this.session = this.provider.createSession(false);
        this.destination = this.provider.resolveDestination(
                this.session,
                this.config.getString("jms.destination.name"),
                this.config.getString("jms.destination.type"));

        // An empty selector is passed to JMS as null.
        String configuredSelector = this.config.getString("jms.selector");
        String selector = StringUtils.isEmpty(configuredSelector) ? null : configuredSelector;

        if (this.destination instanceof Queue) {
            this.consumer = this.session.createConsumer(this.destination, selector);
            return;
        }

        String subscription = this.config.getString("jms.topic.subscription.name");
        Boolean durableSubscription = this.config.getBoolean("jms.topic.subscription.durable");
        Boolean sharedSubscription = this.config.getBoolean("jms.topic.subscription.shared");
        if (!durableSubscription) {
            this.consumer = this.session.createConsumer(this.destination, selector);
            return;
        }
        if (StringUtils.isEmpty(subscription)) {
            throw new ConnectException("When polling messages from JMS topic, please specify 'jms.topic.subscription.name' property.");
        }
        Topic jmsTopic = (Topic) this.destination;
        if (!sharedSubscription) {
            this.consumer = this.session.createDurableSubscriber(jmsTopic, subscription, selector, false);
        } else {
            this.consumer = this.session.createSharedDurableConsumer(jmsTopic, subscription, selector);
        }
    }

    /**
     * Closes the consumer and the session, if present, and clears both fields.
     * Close failures are deliberately not propagated (best-effort cleanup); they
     * are logged at debug level.
     */
    private void terminateConsumer() {
        if (this.consumer != null) {
            try {
                this.consumer.close();
            } catch (Exception e) {
                log.debug("Ignoring failure while closing JMS message consumer.", e);
            }
            this.consumer = null;
        }
        if (this.session != null) {
            try {
                this.session.close();
            } catch (Exception e) {
                log.debug("Ignoring failure while closing JMS session.", e);
            }
            this.session = null;
        }
    }

    /**
     * Closes consumer and session, then asks the connection pool to destroy the
     * connection registered for this task's configuration.
     */
    private void terminate() {
        this.terminateConsumer();
        if (this.provider != null) {
            JmsConnectionPool.destroyConnection(this.configProperties);
            this.provider = null;
        }
    }

    /**
     * Makes up to {@code max.poll.records} receive attempts on the JMS consumer
     * and returns the converted records.
     *
     * <p>Every receive attempt counts towards the limit, whether or not it
     * delivered a message. The overall {@code poll.timeout.ms} budget is split
     * evenly across the attempts. On failure the session is recovered, the
     * partial batch is discarded and the whole poll is retried up to
     * {@code max.retries} times with a {@code retry.backoff.ms} pause in between.
     *
     * @return the records received (possibly empty), or {@code null} when the
     *         task is shutting down
     * @throws ConnectException if polling still fails after all retries
     */
    public synchronized List<SourceRecord> poll() {
        if (this.shuttingDown) {
            return null;
        }
        final int maxRetries = this.config.getInt("max.retries");
        // JMS treats a receive timeout of 0 as "block forever". Integer division
        // yields 0 whenever maxPollRecords > pollingTimeout, so clamp to 1 ms.
        final long receiveTimeout = this.maxPollRecords > 0
                ? Math.max(1L, this.pollingTimeout / (long) this.maxPollRecords)
                : 1L;

        List<SourceRecord> result = new ArrayList<SourceRecord>(this.maxPollRecords);
        Map<SourceRecord, Message> receivedMessages = new HashMap<SourceRecord, Message>();
        int count = 0;
        int remainingRetries = maxRetries;
        Exception lastException = null;
        Message latestMessage = null;

        while (count < this.maxPollRecords && remainingRetries >= 0) {
            try {
                if (this.provider.isClosed() || this.session == null || this.consumer == null) {
                    this.tryToReconnect();
                }
                while (count < this.maxPollRecords) {
                    Message message = this.consumer.receive(receiveTimeout);
                    if (message != null) {
                        SourceRecord record = this.converter.messageToRecord(message, this.topic, sourcePartition, sourceOffset);
                        result.add(record);
                        receivedMessages.put(record, message);
                        latestMessage = message;
                    }
                    ++count;
                    // A successful receive attempt resets the retry budget.
                    remainingRetries = maxRetries;
                    lastException = null;
                }
            } catch (Exception e) {
                log.error("Failed to poll messages from destination '" + JmsUtils.destinationNameForLog(this.destination) + "': " + e.getMessage() + ".", e);
                this.recover();
                // Discard the partial batch; the whole poll is retried from scratch.
                lastException = e;
                latestMessage = null;
                count = 0;
                receivedMessages.clear();
                result.clear();
                if (--remainingRetries < 0) {
                    continue;
                }
                if (this.provider.getDialect().reconnectOnError(e)) {
                    this.provider.closeQuietly();
                }
                this.sleep();
            }
        }

        if (remainingRetries < 0 && lastException != null) {
            throw new ConnectException("Failed to poll records from JMS destination '" + JmsUtils.destinationNameForLog(this.destination) + "': " + lastException.getMessage() + ".", lastException);
        }
        this.messagesToAcknowledge.putAll(receivedMessages);
        if (!receivedMessages.isEmpty()) {
            this.lastMessage = latestMessage;
        }
        return result;
    }

    /**
     * Calls {@link Session#recover()} on the current session, if any. When
     * recovery itself fails, the provider is closed quietly so that the next
     * poll attempt reconnects.
     */
    private void recover() {
        try {
            if (this.session != null) {
                this.session.recover();
            }
        } catch (JMSException e) {
            log.error("Failed to recover JMS session: " + e.getMessage() + ".", e);
            this.provider.closeQuietly();
        }
    }

    /**
     * Closes the current consumer and session, asks the provider to reconnect
     * and creates a fresh consumer.
     *
     * @throws IllegalStateException if the provider reports that reconnection
     *         did not succeed
     * @throws Exception if reconnecting or creating the consumer fails; the
     *         failure is logged and rethrown
     */
    private void tryToReconnect() throws Exception {
        log.info("Reconnecting to JMS server: {}.", BaseConnectorConfig.getBrokerUrl(this.configProperties));
        try {
            this.terminateConsumer();
            if (!this.provider.reconnect()) {
                throw new IllegalStateException("JMS reconnection in progress...");
            }
            this.createConsumer();
        } catch (Exception e) {
            log.error("Failed to re-establish connectivity with JMS server: " + e.getMessage() + ".", e);
            throw e;
        }
    }

    /**
     * Pauses the calling thread for {@code retry.backoff.ms}. If the thread is
     * interrupted the pause ends early and the interrupt flag is restored so
     * callers further up the stack can observe it.
     */
    private void sleep() {
        try {
            Thread.sleep(this.config.getLong("retry.backoff.ms"));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Marks a record as committed. Once no records remain outstanding, the most
     * recently received JMS message is acknowledged and any thread waiting in
     * {@link #stop()} is woken up.
     *
     * <p>If acknowledgement fails, the failure is logged and the session is
     * recovered.
     *
     * @param record the record that has been committed
     * @throws InterruptedException declared by the overridden method
     */
    public void commitRecord(SourceRecord record) throws InterruptedException {
        super.commitRecord(record);
        this.messagesToAcknowledge.remove(record);
        if (!this.messagesToAcknowledge.isEmpty()) {
            return;
        }
        // Work on a local copy; the field may already be null when there is
        // nothing left to acknowledge (e.g. a repeated commit), in which case a
        // blind dereference would be misreported as an acknowledge failure.
        Message toAcknowledge = this.lastMessage;
        if (toAcknowledge != null) {
            try {
                toAcknowledge.acknowledge();
            } catch (Exception e) {
                log.error("Failed to acknowledge JMS message with ID '" + JmsUtils.messageIdForLog(toAcknowledge) + "' received from destination '" + JmsUtils.destinationNameForLog(this.destination) + "'. Redelivery will take place.", e);
                this.recover();
            }
            this.lastMessage = null;
        }
        synchronized (this) {
            this.notifyAll();
        }
    }

    /**
     * Signals shutdown, waits until every outstanding record has been committed
     * (or the waiting thread is interrupted) and then releases all JMS resources.
     *
     * <p>NOTE(review): the wait has no timeout; if the remaining records are
     * never committed this method does not return until interrupted.
     */
    public synchronized void stop() {
        this.shuttingDown = true;
        boolean interrupted = false;
        try {
            while (!this.messagesToAcknowledge.isEmpty()) {
                this.wait();
            }
        } catch (InterruptedException e) {
            log.warn("Interrupted process waiting for all in-flight JMS messages to be acknowledged.", e);
            interrupted = true;
        }
        this.terminate();
        if (interrupted) {
            // Restore the flag only after cleanup so the close calls above are not disturbed.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Replaces the message converter. Package-private; note that
     * {@link #start(Map)} assigns a new converter and overrides this value.
     *
     * @param converter converter to use
     */
    void setConverter(JmsConverter converter) {
        this.converter = converter;
    }
}

