/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.hadoop;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
import org.apache.carbondata.core.carbon.datastore.DataRefNode;
import org.apache.carbondata.core.carbon.datastore.IndexKey;
import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder;
import org.apache.carbondata.core.carbon.datastore.impl.btree.BlockBTreeLeafNode;
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.carbon.path.CarbonStorePath;
import org.apache.carbondata.core.carbon.path.CarbonTablePath;
import org.apache.carbondata.core.carbon.querystatistics.QueryStatistic;
import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonPathFilter;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.CarbonRecordReader;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
import org.apache.carbondata.hadoop.util.SchemaReader;
import org.apache.carbondata.lcm.status.SegmentStatusManager;
import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.scan.expression.Expression;
import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.scan.filter.FilterUtil;
import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.scan.model.CarbonQueryPlan;
import org.apache.carbondata.scan.model.QueryModel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.StringUtils;

public class CarbonInputFormat<T>
extends FileInputFormat<Void, T> {
    /** Configuration key holding the comma-separated list of segment ids to read. */
    public static final String INPUT_SEGMENT_NUMBERS = "mapreduce.input.carboninputformat.segmentnumbers";
    /** Logger shared by all instances of this input format. */
    private static final Log LOG = LogFactory.getLog(CarbonInputFormat.class);
    /** Configuration key holding the database name of the table to access. */
    private static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databasename";
    /** Configuration key holding the name of the table to access. */
    private static final String TABLE_NAME = "mapreduce.input.carboninputformat.tablename";
    /** Configuration key holding the serialized filter (an Expression or a FilterResolverIntf). */
    private static final String FILTER_PREDICATE = "mapreduce.input.carboninputformat.filter.predicate";
    /** Configuration key holding the comma-joined names of the projected columns. */
    private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
    /** Configuration key holding the serialized CarbonTable. */
    private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
    /** Configuration key holding the class name of the CarbonReadSupport implementation. */
    private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
    /** Configuration key holding the id of the table to access. */
    private static final String TABLE_ID = "mapreduce.input.carboninputformat.tableId";

    /**
     * Stores the database name, table name and table id of the given identifier
     * in the configuration.
     *
     * @param configuration   configuration to write to
     * @param tableIdentifier identifier of the table to access
     */
    public static void setTableToAccess(Configuration configuration, CarbonTableIdentifier tableIdentifier) {
        final String database = tableIdentifier.getDatabaseName();
        configuration.set(DATABASE_NAME, database);

        final String table = tableIdentifier.getTableName();
        configuration.set(TABLE_NAME, table);

        final String id = tableIdentifier.getTableId();
        configuration.set(TABLE_ID, id);
    }

    /**
     * Rebuilds the table identifier from the configuration.
     *
     * @param configuration configuration to read from
     * @return the identifier, or {@code null} when the database name or the
     *         table name is not present in the configuration
     */
    public static CarbonTableIdentifier getTableToAccess(Configuration configuration) {
        final String database = configuration.get(DATABASE_NAME);
        final String table = configuration.get(TABLE_NAME);
        final String id = configuration.get(TABLE_ID);
        if (database == null || table == null) {
            return null;
        }
        return new CarbonTableIdentifier(database, table, id);
    }

    /**
     * Serializes the given table into the configuration; a {@code null} table
     * leaves the configuration untouched.
     *
     * @param configuration configuration to write to
     * @param carbonTable   table to store, may be {@code null}
     * @throws IOException if the table cannot be serialized
     */
    public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable) throws IOException {
        if (carbonTable == null) {
            return;
        }
        String serializedTable = ObjectSerializationUtil.convertObjectToString(carbonTable);
        configuration.set(CARBON_TABLE, serializedTable);
    }

    /**
     * Returns the table described by the configuration. When a serialized
     * table is already stored it is deserialized; otherwise the schema is read
     * from the store and then cached in the configuration.
     *
     * @param configuration configuration to read from (and possibly update)
     * @return the carbon table
     * @throws IOException if reading or (de)serializing the table fails
     */
    public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
        final String serializedTable = configuration.get(CARBON_TABLE);
        if (serializedTable != null) {
            return (CarbonTable) ObjectSerializationUtil.convertStringToObject(serializedTable);
        }

        // Nothing cached yet: read the schema from the store and remember it.
        SchemaReader schemaReader = new SchemaReader();
        CarbonTablePath tablePath = CarbonInputFormat.getTablePath(configuration);
        CarbonTableIdentifier identifier = CarbonInputFormat.getTableToAccess(configuration);
        String storePath = CarbonInputFormat.getStorePathString(configuration);
        CarbonTable loadedTable = schemaReader.readCarbonTableFromStore(tablePath, identifier, storePath);
        CarbonInputFormat.setCarbonTable(configuration, loadedTable);
        return loadedTable;
    }

    /**
     * Serializes the filter expression and stores it in the configuration.
     *
     * @param configuration    configuration to write to
     * @param filterExpression filter expression to store
     * @throws RuntimeException if serializing or storing the expression fails
     */
    public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
        try {
            configuration.set(FILTER_PREDICATE, ObjectSerializationUtil.convertObjectToString(filterExpression));
        }
        catch (Exception failure) {
            throw new RuntimeException("Error while setting filter expression to Job", failure);
        }
    }

    /**
     * Serializes an already resolved filter and stores it in the
     * configuration; a {@code null} filter leaves the configuration untouched.
     *
     * @param configuration    configuration to write to
     * @param filterExpression resolved filter to store, may be {@code null}
     * @throws RuntimeException if serializing or storing the filter fails
     */
    public static void setFilterPredicates(Configuration configuration, FilterResolverIntf filterExpression) {
        if (filterExpression == null) {
            return;
        }
        try {
            configuration.set(FILTER_PREDICATE, ObjectSerializationUtil.convertObjectToString(filterExpression));
        }
        catch (Exception failure) {
            throw new RuntimeException("Error while setting filter expression to Job", failure);
        }
    }

    /**
     * Stores the projected column names in the configuration as a
     * comma-joined string. A {@code null} or empty projection is ignored.
     *
     * @param projection    columns to project, may be {@code null}
     * @param configuration configuration to write to
     */
    public static void setColumnProjection(CarbonProjection projection, Configuration configuration) {
        if (projection == null || projection.isEmpty()) {
            return;
        }
        final String[] columns = projection.getAllColumns();
        final StringBuilder joined = new StringBuilder();
        for (int i = 0; i < columns.length; i++) {
            joined.append(columns[i]).append(",");
        }
        // Drop the trailing separator appended after the last column.
        joined.deleteCharAt(joined.length() - 1);
        configuration.set(COLUMN_PROJECTION, joined.toString());
    }

    /**
     * Stores the class name of the read support implementation in the
     * configuration; a {@code null} class leaves the configuration untouched.
     *
     * @param readSupportClass read support implementation, may be {@code null}
     * @param configuration    configuration to write to
     */
    public static void setCarbonReadSupport(Class<? extends CarbonReadSupport> readSupportClass, Configuration configuration) {
        if (readSupportClass == null) {
            return;
        }
        String className = readSupportClass.getName();
        configuration.set(CARBON_READ_SUPPORT, className);
    }

    /**
     * Builds the table path from the store path and the table identifier held
     * in the configuration.
     *
     * @param configuration configuration to read from
     * @return the carbon table path
     * @throws IOException if no input path is configured or the table
     *                     identifier cannot be rebuilt from the configuration
     */
    public static CarbonTablePath getTablePath(Configuration configuration) throws IOException {
        final String storePath = CarbonInputFormat.getStorePathString(configuration);
        final CarbonTableIdentifier identifier = CarbonInputFormat.getTableToAccess(configuration);
        if (identifier != null) {
            return CarbonStorePath.getCarbonTablePath(storePath, identifier);
        }
        throw new IOException("Could not find mapreduce.input.carboninputformat.databasename,mapreduce.input.carboninputformat.tablename");
    }

    /**
     * Derives the store path from the first configured input directory.
     *
     * @param configuration configuration to read from
     * @return the first input path after processing by {@code CarbonInputFormatUtil.processPath}
     * @throws IOException if no input directory is configured
     */
    private static String getStorePathString(Configuration configuration) throws IOException {
        final String configuredDirs = configuration.get("mapreduce.input.fileinputformat.inputdir", "");
        final String[] paths = StringUtils.split(configuredDirs);
        if (paths.length > 0) {
            return CarbonInputFormatUtil.processPath(paths[0]);
        }
        throw new IOException("No input paths specified in job");
    }

    /**
     * Stores the segments to read in the configuration.
     *
     * @param configuration configuration to write to
     * @param validSegments ids of the segments to read
     */
    public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
        final String segmentString = CarbonUtil.getSegmentString(validSegments);
        configuration.set(INPUT_SEGMENT_NUMBERS, segmentString);
    }

    /**
     * When no segments are configured for the job, looks up the valid segments
     * of the table and stores them in the job configuration.
     *
     * @param job                     job whose configuration is inspected and updated
     * @param absoluteTableIdentifier table whose segment status is consulted
     * @throws IOException if the segment status cannot be read
     */
    private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
        if (this.getSegmentsFromConfiguration(job).length != 0) {
            // Segments were chosen explicitly; nothing to add.
            return;
        }
        SegmentStatusManager statusManager = new SegmentStatusManager(absoluteTableIdentifier);
        SegmentStatusManager.ValidAndInvalidSegmentsInfo segmentsInfo = statusManager.getValidAndInvalidSegments();
        CarbonInputFormat.setSegmentsToAccess(job.getConfiguration(), segmentsInfo.getValidSegments());
    }

    /**
     * Computes the input splits for the job, applying the filter stored in the
     * configuration when there is one.
     *
     * @param job job to compute splits for
     * @return the input splits
     * @throws IOException wrapping any exception raised while computing the splits
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        try {
            CarbonTable table = CarbonInputFormat.getCarbonTable(job.getConfiguration());
            Object predicate = this.getFilterPredicates(job.getConfiguration());
            AbsoluteTableIdentifier identifier = this.getAbsoluteTableIdentifier(job.getConfiguration());
            this.addSegmentsIfEmpty(job, identifier);

            if (predicate == null) {
                return this.getSplitsNonFilter(job);
            }

            FilterResolverIntf resolver;
            if (predicate instanceof Expression) {
                // A raw expression still has to be processed and resolved.
                Expression expression = (Expression) predicate;
                CarbonInputFormatUtil.processFilterExpression(expression, table);
                resolver = CarbonInputFormatUtil.resolveFilter(expression, identifier);
            } else {
                resolver = (FilterResolverIntf) predicate;
            }
            return this.getSplits(job, resolver);
        }
        catch (Exception failure) {
            throw new IOException(failure);
        }
    }

    /**
     * Computes the splits of the job without applying any filter.
     *
     * @param job job to compute splits for
     * @return the input splits
     */
    private List<InputSplit> getSplitsNonFilter(JobContext job) throws IOException, IndexBuilderException {
        final FilterResolverIntf noFilter = null;
        return this.getSplits(job, noFilter);
    }

    /**
     * Computes the file splits through the parent input format and converts
     * them to carbon splits, skipping files whose path yields the invalid
     * segment id {@code "-1"}.
     *
     * @param job job to compute splits for
     * @return the carbon input splits
     * @throws IOException if the parent input format fails to compute splits
     */
    private List<InputSplit> getSplitsInternal(JobContext job) throws IOException {
        List<InputSplit> splits = super.getSplits(job);
        List<InputSplit> carbonSplits = new ArrayList<InputSplit>(splits.size());
        for (InputSplit inputSplit : splits) {
            FileSplit fileSplit = (FileSplit) inputSplit;
            String segmentId = CarbonTablePath.DataPathUtil.getSegmentId(fileSplit.getPath().toString());
            // Compare by value: reference equality only matches when both
            // strings happen to be the same interned instance.
            if ("-1".equals(segmentId)) {
                continue;
            }
            carbonSplits.add(CarbonInputSplit.from(segmentId, fileSplit));
        }
        return carbonSplits;
    }

    /**
     * Builds one carbon split per data block selected in each configured segment.
     *
     * @param job            job to compute splits for
     * @param filterResolver resolved filter, or {@code null} to select every block
     * @return the input splits
     */
    private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver) throws IOException, IndexBuilderException {
        final LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
        final FilterExpressionProcessor processor = new FilterExpressionProcessor();
        final AbsoluteTableIdentifier identifier = this.getAbsoluteTableIdentifier(job.getConfiguration());
        for (String segmentNo : this.getSegmentsFromConfiguration(job)) {
            List<DataRefNode> selectedNodes = this.getDataBlocksOfSegment(job, processor, identifier, filterResolver, segmentNo);
            for (DataRefNode node : selectedNodes) {
                TableBlockInfo block = ((BlockBTreeLeafNode) node).getTableBlockInfo();
                CarbonInputSplit split = new CarbonInputSplit(
                        segmentNo,
                        new Path(block.getFilePath()),
                        block.getBlockOffset(),
                        block.getBlockLength(),
                        block.getLocations(),
                        block.getBlockletInfos().getNoOfBlockLets());
                splits.add(split);
            }
        }
        return splits;
    }

    /**
     * Counts the rows of all configured segments by loading the segment
     * indexes in parallel and summing their row counts.
     *
     * <p>The pool size is read from the
     * {@code no.of.cores.to.load.blocks.in.driver} carbon property; a missing,
     * non-numeric or non-positive value falls back to 10.
     *
     * @param job job whose segments are counted
     * @return total number of rows over all loaded indexes
     * @throws IOException           if the segments cannot be determined
     * @throws IndexBuilderException if loading an index fails or the calling
     *                               thread is interrupted while waiting
     */
    public long getRowCount(JobContext job) throws IOException, IndexBuilderException {
        final int defaultNumberOfCores = 10;
        AbsoluteTableIdentifier absoluteTableIdentifier = this.getAbsoluteTableIdentifier(job.getConfiguration());
        this.addSegmentsIfEmpty(job, absoluteTableIdentifier);
        // Resolve the segments before creating the pool so that a failure here
        // cannot leave an executor behind.
        String[] segments = this.getSegmentsFromConfiguration(job);

        int numberOfCores;
        try {
            numberOfCores = Integer.parseInt(CarbonProperties.getInstance().getProperty("no.of.cores.to.load.blocks.in.driver"));
        }
        catch (NumberFormatException e) {
            numberOfCores = defaultNumberOfCores;
        }
        if (numberOfCores <= 0) {
            // newFixedThreadPool rejects non-positive sizes.
            numberOfCores = defaultNumberOfCores;
        }

        long rowCount = 0L;
        ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores);
        try {
            List<Future<Map<String, AbstractIndex>>> loadedBlocks = new ArrayList<Future<Map<String, AbstractIndex>>>(segments.length);
            for (String segmentNo : segments) {
                loadedBlocks.add(threadPool.submit(new BlocksLoaderThread(job, absoluteTableIdentifier, segmentNo)));
            }
            threadPool.shutdown();
            threadPool.awaitTermination(1L, TimeUnit.HOURS);
            for (Future<Map<String, AbstractIndex>> future : loadedBlocks) {
                for (AbstractIndex abstractIndex : future.get().values()) {
                    rowCount += abstractIndex.getTotalNumberOfRows();
                }
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IndexBuilderException((Throwable)e);
        }
        catch (ExecutionException e) {
            throw new IndexBuilderException((Throwable)e);
        }
        finally {
            // No-op after a normal run; cancels outstanding loads on failure.
            threadPool.shutdownNow();
        }
        return rowCount;
    }

    /**
     * Resolves a filter expression against the table described by the configuration.
     *
     * @param configuration    configuration describing the table
     * @param filterExpression expression to resolve, may be {@code null}
     * @return the resolved filter, or {@code null} when no expression was given
     * @throws QueryExecutionException if the filter is not supported
     */
    public FilterResolverIntf getResolvedFilter(Configuration configuration, Expression filterExpression) throws IOException, IndexBuilderException, QueryExecutionException {
        if (filterExpression == null) {
            return null;
        }
        final FilterExpressionProcessor processor = new FilterExpressionProcessor();
        final AbsoluteTableIdentifier identifier = this.getAbsoluteTableIdentifier(configuration);
        FilterResolverIntf resolved;
        try {
            resolved = processor.getFilterResolver(filterExpression, identifier);
        }
        catch (FilterUnsupportedException unsupported) {
            throw new QueryExecutionException(unsupported.getMessage());
        }
        return resolved;
    }

    /**
     * Combines the store path and the table identifier from the configuration
     * into an absolute table identifier.
     *
     * @param configuration configuration to read from
     * @return the absolute table identifier
     * @throws IOException if no input path is configured
     */
    private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) throws IOException {
        final String storePath = CarbonInputFormat.getStorePathString(configuration);
        final CarbonTableIdentifier identifier = CarbonInputFormat.getTableToAccess(configuration);
        return new AbsoluteTableIdentifier(storePath, identifier);
    }

    /**
     * Deserializes the filter stored in the configuration.
     *
     * @param configuration configuration to read from
     * @return the stored filter object, or {@code null} when none is configured
     * @throws RuntimeException if the stored filter cannot be deserialized
     */
    private Object getFilterPredicates(Configuration configuration) {
        final String serializedFilter = configuration.get(FILTER_PREDICATE);
        if (serializedFilter == null) {
            return null;
        }
        try {
            return ObjectSerializationUtil.convertStringToObject(serializedFilter);
        }
        catch (IOException failure) {
            throw new RuntimeException("Error while reading filter expression", failure);
        }
    }

    /**
     * Collects the data blocks of one segment, restricted by the resolver when
     * one is given, and records a driver-side load statistic.
     *
     * @param job                       job being planned
     * @param filterExpressionProcessor processor used to apply the resolver
     * @param absoluteTableIdentifier   table being read
     * @param resolver                  resolved filter, or {@code null} to take every block
     * @param segmentId                 id of the segment to inspect
     * @return the selected data blocks of the segment
     * @throws IndexBuilderException if the index cannot be loaded or filtering fails
     */
    private List<DataRefNode> getDataBlocksOfSegment(JobContext job, FilterExpressionProcessor filterExpressionProcessor, AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, String segmentId) throws IndexBuilderException, IOException {
        // Recorder and statistic are created before the indexes are loaded.
        QueryStatisticsRecorder statsRecorder = new QueryStatisticsRecorder("");
        QueryStatistic loadStatistic = new QueryStatistic();
        Map<String, AbstractIndex> segmentIndexes = this.getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId);
        LinkedList<DataRefNode> selectedBlocks = new LinkedList<DataRefNode>();
        for (AbstractIndex index : segmentIndexes.values()) {
            List blocksOfIndex;
            if (resolver != null) {
                try {
                    blocksOfIndex = filterExpressionProcessor.getFilterredBlocks(index.getDataRefNode(), resolver, index, absoluteTableIdentifier);
                }
                catch (QueryExecutionException failure) {
                    throw new IndexBuilderException(failure.getMessage());
                }
            } else {
                blocksOfIndex = this.getDataBlocksOfIndex(index);
            }
            selectedBlocks.addAll(blocksOfIndex);
        }
        loadStatistic.addStatistics("Time taken to load the Block(s) In Driver Side", System.currentTimeMillis());
        statsRecorder.recordStatistics(loadStatistic);
        statsRecorder.logStatistics();
        return selectedBlocks;
    }

    /**
     * Lists the table blocks of a single segment by computing the file splits
     * of a copy of the job that is restricted to that segment.
     *
     * @param job                     original job; its configuration is copied, not modified
     * @param absoluteTableIdentifier table being read
     * @param segmentId               id of the segment to list
     * @return block descriptions of the segment
     * @throws IOException if the splits cannot be computed
     */
    private List<TableBlockInfo> getTableBlockInfo(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) throws IOException {
        final List<TableBlockInfo> blockInfos = new ArrayList<TableBlockInfo>();
        final JobContextImpl segmentJob = new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
        segmentJob.getConfiguration().set(INPUT_SEGMENT_NUMBERS, segmentId + "");
        for (InputSplit rawSplit : this.getSplitsInternal(segmentJob)) {
            CarbonInputSplit split = (CarbonInputSplit) rawSplit;
            BlockletInfos blocklets = new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
            TableBlockInfo blockInfo = new TableBlockInfo(
                    split.getPath().toString(),
                    split.getStart(),
                    segmentId,
                    split.getLocations(),
                    split.getLength(),
                    blocklets);
            blockInfos.add(blockInfo);
        }
        return blockInfos;
    }

    /**
     * Returns the indexes of a segment, taking them from the
     * {@code SegmentTaskIndexStore} when already present and loading them
     * from the segment's table blocks otherwise.
     *
     * @param job                     job being planned
     * @param absoluteTableIdentifier table being read
     * @param segmentId               id of the segment
     * @return the indexes of the segment
     */
    private Map<String, AbstractIndex> getSegmentAbstractIndexs(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) throws IOException, IndexBuilderException {
        Map existing = SegmentTaskIndexStore.getInstance().getSegmentBTreeIfExists(absoluteTableIdentifier, segmentId);
        if (existing != null) {
            return existing;
        }

        // Not in the store yet: gather the segment's blocks and load them.
        List<TableBlockInfo> blocksOfSegment = this.getTableBlockInfo(job, absoluteTableIdentifier, segmentId);
        HashMap<String, List<TableBlockInfo>> blocksBySegment = new HashMap<String, List<TableBlockInfo>>();
        blocksBySegment.put(segmentId, blocksOfSegment);
        Map loaded = SegmentTaskIndexStore.getInstance().loadAndGetTaskIdToSegmentsMap(blocksBySegment, absoluteTableIdentifier);
        return loaded;
    }

    /**
     * Walks the index from the block found for the default start key to the
     * block found for the default end key and returns every block on the way,
     * both ends included.
     *
     * @param abstractIndex index to walk
     * @return the blocks of the index; when key generation fails the error is
     *         logged and the blocks collected so far are returned
     */
    private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
        final LinkedList<DataRefNode> collected = new LinkedList<DataRefNode>();
        final SegmentProperties properties = abstractIndex.getSegmentProperties();
        try {
            IndexKey firstKey = FilterUtil.prepareDefaultStartIndexKey(properties);
            IndexKey lastKey = FilterUtil.prepareDefaultEndIndexKey(properties);
            BTreeDataRefNodeFinder finder = new BTreeDataRefNodeFinder(properties.getEachDimColumnValueSize());
            DataRefNode lastBlock = finder.findLastDataBlock(abstractIndex.getDataRefNode(), lastKey);
            DataRefNode current = finder.findFirstDataBlock(abstractIndex.getDataRefNode(), firstKey);
            while (current != lastBlock) {
                collected.add(current);
                current = current.getNextDataRefNode();
            }
            collected.add(lastBlock);
        }
        catch (KeyGenException failure) {
            LOG.error("Could not generate start key", failure);
        }
        return collected;
    }

    /**
     * Creates the record reader for a split: builds the query model from the
     * configured table and projection, attaches the configured filter when
     * there is one, and pairs the model with the configured read support.
     *
     * @param inputSplit         split to read (not used when building the reader here)
     * @param taskAttemptContext context supplying the configuration
     * @return a new carbon record reader
     * @throws IOException wrapping any exception raised while building the query model
     */
    public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        Configuration configuration = taskAttemptContext.getConfiguration();
        CarbonTable carbonTable = CarbonInputFormat.getCarbonTable(configuration);
        QueryModel queryModel;
        try {
            CarbonQueryPlan plan = CarbonInputFormatUtil.createQueryPlan(carbonTable, configuration.get(COLUMN_PROJECTION));
            queryModel = QueryModel.createModel(this.getAbsoluteTableIdentifier(configuration), plan, carbonTable);

            Object predicate = this.getFilterPredicates(configuration);
            if (predicate instanceof Expression) {
                // A raw expression still has to be processed and resolved.
                Expression expression = (Expression) predicate;
                CarbonInputFormatUtil.processFilterExpression(expression, carbonTable);
                queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil.resolveFilter(expression, this.getAbsoluteTableIdentifier(configuration)));
            } else if (predicate != null) {
                queryModel.setFilterExpressionResolverTree((FilterResolverIntf) predicate);
            }
        }
        catch (Exception failure) {
            throw new IOException(failure);
        }
        CarbonReadSupport readSupport = this.getReadSupportClass(configuration);
        return new CarbonRecordReader(queryModel, readSupport);
    }

    /**
     * Instantiates the read support named in the configuration, or the
     * dictionary-decoding default when none is configured.
     *
     * <p>The configured class is created through its public no-argument
     * constructor.
     *
     * @param configuration configuration to read from
     * @return the read support instance, or {@code null} when the configured
     *         class cannot be loaded, cannot be instantiated, or is not a
     *         {@code CarbonReadSupport} (the reason is logged)
     */
    private CarbonReadSupport getReadSupportClass(Configuration configuration) {
        String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
        if (readSupportClass == null) {
            return new DictionaryDecodedReadSupportImpl();
        }
        CarbonReadSupport readSupport = null;
        try {
            Class<?> myClass = Class.forName(readSupportClass);
            // Ask for the no-arg constructor explicitly: the order of
            // getConstructors() is unspecified, so its first element may take arguments.
            Constructor<?> constructor = myClass.getConstructor();
            Object object = constructor.newInstance();
            if (object instanceof CarbonReadSupport) {
                readSupport = (CarbonReadSupport) object;
            } else {
                LOG.error("Class " + readSupportClass + " does not implement CarbonReadSupport");
            }
        }
        catch (ClassNotFoundException ex) {
            LOG.error("Class " + readSupportClass + " not found", ex);
        }
        catch (Exception ex) {
            LOG.error("Error while creating " + readSupportClass, ex);
        }
        return readSupport;
    }

    /**
     * Delegates unchanged to the parent implementation.
     *
     * @param blockSize block size passed through to the parent
     * @param minSize   minimum split size passed through to the parent
     * @param maxSize   maximum split size passed through to the parent
     * @return the split size computed by the parent
     */
    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
        final long splitSize = super.computeSplitSize(blockSize, minSize, maxSize);
        return splitSize;
    }

    /**
     * Delegates unchanged to the parent implementation.
     *
     * @param blkLocations block locations passed through to the parent
     * @param offset       offset passed through to the parent
     * @return the block index computed by the parent
     */
    protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
        final int blockIndex = super.getBlockIndex(blkLocations, offset);
        return blockIndex;
    }

    /**
     * Lists the data files of the segments configured for the job.
     *
     * @param job job to list files for
     * @return statuses of the matching files
     * @throws IOException if no segments are configured or listing fails
     */
    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        final String[] segments = this.getSegmentsFromConfiguration(job);
        if (segments.length == 0) {
            throw new IOException("No segments found");
        }
        final ArrayList<FileStatus> statuses = new ArrayList<FileStatus>();
        this.getFileStatusOfSegments(job, segments, statuses);
        return statuses;
    }

    /**
     * Reports whether a file may be split: files on a {@code LocalFileSystem}
     * are not splittable, everything else is. If the file system cannot be
     * determined the file is treated as splittable.
     *
     * @param context  job context supplying the configuration
     * @param filename file to check
     * @return {@code false} for local files, {@code true} otherwise
     */
    protected boolean isSplitable(JobContext context, Path filename) {
        try {
            return !(filename.getFileSystem(context.getConfiguration()) instanceof LocalFileSystem);
        }
        catch (Exception ignored) {
            // File system lookup failed; fall back to splittable.
            return true;
        }
    }

    /**
     * Adds to {@code result} the status of every file under each
     * partition/segment directory that the data file filter accepts;
     * accepted directories are added recursively.
     *
     * @param job                job supplying configuration and credentials
     * @param segmentsToConsider ids of the segments to scan
     * @param result             list receiving the file statuses
     * @throws IOException if there are no partitions or listing fails
     */
    private void getFileStatusOfSegments(JobContext job, String[] segmentsToConsider, List<FileStatus> result) throws IOException {
        final String[] partitions = this.getValidPartitions(job);
        if (partitions.length == 0) {
            throw new IOException("No partitions/data found");
        }
        final PathFilter dataFileFilter = this.getDataFileFilter(job);
        final CarbonTablePath tablePath = CarbonInputFormat.getTablePath(job.getConfiguration());
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{tablePath}, job.getConfiguration());

        for (String partition : partitions) {
            for (String segmentId : segmentsToConsider) {
                Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
                FileSystem fs = segmentPath.getFileSystem(job.getConfiguration());
                RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(segmentPath);
                while (entries.hasNext()) {
                    LocatedFileStatus entry = entries.next();
                    if (!dataFileFilter.accept(entry.getPath())) {
                        continue;
                    }
                    if (entry.isDirectory()) {
                        this.addInputPathRecursively(result, fs, entry.getPath(), dataFileFilter);
                    } else {
                        result.add(entry);
                    }
                }
            }
        }
    }

    /**
     * Creates the path filter used to select data files.
     *
     * @param job job being planned (not consulted)
     * @return a {@code CarbonPathFilter} built with the update extension
     */
    public PathFilter getDataFileFilter(JobContext job) {
        final String updateExtension = this.getUpdateExtension();
        return new CarbonPathFilter(updateExtension);
    }

    /**
     * Returns the fixed extension handed to the data file filter.
     *
     * @return the string {@code "update"}
     */
    private String getUpdateExtension() {
        final String extension = "update";
        return extension;
    }

    /**
     * Reads the comma-separated segment ids configured for the job.
     *
     * <p>Segment ids are returned as plain strings exactly as configured; no
     * numeric validation is performed.
     *
     * @param job job whose configuration is read
     * @return the configured segment ids, or an empty array when the setting
     *         is absent or blank
     * @throws IOException never thrown by this implementation; kept in the
     *                     signature for existing callers
     */
    private String[] getSegmentsFromConfiguration(JobContext job) throws IOException {
        String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
        if (segmentString.trim().isEmpty()) {
            return new String[0];
        }
        // split() already returns a fresh array, so no copy is needed.
        return segmentString.split(",");
    }

    /**
     * Returns the partitions to scan; currently always the single partition {@code "0"}.
     *
     * @param job job being planned (not consulted)
     * @return a one-element array containing {@code "0"}
     */
    private String[] getValidPartitions(JobContext job) {
        final String[] partitions = new String[1];
        partitions[0] = "0";
        return partitions;
    }

    /**
     * Task that loads the indexes of one segment by calling
     * {@code getSegmentAbstractIndexs} on the enclosing input format.
     */
    private class BlocksLoaderThread
    implements Callable<Map<String, AbstractIndex>> {
        /** Job being planned. */
        private final JobContext job;
        /** Table whose segment is loaded. */
        private final AbsoluteTableIdentifier absoluteTableIdentifier;
        /** Id of the segment to load. */
        private final String segmentId;

        private BlocksLoaderThread(JobContext jobContext, AbsoluteTableIdentifier tableIdentifier, String segment) {
            this.job = jobContext;
            this.absoluteTableIdentifier = tableIdentifier;
            this.segmentId = segment;
        }

        /**
         * Loads the indexes of the segment.
         *
         * @return the indexes returned by the enclosing input format
         * @throws Exception if loading the indexes fails
         */
        @Override
        public Map<String, AbstractIndex> call() throws Exception {
            final CarbonInputFormat<T> owner = CarbonInputFormat.this;
            return owner.getSegmentAbstractIndexs(this.job, this.absoluteTableIdentifier, this.segmentId);
        }
    }
}

