/*
 * Decompiled with CFR 0.152.
 */
package io.macronova.kafka.connect.jms.sink;

import io.macronova.kafka.connect.jms.common.BaseConnectorConfig;
import io.macronova.kafka.connect.jms.common.JmsConnectionPool;
import io.macronova.kafka.connect.jms.common.JmsConverter;
import io.macronova.kafka.connect.jms.common.JmsSessionProvider;
import io.macronova.kafka.connect.jms.sink.JmsSinkConnectorConfig;
import io.macronova.kafka.connect.jms.util.JmsUtils;
import io.macronova.kafka.connect.jms.util.Version;
import java.util.Collection;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Connect sink task that publishes every batch of {@link SinkRecord}s to a single JMS
 * destination.
 *
 * <p>Each batch handed to {@link #put(Collection)} is converted record by record with the
 * configured {@link JmsConverter}, sent through one {@link MessageProducer} and then committed on
 * the JMS {@link Session}. When anything in that sequence fails the session is rolled back and the
 * batch is reported to the framework as retriable until the configured number of retries has been
 * used up.
 *
 * <p>The task keeps mutable state without any synchronization; it relies on the caller to invoke
 * it from one thread at a time.
 */
public class JmsSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(JmsSinkTask.class);

    // Configuration keys read by this task.
    private static final String MAX_RETRIES_CONFIG = "max.retries";
    private static final String RETRY_BACKOFF_CONFIG = "retry.backoff.ms";
    private static final String CONVERTER_CONFIG = "jms.message.converter";
    private static final String DESTINATION_NAME_CONFIG = "jms.destination.name";
    private static final String DESTINATION_TYPE_CONFIG = "jms.destination.type";

    private JmsSinkConnectorConfig config;
    private Map<String, String> configProperties;
    /** Retries left before a failing batch is reported as fatal; reset after every successful batch. */
    private int remainingRetries = -1;
    private JmsSessionProvider provider;
    private Session session;
    private Destination destination;
    private MessageProducer producer;
    private JmsConverter converter;

    /**
     * Returns the version reported by {@link Version#getVersion()}.
     *
     * @return the connector version string
     */
    public String version() {
        return Version.getVersion();
    }

    /**
     * Initializes the task: parses the configuration, obtains a connection from
     * {@link JmsConnectionPool}, instantiates and configures the message converter and creates the
     * JMS producer.
     *
     * @param properties raw task configuration
     * @throws ConnectException if any initialization step fails; everything acquired so far is
     *     released before the exception is thrown
     */
    public void start(Map<String, String> properties) {
        this.configProperties = properties;
        this.config = new JmsSinkConnectorConfig(properties);
        this.remainingRetries = this.config.getInt(MAX_RETRIES_CONFIG);
        try {
            this.provider = JmsConnectionPool.getOrCreateConnection(this.configProperties);
            this.converter = instantiateConverter(this.config.getString(CONVERTER_CONFIG));
            this.converter.configure(this.configProperties);
            createProducer();
        } catch (Exception e) {
            terminate();
            throw new ConnectException("Failed to start JMS sink task: " + e.getMessage() + ".", e);
        }
    }

    /**
     * Instantiates the converter class through its no-argument constructor.
     *
     * <p>{@code Class.newInstance()} is deprecated, so the constructor is looked up explicitly. An
     * exception thrown by the constructor itself arrives wrapped in an
     * {@code InvocationTargetException}; it is unwrapped here so callers see the real cause and
     * its message, exactly as they did with {@code Class.newInstance()}.
     *
     * @param className fully qualified name of a {@link JmsConverter} implementation
     * @return a new converter instance
     * @throws Exception if the class cannot be loaded, has no accessible no-argument constructor,
     *     is not a {@link JmsConverter}, or its constructor fails
     */
    private static JmsConverter instantiateConverter(String className) throws Exception {
        try {
            return (JmsConverter) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (java.lang.reflect.InvocationTargetException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw e;
        }
    }

    /**
     * Creates the session, resolves the configured destination and creates the producer for it.
     *
     * @throws Exception if the provider or the JMS client fails at any step
     */
    private void createProducer() throws Exception {
        // NOTE(review): 'true' presumably requests a transacted session, which put() relies on
        // when it calls commit()/rollback() - confirm against JmsSessionProvider.
        this.session = this.provider.createSession(true);
        this.destination = this.provider.resolveDestination(
                this.session,
                this.config.getString(DESTINATION_NAME_CONFIG),
                this.config.getString(DESTINATION_TYPE_CONFIG));
        this.producer = this.session.createProducer(this.destination);
    }

    /**
     * Closes the producer and the session, if present, and clears both references. Failures are
     * deliberately ignored (cleanup is best-effort, e.g. on an already broken connection) and are
     * only logged at debug level.
     */
    private void terminateProducer() {
        if (this.producer != null) {
            try {
                this.producer.close();
            } catch (Exception e) {
                log.debug("Ignoring failure while closing JMS producer: {}", e.getMessage(), e);
            }
            this.producer = null;
        }
        if (this.session != null) {
            // Roll back first so pending, uncommitted sends are discarded before the session closes.
            tryRollback();
            try {
                this.session.close();
            } catch (Exception e) {
                log.debug("Ignoring failure while closing JMS session: {}", e.getMessage(), e);
            }
            this.session = null;
        }
    }

    /**
     * Releases the producer and session and, if a provider was obtained, asks
     * {@link JmsConnectionPool} to destroy the connection for this task's configuration.
     */
    private void terminate() {
        terminateProducer();
        if (this.provider != null) {
            JmsConnectionPool.destroyConnection(this.configProperties);
            this.provider = null;
        }
    }

    /**
     * Publishes a batch of records and commits the session once all of them have been sent.
     *
     * <p>If the connection is closed, or the session or producer is missing, a reconnect is
     * attempted first. On any failure the session is rolled back; then either a fatal
     * {@link ConnectException} is thrown (no retries left) or the retry counter is decremented, the
     * configured back-off is passed to the task context and a {@link RetriableException} is thrown.
     * A successful batch resets the retry counter to the configured maximum.
     *
     * @param records records to publish; an empty collection is a no-op
     * @throws ConnectException if the task has not been started (or was stopped), or if the batch
     *     failed and no retries remain
     * @throws RetriableException if the batch failed and retries remain
     */
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        if (this.provider == null) {
            // Without this check the resulting NullPointerException would be caught below and
            // then re-raised from the error handler, hiding the actual problem.
            throw new ConnectException("JMS sink task is not started.");
        }
        try {
            if (this.provider.isClosed() || this.session == null || this.producer == null) {
                tryToReconnect();
            }
            for (SinkRecord record : records) {
                this.producer.send(this.converter.recordToMessage(this.session, record));
            }
            this.session.commit();
        } catch (Exception e) {
            log.error("Failed to publish messages to JMS destination '{}': {}.",
                    JmsUtils.destinationNameForLog(this.destination), e.getMessage(), e);
            tryRollback();
            if (this.remainingRetries == 0) {
                throw new ConnectException(e);
            }
            if (this.provider.getDialect().reconnectOnError(e)) {
                // Closing the connection makes the next put() take the reconnect path.
                this.provider.closeQuietly();
            }
            --this.remainingRetries;
            this.context.timeout(this.config.getLong(RETRY_BACKOFF_CONFIG));
            throw new RetriableException(e);
        }
        this.remainingRetries = this.config.getInt(MAX_RETRIES_CONFIG);
    }

    /**
     * Rolls back the current session if there is one. Failures are deliberately ignored (the
     * session may already be unusable) and are only logged at debug level.
     */
    private void tryRollback() {
        if (this.session == null) {
            return;
        }
        try {
            this.session.rollback();
        } catch (Exception e) {
            log.debug("Ignoring failure while rolling back JMS session: {}", e.getMessage(), e);
        }
    }

    /**
     * Discards the current producer and session, asks the provider to reconnect and creates a new
     * producer.
     *
     * @throws Exception if the provider reports that it did not reconnect (surfaced as an
     *     {@link IllegalStateException}) or if re-creating the producer fails; the failure is
     *     logged before being rethrown
     */
    private void tryToReconnect() throws Exception {
        log.info("Reconnecting to JMS server: {}.", BaseConnectorConfig.getBrokerUrl(this.configProperties));
        try {
            terminateProducer();
            if (!this.provider.reconnect()) {
                throw new IllegalStateException("JMS reconnection in progress...");
            }
            createProducer();
        } catch (Exception e) {
            log.error("Failed to re-establish connectivity with JMS server: {}.", e.getMessage(), e);
            throw e;
        }
    }

    /** Stops the task by releasing the producer, the session and the pooled connection. */
    public void stop() {
        terminate();
    }

    /**
     * Returns the number of retries left before a failing batch becomes fatal. Package-private;
     * presumably intended for tests.
     *
     * @return the remaining retry count
     */
    int getRemainingRetries() {
        return this.remainingRetries;
    }

    /**
     * Replaces the message converter. Package-private; presumably intended for tests.
     *
     * @param converter converter to use for subsequent batches
     */
    void setConverter(JmsConverter converter) {
        this.converter = converter;
    }
}

