/*
 * Decompiled with CFR 0.152.
 */
package at.grahsl.kafka.connect.mongodb;

import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
import at.grahsl.kafka.connect.mongodb.MongoDbSinkRecordBatches;
import at.grahsl.kafka.connect.mongodb.VersionUtil;
import at.grahsl.kafka.connect.mongodb.cdc.CdcHandler;
import at.grahsl.kafka.connect.mongodb.converter.SinkConverter;
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import at.grahsl.kafka.connect.mongodb.processor.PostProcessor;
import at.grahsl.kafka.connect.mongodb.writemodel.strategy.WriteModelStrategy;
import com.mongodb.BulkWriteException;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.MongoException;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbSinkTask
extends SinkTask {
    private static Logger LOGGER = LoggerFactory.getLogger(MongoDbSinkTask.class);
    private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions().ordered(false);
    private MongoDbSinkConnectorConfig sinkConfig;
    private MongoClient mongoClient;
    private MongoDatabase database;
    private int remainingRetries;
    private int deferRetryMs;
    private Map<String, PostProcessor> processorChains;
    private Map<String, CdcHandler> cdcHandlers;
    private Map<String, WriteModelStrategy> writeModelStrategies;
    private Map<String, MongoDbSinkConnectorConfig.RateLimitSettings> rateLimitSettings;
    private Map<String, WriteModelStrategy> deleteOneModelDefaultStrategies;
    private Map<String, MongoCollection<BsonDocument>> cachedCollections = new HashMap<String, MongoCollection<BsonDocument>>();
    private SinkConverter sinkConverter = new SinkConverter();

    public String version() {
        return VersionUtil.getVersion();
    }

    public void start(Map<String, String> props) {
        LOGGER.info("starting MongoDB sink task");
        this.sinkConfig = new MongoDbSinkConnectorConfig(props);
        MongoClientURI uri = this.sinkConfig.buildClientURI();
        this.mongoClient = new MongoClient(uri);
        this.database = this.mongoClient.getDatabase(uri.getDatabase());
        this.remainingRetries = this.sinkConfig.getInt("mongodb.max.num.retries");
        this.deferRetryMs = this.sinkConfig.getInt("mongodb.retries.defer.timeout");
        this.processorChains = this.sinkConfig.buildPostProcessorChains();
        this.cdcHandlers = this.sinkConfig.getCdcHandlers();
        this.writeModelStrategies = this.sinkConfig.getWriteModelStrategies();
        this.rateLimitSettings = this.sinkConfig.getRateLimitSettings();
        this.deleteOneModelDefaultStrategies = this.sinkConfig.getDeleteOneModelDefaultStrategies();
    }

    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            LOGGER.debug("no sink records to process for current poll operation");
            return;
        }
        Map<String, MongoDbSinkRecordBatches> batchMapping = this.createSinkRecordBatchesPerTopic(records);
        batchMapping.forEach((namespace, batches) -> {
            String collection = StringUtils.substringAfter((String)namespace, (String)".");
            batches.getBufferedBatches().forEach(batch -> {
                this.processSinkRecords(this.cachedCollections.get(namespace), (List<SinkRecord>)batch);
                MongoDbSinkConnectorConfig.RateLimitSettings rls = this.rateLimitSettings.getOrDefault(collection, this.rateLimitSettings.get("__default__"));
                if (rls.isTriggered()) {
                    LOGGER.debug("rate limit settings triggering {}ms defer timeout after processing {} further batches for collection {}", new Object[]{rls.getTimeoutMs(), rls.getEveryN(), collection});
                    try {
                        Thread.sleep(rls.getTimeoutMs());
                    }
                    catch (InterruptedException e) {
                        LOGGER.error(e.getMessage());
                    }
                }
            });
        });
    }

    private void processSinkRecords(MongoCollection<BsonDocument> collection, List<SinkRecord> batch) {
        String collectionName = collection.getNamespace().getCollectionName();
        List<? extends WriteModel<BsonDocument>> docsToWrite = this.sinkConfig.isUsingCdcHandler(collectionName) ? this.buildWriteModelCDC(batch, collectionName) : this.buildWriteModel(batch, collectionName);
        try {
            if (!docsToWrite.isEmpty()) {
                LOGGER.debug("bulk writing {} document(s) into collection [{}]", (Object)docsToWrite.size(), (Object)collection.getNamespace().getFullName());
                BulkWriteResult result = collection.bulkWrite(docsToWrite, BULK_WRITE_OPTIONS);
                LOGGER.debug("mongodb bulk write result: " + result.toString());
            }
        }
        catch (MongoException mexc) {
            if (mexc instanceof BulkWriteException) {
                BulkWriteException bwe = (BulkWriteException)mexc;
                LOGGER.error("mongodb bulk write (partially) failed", (Throwable)bwe);
                LOGGER.error(bwe.getWriteResult().toString());
                LOGGER.error(bwe.getWriteErrors().toString());
                LOGGER.error(bwe.getWriteConcernError().toString());
            } else {
                LOGGER.error("error on mongodb operation", (Throwable)mexc);
                LOGGER.error("writing {} document(s) into collection [{}] failed -> remaining retries ({})", new Object[]{docsToWrite.size(), collection.getNamespace().getFullName(), this.remainingRetries});
            }
            if (this.remainingRetries-- <= 0) {
                throw new ConnectException("failed to write mongodb documents despite retrying -> GIVING UP! :( :( :(", (Throwable)mexc);
            }
            LOGGER.debug("deferring retry operation for {}ms", (Object)this.deferRetryMs);
            this.context.timeout((long)this.deferRetryMs);
            throw new RetriableException(mexc.getMessage(), (Throwable)mexc);
        }
    }

    Map<String, MongoDbSinkRecordBatches> createSinkRecordBatchesPerTopic(Collection<SinkRecord> records) {
        LOGGER.debug("number of sink records to process: {}", (Object)records.size());
        HashMap<String, MongoDbSinkRecordBatches> batchMapping = new HashMap<String, MongoDbSinkRecordBatches>();
        LOGGER.debug("buffering sink records into grouped topic batches");
        records.forEach(r -> {
            MongoDbSinkRecordBatches batches;
            String namespace;
            MongoCollection mongoCollection;
            String collection = this.sinkConfig.getString("mongodb.collection", r.topic());
            if (collection.isEmpty()) {
                LOGGER.debug("no explicit collection name mapping found for topic {} and default collection name was empty ", (Object)r.topic());
                LOGGER.debug("using topic name {} as collection name", (Object)r.topic());
                collection = r.topic();
            }
            if ((mongoCollection = this.cachedCollections.get(namespace = this.database.getName() + "." + collection)) == null) {
                mongoCollection = this.database.getCollection(collection, BsonDocument.class);
                this.cachedCollections.put(namespace, (MongoCollection<BsonDocument>)mongoCollection);
            }
            if ((batches = (MongoDbSinkRecordBatches)batchMapping.get(namespace)) == null) {
                int maxBatchSize = this.sinkConfig.getInt("mongodb.max.batch.size", collection);
                LOGGER.debug("batch size for collection {} is at most {} record(s)", (Object)collection, (Object)maxBatchSize);
                batches = new MongoDbSinkRecordBatches(maxBatchSize, records.size());
                batchMapping.put(namespace, batches);
            }
            batches.buffer((SinkRecord)r);
        });
        return batchMapping;
    }

    List<? extends WriteModel<BsonDocument>> buildWriteModel(Collection<SinkRecord> records, String collectionName) {
        ArrayList docsToWrite = new ArrayList(records.size());
        LOGGER.debug("building write model for {} record(s)", (Object)records.size());
        records.forEach(record -> {
            SinkDocument doc = this.sinkConverter.convert((SinkRecord)record);
            this.processorChains.getOrDefault(collectionName, this.processorChains.get("__default__")).process(doc, (SinkRecord)record);
            if (doc.getValueDoc().isPresent()) {
                docsToWrite.add(this.writeModelStrategies.getOrDefault(collectionName, this.writeModelStrategies.get("__default__")).createWriteModel(doc));
            } else if (doc.getKeyDoc().isPresent() && this.sinkConfig.isDeleteOnNullValues(record.topic())) {
                docsToWrite.add(this.deleteOneModelDefaultStrategies.getOrDefault(collectionName, this.deleteOneModelDefaultStrategies.get("__default__")).createWriteModel(doc));
            } else {
                LOGGER.error("skipping sink record " + record + "for which neither key doc nor value doc were present");
            }
        });
        return docsToWrite;
    }

    List<? extends WriteModel<BsonDocument>> buildWriteModelCDC(Collection<SinkRecord> records, String collectionName) {
        LOGGER.debug("building CDC write model for {} record(s) into collection {}", (Object)records.size(), (Object)collectionName);
        return records.stream().map(this.sinkConverter::convert).map(this.cdcHandlers.get(collectionName)::handle).flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)).collect(Collectors.toList());
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public void stop() {
        LOGGER.info("stopping MongoDB sink task");
        this.mongoClient.close();
    }
}

