/*
 * Decompiled with CFR 0.152.
 */
package co.cask.cdap.template.etl.batch;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.template.etl.api.Transform;
import co.cask.cdap.template.etl.api.Transformation;
import co.cask.cdap.template.etl.api.batch.BatchSink;
import co.cask.cdap.template.etl.api.batch.BatchSource;
import co.cask.cdap.template.etl.batch.BatchTransformContext;
import co.cask.cdap.template.etl.batch.MapReduceSinkContext;
import co.cask.cdap.template.etl.batch.MapReduceSourceContext;
import co.cask.cdap.template.etl.batch.config.ETLBatchConfig;
import co.cask.cdap.template.etl.common.Destroyables;
import co.cask.cdap.template.etl.common.ETLStage;
import co.cask.cdap.template.etl.common.PluginID;
import co.cask.cdap.template.etl.common.StageMetrics;
import co.cask.cdap.template.etl.common.TransformExecutor;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce driver for Batch ETL Adapters.
 *
 * <p>On the driver side, {@link #beforeSubmit(MapReduceContext)} instantiates the source and sink
 * plugins named in the runtime arguments and calls their {@code prepareRun} hooks, and
 * {@link #onFinish(boolean, MapReduceContext)} calls their {@code onRunFinish} hooks. On the task
 * side, {@link ETLMapper} assembles a source, transforms, sink pipeline and pushes every input
 * record through it. The job is configured with zero reduce tasks.
 */
public class ETLMapReduce
extends AbstractMapReduce {
    private static final Logger LOG = LoggerFactory.getLogger(ETLMapReduce.class);
    private static final Gson GSON = new Gson();

    // Runtime-argument keys read by both the driver and the mapper.
    private static final String ADAPTER_NAME_KEY = "name";
    private static final String CONFIG_KEY = "config";
    private static final String SOURCE_ID_KEY = "sourceId";
    private static final String SINK_ID_KEY = "sinkId";
    private static final String TRANSFORM_IDS_KEY = "transformIds";

    private BatchSource batchSource;
    private BatchSink batchSink;
    private String sourcePluginId;
    private String sinkPluginId;
    // Never assigned in this class; presumably injected by the CDAP runtime -- TODO confirm.
    private Metrics mrMetrics;

    /**
     * Sets the program name (the simple class name) and its description.
     */
    public void configure() {
        this.setName(ETLMapReduce.class.getSimpleName());
        this.setDescription("MapReduce driver for Batch ETL Adapters");
    }

    /**
     * Validates the runtime arguments, prepares the source and sink plugins and configures the
     * Hadoop job to run {@link ETLMapper} with no reducers.
     *
     * @param context the MapReduce context supplying the runtime arguments and the Hadoop job
     * @throws IllegalArgumentException if a required runtime argument is absent
     * @throws Exception if plugin instantiation or a plugin's {@code prepareRun} fails
     */
    public void beforeSubmit(MapReduceContext context) throws Exception {
        Job job = (Job) context.getHadoopJob();
        Map<String, String> runtimeArgs = context.getRuntimeArguments();
        requireArgument(runtimeArgs, ADAPTER_NAME_KEY);
        requireArgument(runtimeArgs, CONFIG_KEY);
        requireArgument(runtimeArgs, SOURCE_ID_KEY);
        requireArgument(runtimeArgs, SINK_ID_KEY);
        requireArgument(runtimeArgs, TRANSFORM_IDS_KEY);

        ETLBatchConfig etlBatchConfig = GSON.fromJson(runtimeArgs.get(CONFIG_KEY), ETLBatchConfig.class);
        this.prepareSource(context, etlBatchConfig.getSource());
        this.prepareSink(context, etlBatchConfig.getSink());
        if (etlBatchConfig.getResources() != null) {
            context.setMapperResources(etlBatchConfig.getResources());
        }
        job.setMapperClass(ETLMapper.class);
        job.setNumReduceTasks(0);
    }

    /**
     * Fails with a descriptive message when {@code key} is not present in {@code runtimeArgs}.
     */
    private static void requireArgument(Map<String, String> runtimeArgs, String key) {
        Preconditions.checkArgument(runtimeArgs.containsKey(key), "Missing required runtime argument '%s'", key);
    }

    /**
     * Instantiates the source plugin identified by the {@code sourceId} runtime argument, keeps it
     * for {@link #onFinish(boolean, MapReduceContext)} and calls its {@code prepareRun}.
     *
     * @param context the MapReduce context used to create the plugin and its source context
     * @param sourceStage the source stage configuration; only logged here
     * @throws Exception if the plugin cannot be created or its {@code prepareRun} fails
     */
    private void prepareSource(MapReduceContext context, ETLStage sourceStage) throws Exception {
        this.sourcePluginId = context.getRuntimeArguments().get(SOURCE_ID_KEY);
        this.batchSource = (BatchSource) context.newPluginInstance(this.sourcePluginId);
        MapReduceSourceContext sourceContext = new MapReduceSourceContext(context, this.mrMetrics, this.sourcePluginId);
        LOG.debug("Source Stage : {}", sourceStage);
        LOG.debug("Source Class : {}", this.batchSource.getClass().getName());
        this.batchSource.prepareRun(sourceContext);
    }

    /**
     * Instantiates the sink plugin identified by the {@code sinkId} runtime argument, keeps it for
     * {@link #onFinish(boolean, MapReduceContext)} and calls its {@code prepareRun}.
     *
     * @param context the MapReduce context used to create the plugin and its sink context
     * @param sinkStage the sink stage configuration; only logged here
     * @throws Exception if the plugin cannot be created or its {@code prepareRun} fails
     */
    private void prepareSink(MapReduceContext context, ETLStage sinkStage) throws Exception {
        this.sinkPluginId = context.getRuntimeArguments().get(SINK_ID_KEY);
        this.batchSink = (BatchSink) context.newPluginInstance(this.sinkPluginId);
        MapReduceSinkContext sinkContext = new MapReduceSinkContext(context, this.mrMetrics, this.sinkPluginId);
        LOG.debug("Sink Stage : {}", sinkStage);
        LOG.debug("Sink Class : {}", this.batchSink.getClass().getName());
        this.batchSink.prepareRun(sinkContext);
    }

    /**
     * Calls {@code onRunFinish} on the source and then on the sink, and logs the run outcome.
     * A failure in either callback is logged and does not stop the other from running.
     *
     * @param succeeded whether the MapReduce job succeeded
     * @param context the MapReduce context of the finished run
     */
    public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
        this.onRunFinishSource(context, succeeded);
        this.onRunFinishSink(context, succeeded);
        LOG.info("Batch Run for Adapter {} : {}", context.getRuntimeArguments().get(ADAPTER_NAME_KEY), succeeded);
    }

    /**
     * Best-effort call of the source's {@code onRunFinish}; any failure is logged, never thrown.
     */
    private void onRunFinishSource(MapReduceContext context, boolean succeeded) {
        if (this.batchSource == null) {
            // Nothing was prepared (e.g. beforeSubmit failed before the plugin was created), so
            // there is nothing to finish. Returning here keeps the sink callback reachable.
            LOG.warn("Skipping onRunFinish for source: the source plugin was never instantiated");
            return;
        }
        LOG.info("On RunFinish Source : {}", this.batchSource.getClass().getName());
        try {
            // Context creation is inside the try so that its failure cannot escape either.
            MapReduceSourceContext sourceContext = new MapReduceSourceContext(context, this.mrMetrics, this.sourcePluginId);
            this.batchSource.onRunFinish(succeeded, sourceContext);
        } catch (Throwable t) {
            LOG.warn("Exception when calling onRunFinish on {}", this.batchSource, t);
        }
    }

    /**
     * Best-effort call of the sink's {@code onRunFinish}; any failure is logged, never thrown.
     */
    private void onRunFinishSink(MapReduceContext context, boolean succeeded) {
        if (this.batchSink == null) {
            // Nothing was prepared (e.g. beforeSubmit failed before the plugin was created).
            LOG.warn("Skipping onRunFinish for sink: the sink plugin was never instantiated");
            return;
        }
        LOG.info("On RunFinish Sink : {}", this.batchSink.getClass().getName());
        try {
            // Context creation is inside the try so that its failure cannot escape either.
            MapReduceSinkContext sinkContext = new MapReduceSinkContext(context, this.mrMetrics, this.sinkPluginId);
            this.batchSink.onRunFinish(succeeded, sinkContext);
        } catch (Throwable t) {
            LOG.warn("Exception when calling onRunFinish on {}", this.batchSink, t);
        }
    }

    /**
     * Mapper that runs each input key/value pair through the source, the configured transforms
     * and the sink, and writes whatever the pipeline emits.
     */
    public static class ETLMapper
    extends Mapper
    implements ProgramLifecycle<MapReduceContext> {
        private static final Gson GSON = new Gson();
        private static final Type STRING_LIST_TYPE = new TypeToken<List<String>>(){}.getType();
        // Populated by addTransforms; not read anywhere else in this class.
        private List<Transform> transforms;
        private TransformExecutor<KeyValue, KeyValue> transformExecutor;
        // Never assigned in this class; presumably injected by the CDAP runtime -- TODO confirm.
        private Metrics mapperMetrics;

        /**
         * Builds the pipeline from the runtime arguments: creates and initializes the source, each
         * transform and the sink, in that order, with one {@link StageMetrics} per stage.
         *
         * @param context the MapReduce context supplying runtime arguments and plugin instances
         * @throws Exception if a plugin cannot be created or initialized
         */
        public void initialize(MapReduceContext context) throws Exception {
            Map<String, String> runtimeArgs = context.getRuntimeArguments();
            ETLBatchConfig etlConfig = GSON.fromJson(runtimeArgs.get(CONFIG_KEY), ETLBatchConfig.class);
            String sourcePluginId = runtimeArgs.get(SOURCE_ID_KEY);
            String sinkPluginId = runtimeArgs.get(SINK_ID_KEY);
            List<String> transformIds = GSON.fromJson(runtimeArgs.get(TRANSFORM_IDS_KEY), STRING_LIST_TYPE);

            List<ETLStage> stageList = etlConfig.getTransforms();
            // +2 leaves room for the source and the sink that bracket the transforms.
            int pipelineSize = stageList.size() + 2;
            List<Transformation> pipeline = Lists.newArrayListWithCapacity(pipelineSize);
            List<StageMetrics> stageMetrics = Lists.newArrayListWithCapacity(pipelineSize);
            this.transforms = Lists.newArrayListWithCapacity(stageList.size());

            BatchSource source = (BatchSource) context.newPluginInstance(sourcePluginId);
            MapReduceSourceContext batchSourceContext = new MapReduceSourceContext(context, this.mapperMetrics, sourcePluginId);
            source.initialize(batchSourceContext);
            pipeline.add(source);
            stageMetrics.add(new StageMetrics(this.mapperMetrics, PluginID.from(sourcePluginId)));

            this.addTransforms(stageList, pipeline, stageMetrics, transformIds, context);

            BatchSink sink = (BatchSink) context.newPluginInstance(sinkPluginId);
            MapReduceSinkContext batchSinkContext = new MapReduceSinkContext(context, this.mapperMetrics, sinkPluginId);
            sink.initialize(batchSinkContext);
            pipeline.add(sink);
            stageMetrics.add(new StageMetrics(this.mapperMetrics, PluginID.from(sinkPluginId)));

            // Constructed as a raw type, as before: the constructor's declared parameter types
            // are not visible from this file.
            this.transformExecutor = new TransformExecutor(pipeline, stageMetrics);
        }

        /**
         * Creates and initializes one transform plugin per configured stage and appends it, with
         * its metrics, to the pipeline. Stage {@code i} is paired with plugin id {@code i}.
         *
         * @param stageConfigs the transform stage configurations
         * @param pipeline the pipeline being assembled; transforms are appended to it
         * @param stageMetrics the per-stage metrics list; one entry is appended per transform
         * @param transformIds the plugin ids, positionally matching {@code stageConfigs}
         * @param context the MapReduce context used to create the plugins
         * @throws IllegalArgumentException if the two lists differ in size
         * @throws Exception if a transform cannot be created or initialized
         */
        private void addTransforms(List<ETLStage> stageConfigs, List<Transformation> pipeline, List<StageMetrics> stageMetrics, List<String> transformIds, MapReduceContext context) throws Exception {
            Preconditions.checkArgument(stageConfigs.size() == transformIds.size(),
                "Number of transform stages (%s) does not match number of transform plugin ids (%s)",
                stageConfigs.size(), transformIds.size());
            for (int i = 0; i < stageConfigs.size(); ++i) {
                ETLStage stageConfig = stageConfigs.get(i);
                String transformId = transformIds.get(i);
                Transform transform = (Transform) context.newPluginInstance(transformId);
                BatchTransformContext transformContext = new BatchTransformContext(context, this.mapperMetrics, transformId);
                LOG.debug("Transform Stage : {}", stageConfig.getName());
                LOG.debug("Transform Class : {}", transform.getClass().getName());
                transform.initialize(transformContext);
                pipeline.add(transform);
                this.transforms.add(transform);
                stageMetrics.add(new StageMetrics(this.mapperMetrics, PluginID.from(transformId)));
            }
        }

        /**
         * Wraps the input pair in a {@link KeyValue}, runs it through the pipeline and writes every
         * emitted pair to the Hadoop context. Any exception is logged and rethrown via
         * {@link Throwables#propagate(Throwable)}.
         */
        public void map(Object key, Object value, Mapper.Context context) throws IOException, InterruptedException {
            try {
                KeyValue input = new KeyValue(key, value);
                for (KeyValue output : this.transformExecutor.runOneIteration(input)) {
                    context.write(output.getKey(), output.getValue());
                }
            } catch (Exception e) {
                // No "{}" placeholder: with a Throwable as the only argument SLF4J would print
                // the placeholder literally instead of substituting it.
                LOG.error("Exception thrown in BatchDriver Mapper", e);
                throw Throwables.propagate(e);
            }
        }

        /**
         * Destroys the transform executor, suppressing any failure.
         */
        public void destroy() {
            Destroyables.destroyQuietly(this.transformExecutor);
        }
    }
}

