/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.hadoop;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.carbondata.core.datastore.DataRefNode;
import org.apache.carbondata.core.datastore.IndexKey;
import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.BlockletInfos;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CacheClient;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.CarbonPathFilter;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.CarbonRecordReader;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.carbondata.hadoop.util.BlockLevelTraverser;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
import org.apache.carbondata.hadoop.util.SchemaReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.StringUtils;

public class CarbonInputFormat<T>
extends FileInputFormat<Void, T> {
    public static final String INPUT_SEGMENT_NUMBERS = "mapreduce.input.carboninputformat.segmentnumbers";
    private static final Log LOG = LogFactory.getLog(CarbonInputFormat.class);
    private static final String FILTER_PREDICATE = "mapreduce.input.carboninputformat.filter.predicate";
    private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
    private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
    private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";

    public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable) throws IOException {
        if (null != carbonTable) {
            configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
        }
    }

    public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
        String carbonTableStr = configuration.get(CARBON_TABLE);
        if (carbonTableStr == null) {
            CarbonInputFormat.populateCarbonTable(configuration);
            carbonTableStr = configuration.get(CARBON_TABLE);
            return (CarbonTable)ObjectSerializationUtil.convertStringToObject(carbonTableStr);
        }
        return (CarbonTable)ObjectSerializationUtil.convertStringToObject(carbonTableStr);
    }

    private static void populateCarbonTable(Configuration configuration) throws IOException {
        String dirs = configuration.get("mapreduce.input.fileinputformat.inputdir", "");
        String[] inputPaths = StringUtils.split((String)dirs);
        if (inputPaths.length == 0) {
            throw new InvalidPathException("No input paths specified in job");
        }
        AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath((String)inputPaths[0]);
        CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
        CarbonInputFormat.setCarbonTable(configuration, carbonTable);
    }

    public static void setTablePath(Configuration configuration, String tablePath) throws IOException {
        configuration.set("mapreduce.input.fileinputformat.inputdir", tablePath);
    }

    public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
        if (filterExpression == null) {
            return;
        }
        try {
            String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
            configuration.set(FILTER_PREDICATE, filterString);
        }
        catch (Exception e) {
            throw new RuntimeException("Error while setting filter expression to Job", e);
        }
    }

    public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
        if (projection == null || projection.isEmpty()) {
            return;
        }
        String[] allColumns = projection.getAllColumns();
        StringBuilder builder = new StringBuilder();
        for (String column : allColumns) {
            builder.append(column).append(",");
        }
        String columnString = builder.toString();
        columnString = columnString.substring(0, columnString.length() - 1);
        configuration.set(COLUMN_PROJECTION, columnString);
    }

    public static String getColumnProjection(Configuration configuration) {
        return configuration.get(COLUMN_PROJECTION);
    }

    public static void setCarbonReadSupport(Configuration configuration, Class<? extends CarbonReadSupport> readSupportClass) {
        if (readSupportClass != null) {
            configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
        }
    }

    private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) {
        return CarbonStorePath.getCarbonTablePath((AbsoluteTableIdentifier)absIdentifier);
    }

    public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
        configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
    }

    private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) throws IOException {
        return CarbonInputFormat.getCarbonTable(configuration).getAbsoluteTableIdentifier();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        AbsoluteTableIdentifier identifier = CarbonInputFormat.getAbsoluteTableIdentifier(job.getConfiguration());
        try (CacheClient cacheClient = new CacheClient(identifier.getStorePath());){
            ArrayList<String> invalidSegments = new ArrayList<String>();
            ArrayList<UpdateVO> invalidTimestampsList = new ArrayList<UpdateVO>();
            if (this.getSegmentsToAccess(job).length == 0) {
                SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
                SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager.getValidAndInvalidSegments();
                SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
                CarbonInputFormat.setSegmentsToAccess(job.getConfiguration(), segments.getValidSegments());
                if (segments.getValidSegments().size() == 0) {
                    ArrayList<InputSplit> arrayList = new ArrayList<InputSplit>(0);
                    return arrayList;
                }
                invalidSegments.addAll(segments.getInvalidSegments());
                for (String invalidSegmentId : invalidSegments) {
                    invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
                }
                if (invalidSegments.size() > 0) {
                    ArrayList<TableSegmentUniqueIdentifier> invalidSegmentsIds = new ArrayList<TableSegmentUniqueIdentifier>(invalidSegments.size());
                    for (String segId : invalidSegments) {
                        invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId));
                    }
                    cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
                }
            }
            Expression filter = this.getFilterPredicates(job.getConfiguration());
            CarbonTable carbonTable = CarbonInputFormat.getCarbonTable(job.getConfiguration());
            if (null == carbonTable) {
                throw new IOException("Missing/Corrupt schema file for table.");
            }
            CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
            FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
            List<InputSplit> splits = this.getSplits(job, filterInterface, cacheClient);
            if (invalidSegments.size() > 0) {
                for (InputSplit split : splits) {
                    ((CarbonInputSplit)split).setInvalidSegments(invalidSegments);
                    ((CarbonInputSplit)split).setInvalidTimestampRange(invalidTimestampsList);
                }
            }
            Iterator iterator = splits;
            return iterator;
        }
    }

    private List<InputSplit> getSplitsInternal(JobContext job) throws IOException {
        List splits = super.getSplits(job);
        ArrayList<InputSplit> carbonSplits = new ArrayList<InputSplit>(splits.size());
        for (InputSplit inputSplit : splits) {
            FileSplit fileSplit = (FileSplit)inputSplit;
            String segmentId = CarbonTablePath.DataPathUtil.getSegmentId((String)fileSplit.getPath().toString());
            if (segmentId.equals("-1")) continue;
            carbonSplits.add((InputSplit)CarbonInputSplit.from(segmentId, fileSplit, ColumnarFormatVersion.valueOf((String)"V3")));
        }
        return carbonSplits;
    }

    private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver, CacheClient cacheClient) throws IOException {
        LinkedList<InputSplit> result = new LinkedList<InputSplit>();
        FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
        AbsoluteTableIdentifier absoluteTableIdentifier = CarbonInputFormat.getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
        SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier);
        for (String segmentNo : this.getSegmentsToAccess(job)) {
            List<DataRefNode> dataRefNodes = this.getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier, filterResolver, segmentNo, cacheClient, updateStatusManager);
            for (DataRefNode dataRefNode : dataRefNodes) {
                BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode)dataRefNode;
                TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
                if (CarbonUtil.isInvalidTableBlock((TableBlockInfo)tableBlockInfo, (UpdateVO)updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()), (SegmentUpdateStatusManager)updateStatusManager)) continue;
                result.add((InputSplit)new CarbonInputSplit(segmentNo, new Path(tableBlockInfo.getFilePath()), tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(), tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(), tableBlockInfo.getVersion()));
            }
        }
        return result;
    }

    private Expression getFilterPredicates(Configuration configuration) {
        try {
            String filterExprString = configuration.get(FILTER_PREDICATE);
            if (filterExprString == null) {
                return null;
            }
            Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
            return (Expression)filter;
        }
        catch (IOException e) {
            throw new RuntimeException("Error while reading filter expression", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<DataRefNode> getDataBlocksOfSegment(JobContext job, FilterExpressionProcessor filterExpressionProcessor, AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, String segmentId, CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
        Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
        try {
            QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
            QueryStatistic statistic = new QueryStatistic();
            segmentIndexMap = this.getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, cacheClient, updateStatusManager);
            LinkedList resultFilterredBlocks = new LinkedList();
            if (null != segmentIndexMap) {
                for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
                    List filterredBlocks = null == resolver ? this.getDataBlocksOfIndex(abstractIndex) : filterExpressionProcessor.getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex, absoluteTableIdentifier);
                    resultFilterredBlocks.addAll(filterredBlocks);
                }
            }
            statistic.addStatistics("Time taken to load the Block(s) In Driver Side", System.currentTimeMillis());
            recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
            LinkedList linkedList = resultFilterredBlocks;
            return linkedList;
        }
        finally {
            if (null != segmentIndexMap) {
                ArrayList<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers = new ArrayList<TableSegmentUniqueIdentifier>(1);
                tableSegmentUniqueIdentifiers.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId));
                cacheClient.getSegmentAccessClient().clearAccessCount(tableSegmentUniqueIdentifiers);
            }
        }
    }

    private List<TableBlockInfo> getTableBlockInfo(JobContext job, TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier, Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId, Set<SegmentTaskIndexStore.TaskBucketHolder> validTaskKeys) throws IOException {
        ArrayList<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
        JobContextImpl newJob = new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
        newJob.getConfiguration().set(INPUT_SEGMENT_NUMBERS, tableSegmentUniqueIdentifier.getSegmentId() + "");
        for (InputSplit inputSplit : this.getSplitsInternal((JobContext)newJob)) {
            CarbonInputSplit carbonInputSplit = (CarbonInputSplit)inputSplit;
            if (!this.isValidBlockBasedOnUpdateDetails(taskKeys, carbonInputSplit, updateDetails, updateStatusManager, segmentId, validTaskKeys)) continue;
            BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0, carbonInputSplit.getNumberOfBlocklets());
            tableBlockInfoList.add(new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(), tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(), carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(), carbonInputSplit.getBlockStorageIdMap()));
        }
        return tableBlockInfoList;
    }

    private boolean isValidBlockBasedOnUpdateDetails(Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, CarbonInputSplit carbonInputSplit, UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId, Set<SegmentTaskIndexStore.TaskBucketHolder> validTaskKeys) {
        String taskID = null;
        if (null != carbonInputSplit) {
            if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
                return false;
            }
            if (null == taskKeys) {
                return true;
            }
            taskID = CarbonTablePath.DataFileUtil.getTaskNo((String)carbonInputSplit.getPath().getName());
            String bucketNo = CarbonTablePath.DataFileUtil.getBucketNo((String)carbonInputSplit.getPath().getName());
            SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder = new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
            validTaskKeys.add(taskBucketHolder);
            String blockTimestamp = carbonInputSplit.getPath().getName().substring(carbonInputSplit.getPath().getName().lastIndexOf(45) + 1, carbonInputSplit.getPath().getName().lastIndexOf(46));
            if (!(updateDetails.getUpdateDeltaStartTimestamp() != null && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp() || taskKeys.contains(taskBucketHolder))) {
                return true;
            }
        }
        return false;
    }

    private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId, CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
        Map segmentIndexMap = null;
        SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
        boolean isSegmentUpdated = false;
        Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
        TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
        segmentTaskIndexWrapper = cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
        UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
        if (null != segmentTaskIndexWrapper) {
            segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
            if (this.isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
                taskKeys = segmentIndexMap.keySet();
                isSegmentUpdated = true;
            }
        }
        if (segmentIndexMap == null || isSegmentUpdated) {
            HashSet<SegmentTaskIndexStore.TaskBucketHolder> validTaskKeys = new HashSet<SegmentTaskIndexStore.TaskBucketHolder>(16);
            List<TableBlockInfo> tableBlockInfoList = this.getTableBlockInfo(job, tableSegmentUniqueIdentifier, taskKeys, updateStatusManager.getInvalidTimestampRange(segmentId), updateStatusManager, segmentId, validTaskKeys);
            if (!tableBlockInfoList.isEmpty()) {
                HashMap<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<String, List<TableBlockInfo>>();
                segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
                tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
                tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
                segmentTaskIndexWrapper = cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
                segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
            }
            if (null != taskKeys) {
                HashMap finalMap = new HashMap(validTaskKeys.size());
                for (SegmentTaskIndexStore.TaskBucketHolder key : validTaskKeys) {
                    finalMap.put(key, segmentIndexMap.get(key));
                }
                segmentIndexMap = finalMap;
            }
        }
        return segmentIndexMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException, KeyGenException {
        try (CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());){
            SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier);
            SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
            HashMap<String, Long> blockRowCountMapping = new HashMap<String, Long>(16);
            HashMap<String, Long> segmentAndBlockCountMapping = new HashMap<String, Long>(16);
            for (String eachValidSeg : validAndInvalidSegments.getValidSegments()) {
                long countOfBlocksInSeg = 0L;
                Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskAbstractIndexMap = this.getSegmentAbstractIndexs(job, absoluteTableIdentifier, eachValidSeg, cacheClient, updateStatusManager);
                for (Map.Entry<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskMap : taskAbstractIndexMap.entrySet()) {
                    AbstractIndex taskAbstractIndex = taskMap.getValue();
                    countOfBlocksInSeg += new BlockLevelTraverser().getBlockRowMapping(taskAbstractIndex, blockRowCountMapping, eachValidSeg, updateStatusManager);
                }
                segmentAndBlockCountMapping.put(eachValidSeg, countOfBlocksInSeg);
            }
            BlockMappingVO blockMappingVO = new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
            return blockMappingVO;
        }
    }

    private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper, UpdateVO updateDetails) {
        return null != updateDetails.getLatestUpdateTimestamp() && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper.getRefreshedTimeStamp();
    }

    private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
        LinkedList<DataRefNode> blocks = new LinkedList<DataRefNode>();
        SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
        try {
            IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey((SegmentProperties)segmentProperties);
            IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey((SegmentProperties)segmentProperties);
            BTreeDataRefNodeFinder blockFinder = new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
            DataRefNode endBlock = blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
            for (DataRefNode startBlock = blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey); startBlock != endBlock; startBlock = startBlock.getNextDataRefNode()) {
                blocks.add(startBlock);
            }
            blocks.add(endBlock);
        }
        catch (KeyGenException e) {
            LOG.error((Object)"Could not generate start key", (Throwable)e);
        }
        return blocks;
    }

    public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        Configuration configuration = taskAttemptContext.getConfiguration();
        QueryModel queryModel = this.getQueryModel(inputSplit, taskAttemptContext);
        CarbonReadSupport<T> readSupport = this.getReadSupportClass(configuration);
        return new CarbonRecordReader<T>(queryModel, readSupport);
    }

    public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        Configuration configuration = taskAttemptContext.getConfiguration();
        CarbonTable carbonTable = CarbonInputFormat.getCarbonTable(configuration);
        AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
        String projection = CarbonInputFormat.getColumnProjection(configuration);
        CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
        QueryModel queryModel = QueryModel.createModel((AbsoluteTableIdentifier)identifier, (CarbonQueryPlan)queryPlan, (CarbonTable)carbonTable);
        Expression filter = this.getFilterPredicates(configuration);
        CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
        FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
        queryModel.setFilterExpressionResolverTree(filterIntf);
        if (inputSplit instanceof CarbonMultiBlockSplit) {
            List<UpdateVO> invalidTimestampRangeList;
            CarbonMultiBlockSplit split = (CarbonMultiBlockSplit)inputSplit;
            List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
            if (invalidSegments.size() > 0) {
                queryModel.setInvalidSegmentIds(invalidSegments);
            }
            if (null != (invalidTimestampRangeList = split.getAllSplits().get(0).getInvalidTimestampRange()) && invalidTimestampRangeList.size() > 0) {
                queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
            }
        }
        return queryModel;
    }

    public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
        String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
        CarbonReadSupport readSupport = null;
        if (readSupportClass != null) {
            try {
                Class<?> myClass = Class.forName(readSupportClass);
                Constructor<?> constructor = myClass.getConstructors()[0];
                Object object = constructor.newInstance(new Object[0]);
                if (object instanceof CarbonReadSupport) {
                    readSupport = (CarbonReadSupport)object;
                }
            }
            catch (ClassNotFoundException ex) {
                LOG.error((Object)("Class " + readSupportClass + "not found"), (Throwable)ex);
            }
            catch (Exception ex) {
                LOG.error((Object)("Error while creating " + readSupportClass), (Throwable)ex);
            }
        } else {
            readSupport = new DictionaryDecodeReadSupport();
        }
        return readSupport;
    }

    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        ArrayList<FileStatus> result = new ArrayList<FileStatus>();
        String[] segmentsToConsider = this.getSegmentsToAccess(job);
        if (segmentsToConsider.length == 0) {
            throw new IOException("No segments found");
        }
        this.getFileStatusOfSegments(job, segmentsToConsider, result);
        return result;
    }

    protected boolean isSplitable(JobContext context, Path filename) {
        try {
            FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
            if (fileSystem instanceof LocalFileSystem) {
                return false;
            }
        }
        catch (Exception e) {
            return true;
        }
        return true;
    }

    private void getFileStatusOfSegments(JobContext job, String[] segmentsToConsider, List<FileStatus> result) throws IOException {
        String[] partitionsToConsider = this.getValidPartitions(job);
        if (partitionsToConsider.length == 0) {
            throw new IOException("No partitions/data found");
        }
        PathFilter inputFilter = this.getDataFileFilter();
        AbsoluteTableIdentifier absIdentifier = CarbonInputFormat.getAbsoluteTableIdentifier(job.getConfiguration());
        CarbonTablePath tablePath = CarbonInputFormat.getTablePath(absIdentifier);
        TokenCache.obtainTokensForNamenodes((Credentials)job.getCredentials(), (Path[])new Path[]{tablePath}, (Configuration)job.getConfiguration());
        for (int i = 0; i < partitionsToConsider.length; ++i) {
            String partition = partitionsToConsider[i];
            for (int j = 0; j < segmentsToConsider.length; ++j) {
                String segmentId = segmentsToConsider[j];
                String dataDirectoryPath = absIdentifier.appendWithLocalPrefix(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
                Path segmentPath = new Path(dataDirectoryPath);
                FileSystem fs = segmentPath.getFileSystem(job.getConfiguration());
                RemoteIterator iter = fs.listLocatedStatus(segmentPath);
                while (iter.hasNext()) {
                    LocatedFileStatus stat = (LocatedFileStatus)iter.next();
                    if (!inputFilter.accept(stat.getPath())) continue;
                    if (stat.isDirectory()) {
                        this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
                        continue;
                    }
                    result.add((FileStatus)stat);
                }
            }
        }
    }

    private PathFilter getDataFileFilter() {
        return new CarbonPathFilter(this.getUpdateExtension());
    }

    private String getUpdateExtension() {
        return "update";
    }

    private String[] getSegmentsToAccess(JobContext job) {
        String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
        if (segmentString.trim().isEmpty()) {
            return new String[0];
        }
        return segmentString.split(",");
    }

    private String[] getValidPartitions(JobContext job) {
        return new String[]{"0"};
    }
}

