/*
 * Decompiled with CFR 0.152.
 */
package co.cask.hydrator.format.plugin;

import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.data.batch.OutputFormatProvider;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchContext;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.BatchSinkContext;
import co.cask.hydrator.common.LineageRecorder;
import co.cask.hydrator.common.batch.sink.SinkOutputFormatProvider;
import co.cask.hydrator.format.output.FileOutputFormatter;
import co.cask.hydrator.format.plugin.FileSinkProperties;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public abstract class AbstractFileSink<T extends PluginConfig>
extends BatchSink<StructuredRecord, Object, Object> {
    private final T config;
    private FileOutputFormatter<Object, Object> outputFormatter;

    protected AbstractFileSink(T config) {
        this.config = config;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        ((FileSinkProperties)this.config).validate();
    }

    public final void prepareRun(BatchSinkContext context) {
        ((FileSinkProperties)this.config).validate();
        this.outputFormatter = ((FileSinkProperties)this.config).getFormat().getFileOutputFormatter(this.config.getProperties().getProperties(), ((FileSinkProperties)this.config).getSchema());
        if (this.outputFormatter == null) {
            throw new IllegalArgumentException(String.format("Format '%s' cannot be used to write data.", new Object[]{((FileSinkProperties)this.config).getFormat()}));
        }
        Schema schema = ((FileSinkProperties)this.config).getSchema();
        if (schema == null) {
            schema = context.getInputSchema();
        }
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext)context, ((FileSinkProperties)this.config).getReferenceName());
        lineageRecorder.createExternalDataset(schema);
        if (schema != null && schema.getFields() != null && !schema.getFields().isEmpty()) {
            this.recordLineage(lineageRecorder, schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
        }
        HashMap<String, String> outputProperties = new HashMap<String, String>(this.outputFormatter.getFormatConfig());
        outputProperties.putAll(this.getFileSystemProperties(context));
        outputProperties.put("mapreduce.output.fileoutputformat.outputdir", this.getOutputDir(context.getLogicalStartTime()));
        context.addOutput(Output.of((String)((FileSinkProperties)this.config).getReferenceName(), (OutputFormatProvider)new SinkOutputFormatProvider(this.outputFormatter.getFormatClassName(), outputProperties)));
    }

    public void initialize(BatchRuntimeContext context) throws Exception {
        super.initialize(context);
        this.outputFormatter = ((FileSinkProperties)this.config).getFormat().getFileOutputFormatter(this.config.getProperties().getProperties(), ((FileSinkProperties)this.config).getSchema());
    }

    public void transform(StructuredRecord input, Emitter<KeyValue<Object, Object>> emitter) throws Exception {
        emitter.emit(this.outputFormatter.transform(input));
    }

    protected Map<String, String> getFileSystemProperties(BatchSinkContext context) {
        return Collections.emptyMap();
    }

    protected void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields) {
        lineageRecorder.recordWrite("Write", String.format("Wrote to %s files.", ((FileSinkProperties)this.config).getFormat().name().toLowerCase()), outputFields);
    }

    private String getOutputDir(long logicalStartTime) {
        String suffix = ((FileSinkProperties)this.config).getSuffix();
        String timeSuffix = suffix == null || suffix.isEmpty() ? "" : new SimpleDateFormat(suffix).format(logicalStartTime);
        return String.format("%s/%s", ((FileSinkProperties)this.config).getPath(), timeSuffix);
    }
}

