/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.kudu.scanner;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
import org.apache.apex.malhar.kudu.ApexKuduConnection;
import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
import org.apache.kudu.client.AsyncKuduScanner;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanToken;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for scanners that read the Kudu partitions assigned to one
 * {@link AbstractKuduInputOperator} instance.
 *
 * <p>The class keeps a pool of {@link ApexKuduConnection}s keyed by an integer index (one entry per
 * executor thread is created by {@link #initScannerCommons()}), builds the per-query scan plan in
 * {@link #preparePlanForScanners(SQLToKuduPredicatesTranslator)} and leaves the actual record scan
 * to subclasses through {@link #scanAllRecords(SQLToKuduPredicatesTranslator, Map)}.
 *
 * @param <T> payload type handled by the parent operator
 * @param <C> control tuple type handled by the parent operator
 */
public abstract class AbstractKuduPartitionScanner<T, C extends InputOperatorControlTuple> {
    /** Operator that owns this scanner; excluded from JSON serialization. */
    @JsonIgnore
    AbstractKuduInputOperator<T, C> parentOperator;
    /** Thread pool created by {@link #initScannerCommons()}; {@code null} until then. */
    ExecutorService kuduConsumerExecutor;
    /** Builder obtained from the parent operator and used to create the pooled connections. */
    ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionInfo;
    /** Number of executor threads, and therefore of pooled connections, to create. */
    int threadPoolExecutorSize = 1;
    /** Connection pool keyed by index; {@link #initScannerCommons()} fills keys {@code 0..threadPoolExecutorSize-1}. */
    Map<Integer, ApexKuduConnection> connectionPoolForThreads = new HashMap<Integer, ApexKuduConnection>();
    private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduPartitionScanner.class);

    /**
     * Scans all records for the given query. The meaning of the returned count and of the second
     * argument is defined by the implementing subclass.
     *
     * @param var1 the parsed query to scan for
     * @param var2 implementation-specific map passed through to the scan
     * @return an implementation-defined count
     * @throws IOException if the scan fails
     */
    public abstract int scanAllRecords(SQLToKuduPredicatesTranslator var1, Map<String, Object> var2) throws IOException;

    /**
     * Creates a scanner bound to the given operator and captures that operator's connection builder.
     *
     * @param parentOperatorRunningThisScanner the operator running this scanner
     */
    public AbstractKuduPartitionScanner(AbstractKuduInputOperator<T, C> parentOperatorRunningThisScanner) {
        this.parentOperator = parentOperatorRunningThisScanner;
        this.apexKuduConnectionInfo = this.parentOperator.getApexKuduConnectionInfo();
    }

    /**
     * Creates the executor, sorts the parent operator's partition assignment list in place by
     * ordinal, and builds one Kudu connection per executor thread.
     */
    public void initScannerCommons() {
        this.kuduConsumerExecutor = Executors.newFixedThreadPool(this.threadPoolExecutorSize);
        List<KuduPartitionScanAssignmentMeta> allPartitionsThatNeedScan = this.parentOperator.getPartitionPieAssignment();
        Collections.sort(allPartitionsThatNeedScan, new Comparator<KuduPartitionScanAssignmentMeta>() {

            @Override
            public int compare(KuduPartitionScanAssignmentMeta left, KuduPartitionScanAssignmentMeta right) {
                // Integer.compare instead of subtraction: subtraction overflows for operands of
                // opposite sign with large magnitude and then reports the wrong order.
                return Integer.compare(left.getOrdinal(), right.getOrdinal());
            }
        });
        for (int i = 0; i < this.threadPoolExecutorSize; ++i) {
            this.connectionPoolForThreads.put(i, this.apexKuduConnectionInfo.build());
        }
        LOG.info("Scanner running with {} kudu connections", this.connectionPoolForThreads.size());
    }

    /**
     * Returns a usable connection for the given pool slot. If the pooled connection's session is
     * closed, the connection is closed (best effort), rebuilt from its own builder, stored back in
     * the pool and returned.
     *
     * @param indexPos key of the pooled connection to check
     * @return the existing connection, or its freshly built replacement
     * @throws NullPointerException if the pool holds no connection for {@code indexPos}
     */
    public ApexKuduConnection verifyConnectionStaleness(int indexPos) {
        ApexKuduConnection apexKuduConnection = this.connectionPoolForThreads.get(indexPos);
        Preconditions.checkNotNull(apexKuduConnection, "Null connection not expected while checking staleness of existing connection");
        if (!apexKuduConnection.getKuduSession().isClosed()) {
            return apexKuduConnection;
        }
        try {
            apexKuduConnection.close();
        } catch (Exception e) {
            // Best effort: a failure to close the stale handle must not prevent the rebuild below.
            LOG.error(" Could not close a possibly stale kudu connection handle ", e);
        }
        LOG.info("Ripped the old kudu connection out and building a new connection for this scanner");
        ApexKuduConnection newConnection = apexKuduConnection.getBuilderForThisConnection().build();
        this.connectionPoolForThreads.put(indexPos, newConnection);
        return newConnection;
    }

    /**
     * Builds the scan plan of this operator for the given query.
     *
     * <p>A scan token builder is configured with the query's projected columns and predicates, the
     * parent operator's fault tolerance flag and, when the query options contain
     * {@code read_snapshot_time}, a snapshot read at that timestamp. The resulting tokens are sorted
     * by their natural order, and for every entry of the parent operator's partition assignment
     * whose ordinal is smaller than the number of tokens, a new assignment carrying the serialized
     * token at that ordinal is produced. Returned assignments are numbered consecutively from 0.
     *
     * @param parsedQuery the translated query supplying predicates, projected columns and SQL text
     * @return the assignments this operator has to scan for the query; possibly empty
     * @throws IOException if building or serializing the scan tokens fails
     */
    public List<KuduPartitionScanAssignmentMeta> preparePlanForScanners(SQLToKuduPredicatesTranslator parsedQuery) throws IOException {
        List<KuduPredicate> predicateList = parsedQuery.getKuduSQLParseTreeListener().getKuduPredicateList();
        // Slot 0 is used for planning; it is refreshed first in case its session has gone stale.
        ApexKuduConnection apexKuduConnection = this.verifyConnectionStaleness(0);
        KuduScanToken.KuduScanTokenBuilder builder = apexKuduConnection.getKuduClient().newScanTokenBuilder(apexKuduConnection.getKuduTable());
        builder = builder.setProjectedColumnNames(new ArrayList<String>(parsedQuery.getKuduSQLParseTreeListener().getListOfColumnsUsed()));
        for (KuduPredicate aPredicate : predicateList) {
            builder = builder.addPredicate(aPredicate);
        }
        builder.setFaultTolerant(this.parentOperator.isFaultTolerantScanner());
        Map<String, String> optionsUsedForThisQuery = this.parentOperator.getOptionsEnabledForCurrentQuery();
        if (optionsUsedForThisQuery.containsKey("read_snapshot_time")) {
            try {
                // Parse before touching the builder so that an unparsable value leaves it unchanged.
                long readSnapShotTime = Long.parseLong(optionsUsedForThisQuery.get("read_snapshot_time"));
                builder = builder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT);
                builder = builder.snapshotTimestampMicros(readSnapShotTime);
                LOG.info("Using read snapshot for this query as {}", readSnapShotTime);
            } catch (Exception ex) {
                // Deliberately non-fatal: the query proceeds without the snapshot option.
                LOG.error("Cannot parse the Read snaptshot time " + ex.getMessage(), ex);
            }
        }
        List<KuduScanToken> allPossibleScanTokens = builder.build();
        // Sort so that an ordinal maps to the same token wherever this plan is computed from the
        // same token set.
        Collections.sort(allPossibleScanTokens, new Comparator<KuduScanToken>() {

            @Override
            public int compare(KuduScanToken left, KuduScanToken right) {
                return left.compareTo(right);
            }
        });
        int totalScansForThisQuery = allPossibleScanTokens.size();
        LOG.info(" Query will scan {} tablets", totalScansForThisQuery);
        if (LOG.isDebugEnabled()) {
            LOG.debug(" Predicates scheduled for this query are {}", predicateList.size());
            for (KuduScanToken aToken : allPossibleScanTokens) {
                LOG.debug("A tablet scheduled for all operators scanning is {}", aToken.getTablet());
            }
        }
        List<KuduPartitionScanAssignmentMeta> partitionPieForThisOperator = this.parentOperator.getPartitionPieAssignment();
        List<KuduPartitionScanAssignmentMeta> returnOfAssignments = new ArrayList<KuduPartitionScanAssignmentMeta>();
        int counterForPartAssignments = 0;
        for (KuduPartitionScanAssignmentMeta aPartofThePie : partitionPieForThisOperator) {
            int ordinalInPie = aPartofThePie.getOrdinal();
            if (ordinalInPie >= totalScansForThisQuery) {
                // This query yields fewer tokens than this slice's position; nothing to scan for it.
                continue;
            }
            KuduScanToken aTokenForThisOperator = allPossibleScanTokens.get(ordinalInPie);
            KuduPartitionScanAssignmentMeta aMetaForThisQuery = new KuduPartitionScanAssignmentMeta();
            aMetaForThisQuery.setTotalSize(totalScansForThisQuery);
            aMetaForThisQuery.setOrdinal(counterForPartAssignments);
            ++counterForPartAssignments;
            aMetaForThisQuery.setCurrentQuery(parsedQuery.getSqlExpresssion());
            aMetaForThisQuery.setSerializedKuduScanToken(aTokenForThisOperator.serialize());
            returnOfAssignments.add(aMetaForThisQuery);
            LOG.debug("Added query scan for this operator {} with scan tablet as {}", aMetaForThisQuery, aTokenForThisOperator.getTablet());
        }
        LOG.info(" A total of {} have been scheduled for this operator", returnOfAssignments.size());
        return returnOfAssignments;
    }

    /**
     * Closes every pooled connection and shuts down the executor.
     *
     * <p>Failures while closing a connection are logged and do not stop the remaining connections
     * or the executor from being released. The pool is walked by value rather than by index because
     * {@link #setConnectionPoolForThreads(Map)} allows a pool whose keys are not {@code 0..size-1}.
     * The executor is skipped when {@link #initScannerCommons()} was never called.
     */
    public void close() {
        for (ApexKuduConnection pooledConnection : this.connectionPoolForThreads.values()) {
            if (pooledConnection == null) {
                continue;
            }
            try {
                pooledConnection.close();
            } catch (Exception e) {
                LOG.error("Error while closing kudu connection ", e);
            }
        }
        if (this.kuduConsumerExecutor != null) {
            this.kuduConsumerExecutor.shutdown();
        }
    }

    /**
     * @return the operator that owns this scanner
     */
    public AbstractKuduInputOperator<T, C> getParentOperator() {
        return this.parentOperator;
    }

    /**
     * Replaces the owning operator. The connection builder captured by the constructor is not
     * refreshed by this call.
     *
     * @param parentOperator the new owning operator
     */
    public void setParentOperator(AbstractKuduInputOperator<T, C> parentOperator) {
        this.parentOperator = parentOperator;
    }

    /**
     * @return the live connection pool map (not a copy)
     */
    public Map<Integer, ApexKuduConnection> getConnectionPoolForThreads() {
        return this.connectionPoolForThreads;
    }

    /**
     * Replaces the connection pool. Connections held by the previous map are not closed here.
     *
     * @param connectionPoolForThreads the new pool, keyed by index
     */
    public void setConnectionPoolForThreads(Map<Integer, ApexKuduConnection> connectionPoolForThreads) {
        this.connectionPoolForThreads = connectionPoolForThreads;
    }
}

