/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.kudu.partitioner;

import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.lib.util.KryoCloneUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
import org.apache.apex.malhar.kudu.ApexKuduConnection;
import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduTable;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractKuduInputPartitioner
implements Partitioner<AbstractKuduInputOperator> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduInputPartitioner.class);
    @JsonIgnore
    protected AbstractKuduInputOperator prototypeKuduInputOperator;

    public AbstractKuduInputPartitioner(AbstractKuduInputOperator prototypeOperator) {
        this.prototypeKuduInputOperator = prototypeOperator;
    }

    public List<KuduScanToken> getKuduScanTokensForSelectAllColumns() throws Exception {
        ApexKuduConnection apexKuduConnection = this.prototypeKuduInputOperator.getApexKuduConnectionInfo().build();
        KuduClient clientHandle = apexKuduConnection.getKuduClient();
        KuduTable table = apexKuduConnection.getKuduTable();
        KuduScanToken.KuduScanTokenBuilder builder = clientHandle.newScanTokenBuilder(table);
        ArrayList<String> allColumns = new ArrayList<String>();
        List columnList = apexKuduConnection.getKuduTable().getSchema().getColumns();
        for (ColumnSchema column : columnList) {
            allColumns.add(column.getName());
        }
        builder.setProjectedColumnNames(allColumns);
        LOG.debug("Building the partition pie assignments for the input operator");
        List allPossibleTokens = builder.build();
        apexKuduConnection.close();
        return allPossibleTokens;
    }

    public int getNumberOfPartitions(Partitioner.PartitioningContext context) {
        int proposedPartitionCount = context.getParallelPartitionCount();
        if (this.prototypeKuduInputOperator.getNumberOfPartitions() != -1) {
            proposedPartitionCount = this.prototypeKuduInputOperator.getNumberOfPartitions();
            LOG.info(" Set the partition count based on the code as opposed to configuration ");
        }
        if (proposedPartitionCount <= 0) {
            LOG.info(" Defaulting to one partition as parallel partitioning is not enabled");
            proposedPartitionCount = 1;
        }
        LOG.info(" Planning to use " + proposedPartitionCount + " partitions");
        return proposedPartitionCount;
    }

    public List<KuduPartitionScanAssignmentMeta> getListOfPartitionAssignments(Collection<Partitioner.Partition<AbstractKuduInputOperator>> partitions, Partitioner.PartitioningContext context) throws Exception {
        ArrayList<KuduPartitionScanAssignmentMeta> returnList = new ArrayList<KuduPartitionScanAssignmentMeta>();
        ArrayList<KuduScanToken> allColumnsScanTokens = new ArrayList<KuduScanToken>();
        try {
            allColumnsScanTokens.addAll(this.getKuduScanTokensForSelectAllColumns());
        }
        catch (Exception e) {
            LOG.error(" Error while calculating the number of scan tokens for all column projections " + e.getMessage(), (Throwable)e);
        }
        if (allColumnsScanTokens.size() == 0) {
            LOG.error("No column information could be extracted from the Kudu table");
            throw new Exception("No column information could be extracted from the Kudu table");
        }
        int totalPartitionCount = allColumnsScanTokens.size();
        LOG.info("Determined maximum as " + totalPartitionCount + " tablets for this table");
        for (int i = 0; i < totalPartitionCount; ++i) {
            KuduPartitionScanAssignmentMeta aMeta = new KuduPartitionScanAssignmentMeta();
            aMeta.setOrdinal(i);
            aMeta.setTotalSize(totalPartitionCount);
            returnList.add(aMeta);
            LOG.info("A planned scan meta of the total partitions " + aMeta);
        }
        LOG.info("Total kudu partition size is " + returnList.size());
        return returnList;
    }

    public Partitioner.Partition<AbstractKuduInputOperator> clonePartitionAndAssignScanMeta(List<KuduPartitionScanAssignmentMeta> scanAssignmentsForThisPartition) {
        DefaultPartition clonedKuduInputOperator = new DefaultPartition(KryoCloneUtils.cloneObject((Object)this.prototypeKuduInputOperator));
        ((AbstractKuduInputOperator)clonedKuduInputOperator.getPartitionedInstance()).setPartitionPieAssignment(scanAssignmentsForThisPartition);
        return clonedKuduInputOperator;
    }

    public Collection<Partitioner.Partition<AbstractKuduInputOperator>> definePartitions(Collection<Partitioner.Partition<AbstractKuduInputOperator>> partitions, Partitioner.PartitioningContext context) {
        if (partitions != null) {
            LOG.info("The current partitioner plan has " + partitions.size() + " operators before redefining");
        }
        ArrayList<Partitioner.Partition<AbstractKuduInputOperator>> partitionsForInputOperator = new ArrayList<Partitioner.Partition<AbstractKuduInputOperator>>();
        ArrayList<KuduPartitionScanAssignmentMeta> assignmentMetaList = new ArrayList<KuduPartitionScanAssignmentMeta>();
        try {
            assignmentMetaList.addAll(this.getListOfPartitionAssignments(partitions, context));
        }
        catch (Exception e) {
            throw new RuntimeException("Aborting partition planning as Kudu meta data could not be obtained", e);
        }
        LOG.info("Maximum possible Kudu input operator partition count is " + assignmentMetaList.size());
        Map<Integer, List<KuduPartitionScanAssignmentMeta>> assignments = this.assign(assignmentMetaList, context);
        boolean requiresRepartitioning = false;
        if (partitions == null) {
            requiresRepartitioning = true;
        } else {
            for (Partitioner.Partition<AbstractKuduInputOperator> aPartition : partitions) {
                if (((AbstractKuduInputOperator)aPartition.getPartitionedInstance()).isPartitioned()) continue;
                requiresRepartitioning = true;
                break;
            }
        }
        if (requiresRepartitioning) {
            partitions.clear();
            LOG.info("Clearing all of the current partitions and setting up new ones");
            partitions.clear();
            for (int i = 0; i < assignments.size(); ++i) {
                List<KuduPartitionScanAssignmentMeta> assignmentForThisOperator = assignments.get(i);
                partitionsForInputOperator.add(this.clonePartitionAndAssignScanMeta(assignmentForThisOperator));
                LOG.info("Assigned apex operator " + i + " with " + assignmentForThisOperator.size() + " kudu mappings");
            }
            LOG.info("Returning " + partitionsForInputOperator.size() + " partitions for the input operator");
            return partitionsForInputOperator;
        }
        LOG.info("Not making any changes to the partitions");
        return partitions;
    }

    public void partitioned(Map<Integer, Partitioner.Partition<AbstractKuduInputOperator>> partitions) {
    }

    public abstract Map<Integer, List<KuduPartitionScanAssignmentMeta>> assign(List<KuduPartitionScanAssignmentMeta> var1, Partitioner.PartitioningContext var2);

    public AbstractKuduInputOperator getPrototypeKuduInputOperator() {
        return this.prototypeKuduInputOperator;
    }

    public void setPrototypeKuduInputOperator(AbstractKuduInputOperator prototypeKuduInputOperator) {
        this.prototypeKuduInputOperator = prototypeKuduInputOperator;
    }
}

