/*
 * Decompiled with CFR 0.152.
 */
package co.cask.hydrator.format.plugin;

import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchContext;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.hydrator.common.LineageRecorder;
import co.cask.hydrator.common.SourceInputFormatProvider;
import co.cask.hydrator.common.batch.JobUtils;
import co.cask.hydrator.format.FileFormat;
import co.cask.hydrator.format.RegexPathFilter;
import co.cask.hydrator.format.input.CombinePathTrackingInputFormat;
import co.cask.hydrator.format.input.EmptyInputFormat;
import co.cask.hydrator.format.input.PathTrackingInputFormat;
import co.cask.hydrator.format.plugin.FileSourceProperties;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public abstract class AbstractFileSource<T extends PluginConfig>
extends BatchSource<NullWritable, StructuredRecord, StructuredRecord> {
    private final T config;

    protected AbstractFileSource(T config) {
        this.config = config;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        String pathField;
        ((FileSourceProperties)this.config).validate();
        Schema schema = ((FileSourceProperties)this.config).getSchema();
        FileFormat fileFormat = ((FileSourceProperties)this.config).getFormat();
        if (fileFormat != null) {
            fileFormat.getFileInputFormatter(this.config.getProperties().getProperties(), schema);
        }
        if ((pathField = ((FileSourceProperties)this.config).getPathField()) != null && schema != null) {
            Schema.Type pathFieldType;
            Schema.Field schemaPathField = schema.getField(pathField);
            if (schemaPathField == null) {
                throw new IllegalArgumentException(String.format("Path field '%s' is not present in the schema. Please add it to the schema as a string field.", pathField));
            }
            Schema pathFieldSchema = schemaPathField.getSchema();
            Schema.Type type = pathFieldType = pathFieldSchema.isNullable() ? pathFieldSchema.getNonNullable().getType() : pathFieldSchema.getType();
            if (pathFieldType != Schema.Type.STRING) {
                throw new IllegalArgumentException(String.format("Path field '%s' must be of type 'string', but found '%s'.", pathField, pathFieldType));
            }
        }
        pipelineConfigurer.getStageConfigurer().setOutputSchema(((FileSourceProperties)this.config).getSchema());
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void prepareRun(BatchSourceContext context) throws Exception {
        String inputFormatClass;
        ((FileSourceProperties)this.config).validate();
        Job job = JobUtils.createInstance();
        Configuration conf = job.getConfiguration();
        Pattern pattern = ((FileSourceProperties)this.config).getFilePattern();
        if (pattern != null) {
            RegexPathFilter.configure(conf, pattern);
            FileInputFormat.setInputPathFilter((Job)job, RegexPathFilter.class);
        }
        FileInputFormat.setInputDirRecursive((Job)job, (boolean)((FileSourceProperties)this.config).shouldReadRecursively());
        Schema schema = ((FileSourceProperties)this.config).getSchema();
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext)context, ((FileSourceProperties)this.config).getReferenceName());
        lineageRecorder.createExternalDataset(schema);
        if (schema != null && schema.getFields() != null) {
            this.recordLineage(lineageRecorder, schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
        }
        for (Map.Entry<String, String> entry : this.getFileSystemProperties(context).entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }
        Path path = new Path(((FileSourceProperties)this.config).getPath());
        FileSystem pathFileSystem = FileSystem.get((URI)path.toUri(), (Configuration)conf);
        FileStatus[] fileStatus = pathFileSystem.globStatus(path);
        if (fileStatus == null) {
            if (!((FileSourceProperties)this.config).shouldAllowEmptyInput()) throw new IOException(String.format("Input path %s does not exist", path));
            inputFormatClass = EmptyInputFormat.class.getName();
        } else {
            FileInputFormat.addInputPath((Job)job, (Path)path);
            FileInputFormat.setMaxInputSplitSize((Job)job, (long)((FileSourceProperties)this.config).getMaxSplitSize());
            PathTrackingInputFormat.configure(job, (FileSourceProperties)this.config, this.config.getProperties().getProperties());
            FileFormat format = ((FileSourceProperties)this.config).getFormat();
            inputFormatClass = format == FileFormat.BLOB ? PathTrackingInputFormat.class.getName() : CombinePathTrackingInputFormat.class.getName();
        }
        for (Map.Entry<String, String> entry : this.getFileSystemProperties(context).entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }
        context.setInput(Input.of((String)((FileSourceProperties)this.config).getReferenceName(), (InputFormatProvider)new SourceInputFormatProvider(inputFormatClass, conf)));
    }

    public void transform(KeyValue<NullWritable, StructuredRecord> input, Emitter<StructuredRecord> emitter) throws Exception {
        emitter.emit(input.getValue());
    }

    protected Map<String, String> getFileSystemProperties(BatchSourceContext context) {
        return Collections.emptyMap();
    }

    protected void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields) {
        lineageRecorder.recordRead("Read", String.format("Read from %s files.", ((FileSourceProperties)this.config).getFormat().name().toLowerCase()), outputFields);
    }
}

