/*
 * Decompiled with CFR 0.152.
 */
package co.cask.cdap.dq;

import co.cask.cdap.api.Config;
import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduce;
import co.cask.cdap.api.mapreduce.MapReduceConfigurer;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.plugin.PluginProperties;
import co.cask.cdap.api.schedule.Schedules;
import co.cask.cdap.api.service.Service;
import co.cask.cdap.api.stream.GenericStreamEventData;
import co.cask.cdap.api.workflow.Workflow;
import co.cask.cdap.dq.AggregationTypeValue;
import co.cask.cdap.dq.DataQualityService;
import co.cask.cdap.dq.DataQualitySource;
import co.cask.cdap.dq.DataQualityWorkflow;
import co.cask.cdap.dq.DataQualityWritable;
import co.cask.cdap.dq.MapReducePipelineConfigurer;
import co.cask.cdap.dq.functions.BasicAggregationFunction;
import co.cask.cdap.dq.functions.CombinableAggregationFunction;
import co.cask.cdap.dq.rowkey.AggregationsRowKey;
import co.cask.cdap.dq.rowkey.ValuesRowKey;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.batch.MapReduceSourceContext;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataQualityApp
extends AbstractApplication<DataQualityConfig> {
    private static final Logger LOG = LoggerFactory.getLogger(DataQualityApp.class);
    private static final Gson GSON = new Gson();
    private static final Type TOKEN_TYPE_MAP_STRING_SET_STRING = new TypeToken<Map<String, Set<String>>>(){}.getType();

    /**
     * Validates the application configuration and registers the programs of this application.
     *
     * <p>Once validation passes, the {@link FieldAggregator} MapReduce, the output {@link Table}
     * dataset, the {@link DataQualityService} and the {@link DataQualityWorkflow} are added, and the
     * workflow is scheduled with a cron expression derived from {@code workflowScheduleMinutes}.
     *
     * @throws IllegalArgumentException if the schedule interval is not positive, the source or its
     *     name/id is missing, the output dataset name is empty, or no field carries at least one
     *     aggregation
     */
    public void configure() {
        DataQualityConfig configObj = (DataQualityConfig) this.getContext().getConfig();

        Preconditions.checkArgument(configObj.workflowScheduleMinutes > 0,
            (Object) "Workflow Frequency in minutes (>0) should be provided");
        Preconditions.checkArgument(configObj.source != null,
            (Object) "Configuration for DataQualityApp Source is missing");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(configObj.source.getName()),
            (Object) "Data Quality source name should not be null or empty");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(configObj.source.getId()),
            (Object) "Data Quality source id should not be null or empty");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(configObj.datasetName),
            (Object) "Output Dataset name should be not be null or empty");
        Preconditions.checkArgument(configObj.fieldAggregations != null,
            (Object) "fieldAggregations needs to be specified");
        Preconditions.checkArgument(!configObj.fieldAggregations.isEmpty(),
            (Object) "fieldAggregations should not be empty");

        // At least one entry must name a field and list one or more aggregations. A null
        // aggregation set is treated like an empty one instead of raising a NullPointerException.
        boolean validEntry = false;
        for (Map.Entry<String, Set<String>> entry : configObj.fieldAggregations.entrySet()) {
            Set<String> aggregations = entry.getValue();
            if (!Strings.isNullOrEmpty(entry.getKey()) && aggregations != null && !aggregations.isEmpty()) {
                validEntry = true;
                break;
            }
        }
        Preconditions.checkArgument(validEntry,
            (Object) "At least one field with one or more aggregations must be provided");

        int scheduleMinutes = configObj.workflowScheduleMinutes;

        this.addMapReduce((MapReduce) new FieldAggregator(configObj.source, configObj.datasetName, configObj.fieldAggregations));
        this.setName("DataQualityApp");
        this.setDescription("Application with MapReduce job to determine the data quality in a Batch Source");
        this.createDataset(configObj.datasetName, Table.class);
        this.addService((Service) new DataQualityService(configObj.datasetName));
        this.addWorkflow((Workflow) new DataQualityWorkflow());

        // The interval is used as the step of the cron minute field.
        // NOTE(review): values of 60 or more are not rejected here; confirm how the scheduler
        // treats a minute step that large.
        String schedule = "*/" + scheduleMinutes + " * * * *";
        this.scheduleWorkflow(
            Schedules.createTimeSchedule("aggregatorSchedule", "Schedule execution every " + scheduleMinutes + " min", schedule),
            "DataQualityWorkflow");
    }

    /**
     * Reducer that, for each field name (the key), feeds all values of that field into the
     * aggregation functions configured for it and writes the results as table {@link Put}s.
     *
     * <p>Two kinds of rows are written per key: one cell per aggregation type under the
     * {@link ValuesRowKey} row holding the aggregated value, and one cell under the
     * {@link AggregationsRowKey} row holding the JSON list of {@link AggregationTypeValue}s
     * that were computed for the field.
     */
    public static class AggregationReducer
    extends Reducer<Text, DataQualityWritable, byte[], Put>
    implements ProgramLifecycle<MapReduceContext> {
        private static final Gson GSON = new Gson();
        private String sourceId;
        private Map<String, Set<String>> fieldAggregations;
        long timeKey = 0L;

        /**
         * Reads the logical start time, the source id and the field-to-aggregations mapping
         * from the MapReduce context and specification properties.
         *
         * @param mapReduceContext context of the running MapReduce program
         */
        public void initialize(MapReduceContext mapReduceContext) throws Exception {
            this.timeKey = mapReduceContext.getLogicalStartTime();
            Map<String, String> properties = mapReduceContext.getSpecification().getProperties();
            this.sourceId = properties.get("sourceId");
            this.fieldAggregations = GSON.fromJson(properties.get("fieldAggregations"), TOKEN_TYPE_MAP_STRING_SET_STRING);
        }

        /**
         * Aggregates all values of one field with every aggregation configured for that field.
         *
         * @param key name of the field the values belong to
         * @param values values emitted by the mapper for this field
         * @param context output context the resulting {@link Put}s are written to
         * @throws RuntimeException if an aggregation function cannot be instantiated
         */
        public void reduce(Text key, Iterable<DataQualityWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            LOG.trace("timestamp: {}", (Object) this.timeKey);
            String fieldName = key.toString();

            Set<String> aggregationTypesSet = this.fieldAggregations == null ? null : this.fieldAggregations.get(fieldName);
            if (aggregationTypesSet == null) {
                // Nothing is configured for this field, so there is nothing to aggregate or record.
                LOG.warn("No aggregations configured for field {}; skipping", fieldName);
                return;
            }

            // Instantiate every function up front so the values can be consumed in ONE traversal.
            // The values iterable handed to a reducer is not guaranteed to be re-iterable, so
            // looping over it once per aggregation type would starve every function after the first.
            ArrayList<String> aggregationTypes = new ArrayList<>(aggregationTypesSet);
            ArrayList<BasicAggregationFunction> functions = new ArrayList<>(aggregationTypes.size());
            for (String aggregationType : aggregationTypes) {
                functions.add(newAggregationFunction(aggregationType));
            }

            for (DataQualityWritable value : values) {
                for (BasicAggregationFunction function : functions) {
                    function.add(value);
                }
            }

            ValuesRowKey valuesRowKey = new ValuesRowKey(this.timeKey, fieldName, this.sourceId);
            ArrayList<AggregationTypeValue> aggregationTypeValueList = new ArrayList<>(aggregationTypes.size());
            for (int i = 0; i < aggregationTypes.size(); i++) {
                String aggregationType = aggregationTypes.get(i);
                BasicAggregationFunction function = functions.get(i);
                context.write(valuesRowKey.getTableRowKey(),
                    new Put(valuesRowKey.getTableRowKey(), Bytes.toBytes(aggregationType), function.aggregate()));
                boolean isCombinable = function instanceof CombinableAggregationFunction;
                aggregationTypeValueList.add(new AggregationTypeValue(aggregationType, isCombinable));
            }

            // Record which aggregations exist for this field at this timestamp.
            AggregationsRowKey aggregationsRowKey = new AggregationsRowKey(this.timeKey, this.sourceId);
            byte[] fieldColumnKey = Bytes.toBytes(fieldName);
            byte[] aggregationTypeListBytes = Bytes.toBytes(GSON.toJson(aggregationTypeValueList));
            context.write(aggregationsRowKey.getTableRowKey(),
                new Put(aggregationsRowKey.getTableRowKey(), fieldColumnKey, aggregationTypeListBytes));
        }

        /**
         * Creates the aggregation function whose simple class name is {@code aggregationType},
         * looked up in the {@code co.cask.cdap.dq.functions} package.
         */
        private static BasicAggregationFunction newAggregationFunction(String aggregationType) {
            try {
                Class<?> aggregationClass = Class.forName("co.cask.cdap.dq.functions." + aggregationType);
                return (BasicAggregationFunction) aggregationClass.newInstance();
            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | ClassCastException e) {
                throw new RuntimeException("Unable to instantiate aggregation function '" + aggregationType + "'", e);
            }
        }

        /** No resources to release. */
        public void destroy() {
        }
    }

    /**
     * Mapper that emits one {@code (field name, value)} pair for every non-null field of an
     * incoming {@link StructuredRecord} whose type can be wrapped in a {@link DataQualityWritable}.
     *
     * <p>When a non-empty set of field names is configured, only those fields are emitted;
     * when the set is empty, every field is emitted.
     */
    public static class AggregationMapper
    extends Mapper<LongWritable, GenericStreamEventData<StructuredRecord>, Text, DataQualityWritable>
    implements ProgramLifecycle<MapReduceContext> {
        private Set<String> fieldsSet = new HashSet<String>();

        /**
         * Loads the names of the fields to aggregate from the {@code fieldAggregations}
         * specification property; the field set stays empty when the property is absent.
         *
         * @param mapReduceContext context of the running MapReduce program
         */
        public void initialize(MapReduceContext mapReduceContext) throws Exception {
            String json = mapReduceContext.getSpecification().getProperties().get("fieldAggregations");
            Map<String, Set<String>> fieldAggregations = GSON.fromJson(json, TOKEN_TYPE_MAP_STRING_SET_STRING);
            if (fieldAggregations != null) {
                this.fieldsSet = fieldAggregations.keySet();
            }
        }

        /**
         * Emits the selected, non-null, convertible fields of the event body.
         *
         * @param key input key (not used)
         * @param event event whose body is the record to inspect
         * @param context output context the pairs are written to
         */
        public void map(LongWritable key, GenericStreamEventData<StructuredRecord> event, Mapper.Context context) throws IOException, InterruptedException {
            StructuredRecord body = event.getBody();
            boolean acceptAllFields = this.fieldsSet.isEmpty();
            for (Schema.Field field : body.getSchema().getFields()) {
                String fieldName = field.getName();
                // Filter first: fields that are not selected are never read or converted, so they
                // cost nothing and a conversion problem in them cannot fail the record.
                if (!acceptAllFields && !this.fieldsSet.contains(fieldName)) {
                    continue;
                }
                Object fieldVal = body.get(fieldName);
                if (fieldVal == null) {
                    continue;
                }
                Schema fieldSchema = field.getSchema();
                Schema.Type fieldType = fieldSchema.isNullable()
                    ? fieldSchema.getNonNullable().getType()
                    : fieldSchema.getType();
                DataQualityWritable outputValue = this.getOutputValue(fieldType, fieldVal);
                if (outputValue == null) {
                    // Unsupported schema type: nothing to emit for this field.
                    continue;
                }
                context.write(new Text(fieldName), outputValue);
            }
        }

        /**
         * Wraps a field value in a {@link DataQualityWritable} according to its schema type.
         *
         * @param type schema type of the field (already unwrapped from nullable)
         * @param o non-null field value
         * @return the wrapped value, or {@code null} when the type is not supported
         */
        @Nullable
        private DataQualityWritable getOutputValue(Schema.Type type, Object o) {
            DataQualityWritable writable = new DataQualityWritable();
            switch (type) {
                case STRING: {
                    writable.set((Writable) new Text((String) o));
                    break;
                }
                case INT: {
                    writable.set((Writable) new IntWritable(((Integer) o).intValue()));
                    break;
                }
                case LONG: {
                    writable.set((Writable) new LongWritable(((Long) o).longValue()));
                    break;
                }
                case DOUBLE: {
                    writable.set((Writable) new DoubleWritable(((Double) o).doubleValue()));
                    break;
                }
                case FLOAT: {
                    writable.set((Writable) new FloatWritable(((Float) o).floatValue()));
                    break;
                }
                case BOOLEAN: {
                    writable.set((Writable) new BooleanWritable(((Boolean) o).booleanValue()));
                    break;
                }
                case BYTES: {
                    // NOTE(review): this cast only succeeds when the value is a single Byte; confirm
                    // what representation BYTES fields actually carry in the record.
                    writable.set((Writable) new ByteWritable(((Byte) o).byteValue()));
                    break;
                }
                default: {
                    return null;
                }
            }
            return writable;
        }

        /** No resources to release. */
        public void destroy() {
        }
    }

    public static final class FieldAggregator
    extends AbstractMapReduce {
        private static final String PLUGIN_ID = "input:";
        private final String datasetName;
        private final Map<String, Set<String>> fieldAggregations;
        private DataQualitySource source;

        public FieldAggregator(DataQualitySource source, String datasetName, Map<String, Set<String>> fieldAggregations) {
            this.source = source;
            this.datasetName = datasetName;
            this.fieldAggregations = fieldAggregations;
        }

        public void configure() {
            super.configure();
            MapReduceConfigurer mrConfigurer = this.getConfigurer();
            BatchSource batchSource = (BatchSource)this.usePlugin("batchsource", this.source.getName(), PLUGIN_ID, PluginProperties.builder().addAll(this.source.getProperties()).build());
            Preconditions.checkNotNull((Object)batchSource, (String)"Could not find plugin %s of type 'source'", (Object[])new Object[]{this.source.getName()});
            batchSource.configurePipeline(new MapReducePipelineConfigurer(mrConfigurer, PLUGIN_ID));
            this.setName("FieldAggregator");
            this.setProperties((Map)ImmutableMap.builder().put((Object)"fieldAggregations", (Object)GSON.toJson(this.fieldAggregations)).put((Object)"sourceId", (Object)this.source.getId()).put((Object)"datasetName", (Object)this.datasetName).build());
        }

        public void beforeSubmit(MapReduceContext context) throws Exception {
            Job job = (Job)context.getHadoopJob();
            job.setMapperClass(AggregationMapper.class);
            job.setReducerClass(AggregationReducer.class);
            BatchSource batchSource = (BatchSource)context.newPluginInstance(PLUGIN_ID);
            MapReduceSourceContext sourceContext = new MapReduceSourceContext(context, null, PLUGIN_ID);
            batchSource.prepareRun(sourceContext);
            context.addOutput(context.getSpecification().getProperty("datasetName"));
        }
    }

    /**
     * Configuration of the application: the workflow schedule interval in minutes, the source
     * to read from, the output dataset name, and the aggregation type names per field.
     */
    public static class DataQualityConfig extends Config {
        private int workflowScheduleMinutes;
        private DataQualitySource source;
        private String datasetName;
        private Map<String, Set<String>> fieldAggregations;

        /**
         * @param workflowScheduleMinutes interval, in minutes, between workflow runs
         * @param source description of the source to read from
         * @param datasetName name of the output dataset
         * @param fieldAggregations aggregation type names keyed by field name
         */
        public DataQualityConfig(
                int workflowScheduleMinutes,
                DataQualitySource source,
                String datasetName,
                Map<String, Set<String>> fieldAggregations) {
            this.fieldAggregations = fieldAggregations;
            this.datasetName = datasetName;
            this.source = source;
            this.workflowScheduleMinutes = workflowScheduleMinutes;
        }
    }
}

