/*
 * Decompiled with CFR 0.152.
 */
package co.cask.cdap.dq;

import co.cask.cdap.api.annotation.Property;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.service.AbstractService;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceContext;
import co.cask.cdap.api.service.http.HttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import co.cask.cdap.dq.AggregationTypeValue;
import co.cask.cdap.dq.FieldDetail;
import co.cask.cdap.dq.TimestampValue;
import co.cask.cdap.dq.functions.BasicAggregationFunction;
import co.cask.cdap.dq.functions.CombinableAggregationFunction;
import co.cask.cdap.dq.rowkey.AggregationsRowKey;
import co.cask.cdap.dq.rowkey.ValuesRowKey;
import com.google.common.base.Charsets;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;

public class DataQualityService
extends AbstractService {
    /** Name this service registers under; passed to {@code setName} in {@link #configure()}. */
    public static final String SERVICE_NAME = "DataQualityService";
    /** Shared Gson instance the handler uses to decode stored column values. */
    private static final Gson GSON = new Gson();
    /** Gson target type for a stored column value: a {@code HashSet} of {@link AggregationTypeValue}. */
    private static final Type TOKEN_TYPE_SET_AGGREGATION_TYPE_VALUES = new TypeToken<HashSet<AggregationTypeValue>>(){}.getType();
    /** Name of the dataset handed to the {@link ValuesLookup} handler created in {@link #configure()}. */
    private final String datasetName;

    /**
     * Creates the service for one dataset.
     *
     * @param datasetName name of the dataset the service's handler will read; stored as-is
     */
    public DataQualityService(String datasetName) {
        this.datasetName = datasetName;
    }

    /**
     * Declares the service: sets its name to {@link #SERVICE_NAME}, sets its description, and
     * attaches a single {@link ValuesLookup} handler built for the configured dataset.
     */
    protected void configure() {
        setName(SERVICE_NAME);
        setDescription("Service to query data quality histogram.");
        // Typed as the handler interface so the same addHandler overload is selected.
        HttpServiceHandler lookupHandler = new ValuesLookup(datasetName);
        addHandler(lookupHandler);
    }

    @Path(value="/v1")
    public static final class ValuesLookup
    extends AbstractHttpServiceHandler {
        /** Name of the table dataset this handler reads; assigned once in the constructor. */
        @Property
        private final String datasetName;
        /** Table fetched from the service context in {@code initialize}; unset until then. */
        Table dataStore;

        /**
         * Creates a handler bound to one dataset.
         *
         * @param datasetName name of the dataset to declare in {@code configure} and to open in
         *     {@code initialize}; stored as-is
         */
        public ValuesLookup(String datasetName) {
            this.datasetName = datasetName;
        }

        /**
         * Declares the single dataset this handler uses; no additional datasets are listed.
         */
        protected void configure() {
            useDatasets(datasetName);
        }

        /**
         * Runs the superclass initialization and then looks up the configured dataset from the
         * context, keeping it in {@code dataStore} for the request methods.
         *
         * @param context service context the dataset is obtained from
         * @throws Exception whatever the superclass initialization or the dataset lookup raises
         */
        public void initialize(HttpServiceContext context) throws Exception {
            super.initialize(context);
            Table table = (Table) context.getDataset(datasetName);
            dataStore = table;
        }

        /**
         * Lists every field recorded for a source within a time range, each paired with the union
         * of the aggregation types stored for it across the scanned rows.
         *
         * <p>Sends 200 with the JSON-encoded {@link FieldDetail} collection, or 404 with a
         * plain-text message when the scan yields no field at all.
         *
         * @param request the HTTP request; not read by this method
         * @param responder channel the response is written to
         * @param sourceID source identifier taken from the URL path
         * @param startTimestamp {@code startTimestamp} query parameter; 0 when absent
         * @param endTimestamp {@code endTimestamp} query parameter; {@link Long#MAX_VALUE} when absent
         * @throws IOException as declared on the signature; the raising call is not identifiable
         *     from this class alone
         */
        @Path(value="sources/{sourceID}/fields")
        @GET
        public void fieldsGetter(HttpServiceRequest request, HttpServiceResponder responder, @PathParam(value="sourceID") String sourceID, @QueryParam(value="startTimestamp") @DefaultValue(value="0") long startTimestamp, @QueryParam(value="endTimestamp") @DefaultValue(value="9223372036854775807") long endTimestamp) throws IOException {
            AggregationsRowKey scanFrom = new AggregationsRowKey(startTimestamp, sourceID);
            // NOTE(review): the +1 presumably turns an exclusive scan bound into an inclusive end;
            // with the default Long.MAX_VALUE it wraps to Long.MIN_VALUE - confirm the row-key
            // encoding tolerates that.
            AggregationsRowKey scanUntil = new AggregationsRowKey(endTimestamp + 1L, sourceID);
            Map<String, FieldDetail> detailsByField = new HashMap<String, FieldDetail>();

            try (Scanner rows = dataStore.scan(scanFrom.getTableRowKey(), scanUntil.getTableRowKey())) {
                for (Row current = rows.next(); current != null; current = rows.next()) {
                    Map<byte[], byte[]> columns = current.getColumns();
                    for (Map.Entry<byte[], byte[]> column : columns.entrySet()) {
                        // Column name is the field name; column value is a JSON set of aggregation types.
                        String name = Bytes.toString(column.getKey());
                        String json = Bytes.toString(column.getValue());
                        Set<AggregationTypeValue> types = GSON.fromJson(json, TOKEN_TYPE_SET_AGGREGATION_TYPE_VALUES);
                        FieldDetail fresh = new FieldDetail(name, types);

                        // First sighting of a field is stored; later sightings are merged into it.
                        FieldDetail known = detailsByField.get(name);
                        if (known == null) {
                            detailsByField.put(name, fresh);
                        } else {
                            known.addAggregations(fresh.getAggregationTypeSet());
                        }
                    }
                }
            }

            if (!detailsByField.isEmpty()) {
                responder.sendJson(200, detailsByField.values());
                return;
            }
            responder.sendString(404, String.format("No fields for source '%s' found within time range.", sourceID), Charsets.UTF_8);
        }

        /**
         * Returns the union of the aggregation type/value entries stored for one field of a source
         * across every row in the requested time range.
         *
         * <p>Rows that hold no column for {@code fieldName} are skipped, matching how the
         * aggregation endpoints of this handler treat a missing column; previously such a row
         * ended the request with a {@link NullPointerException}. Sends 200 with the JSON-encoded
         * set, or 404 with a plain-text message when nothing was collected.
         *
         * @param request the HTTP request; not read by this method
         * @param responder channel the response is written to
         * @param fieldName field (column) name taken from the URL path
         * @param sourceID source identifier taken from the URL path
         * @param startTimestamp {@code startTimestamp} query parameter; 0 when absent
         * @param endTimestamp {@code endTimestamp} query parameter; {@link Long#MAX_VALUE} when absent
         * @throws IOException as declared on the signature; the raising call is not identifiable
         *     from this class alone
         */
        @Path(value="sources/{sourceID}/fields/{fieldName}")
        @GET
        public void aggregationTypesGetter(HttpServiceRequest request, HttpServiceResponder responder, @PathParam(value="fieldName") String fieldName, @PathParam(value="sourceID") String sourceID, @QueryParam(value="startTimestamp") @DefaultValue(value="0") long startTimestamp, @QueryParam(value="endTimestamp") @DefaultValue(value="9223372036854775807") long endTimestamp) throws IOException {
            AggregationsRowKey aggregationsRowKeyStart = new AggregationsRowKey(startTimestamp, sourceID);
            // NOTE(review): the +1 presumably turns an exclusive scan bound into an inclusive end;
            // with the default Long.MAX_VALUE it wraps to Long.MIN_VALUE - confirm the row-key
            // encoding tolerates that.
            AggregationsRowKey aggregationsRowKeyEnd = new AggregationsRowKey(endTimestamp + 1L, sourceID);
            byte[] fieldNameBytes = Bytes.toBytes(fieldName);
            Set<AggregationTypeValue> commonAggregationTypeValues = new HashSet<AggregationTypeValue>();

            // The scanner is opened inside try-with-resources so it is closed on every exit path.
            try (Scanner scanner = this.dataStore.scan(aggregationsRowKeyStart.getTableRowKey(), aggregationsRowKeyEnd.getTableRowKey())) {
                Row row;
                while ((row = scanner.next()) != null) {
                    Map<byte[], byte[]> columnsMapBytes = row.getColumns();
                    byte[] output = columnsMapBytes.get(fieldNameBytes);
                    if (output == null) {
                        // This row has no value for the requested field.
                        continue;
                    }
                    Set<AggregationTypeValue> aggregationTypeValuesSet = GSON.fromJson(Bytes.toString(output), TOKEN_TYPE_SET_AGGREGATION_TYPE_VALUES);
                    // Gson yields null for an empty or literal "null" document; addAll(null) would throw.
                    if (aggregationTypeValuesSet != null) {
                        commonAggregationTypeValues.addAll(aggregationTypeValuesSet);
                    }
                }
            }

            if (commonAggregationTypeValues.isEmpty()) {
                responder.sendString(404, String.format("No aggregations for source '%s' and field '%s' found within time range.", sourceID, fieldName), Charsets.UTF_8);
            } else {
                responder.sendJson(200, commonAggregationTypeValues);
            }
        }

        /**
         * Combines the stored values of one aggregation for a field of a source over a time range
         * into a single total.
         *
         * <p>The aggregation function is the class {@code co.cask.cdap.dq.functions.<aggregationType>}.
         * Because that name comes from the request path, the class is checked to be a
         * {@link CombinableAggregationFunction} <em>before</em> it is instantiated, so the
         * constructor of an unrelated class is never run. Each scanned row's value in the column
         * named {@code aggregationType} is passed to {@code combine}; rows without that column are
         * skipped.
         *
         * <p>Responses: 200 with the JSON-encoded result of {@code retrieveAggregation()}; 404 when
         * that result is {@code null}, or when the class cannot be found or instantiated; 400 when
         * the class is not a {@link CombinableAggregationFunction}.
         *
         * @param request the HTTP request; not read by this method
         * @param responder channel the response is written to
         * @param fieldName field name taken from the URL path
         * @param aggregationType simple class name of the aggregation function, also used as the
         *     column name to read
         * @param sourceID source identifier taken from the URL path
         * @param startTimestamp {@code startTimestamp} query parameter; 0 when absent
         * @param endTimestamp {@code endTimestamp} query parameter; {@link Long#MAX_VALUE} when absent
         * @throws IOException as declared on the signature; the raising call is not identifiable
         *     from this class alone
         */
        @Path(value="sources/{sourceID}/fields/{fieldName}/aggregations/{aggregationType}/totals")
        @GET
        public void combinableAggregationGetter(HttpServiceRequest request, HttpServiceResponder responder, @PathParam(value="fieldName") String fieldName, @PathParam(value="aggregationType") String aggregationType, @PathParam(value="sourceID") String sourceID, @QueryParam(value="startTimestamp") @DefaultValue(value="0") long startTimestamp, @QueryParam(value="endTimestamp") @DefaultValue(value="9223372036854775807") long endTimestamp) throws IOException {
            ValuesRowKey valuesRowKeyStart = new ValuesRowKey(startTimestamp, fieldName, sourceID);
            // NOTE(review): the +1 presumably turns an exclusive scan bound into an inclusive end;
            // with the default Long.MAX_VALUE it wraps to Long.MIN_VALUE - confirm the row-key
            // encoding tolerates that.
            ValuesRowKey valuesRowKeyEnd = new ValuesRowKey(endTimestamp + 1L, fieldName, sourceID);
            try {
                // asSubclass throws ClassCastException for a non-matching class, before newInstance runs.
                CombinableAggregationFunction aggregationFunction = Class.forName("co.cask.cdap.dq.functions." + aggregationType)
                    .asSubclass(CombinableAggregationFunction.class)
                    .newInstance();
                byte[] aggregationTypeBytes = Bytes.toBytes(aggregationType);
                try (Scanner scanner = this.dataStore.scan(valuesRowKeyStart.getTableRowKey(), valuesRowKeyEnd.getTableRowKey())) {
                    Row row;
                    while ((row = scanner.next()) != null) {
                        Map<byte[], byte[]> columnsMapBytes = row.getColumns();
                        byte[] serializedValue = columnsMapBytes.get(aggregationTypeBytes);
                        if (serializedValue == null) {
                            // This row holds no value for the requested aggregation.
                            continue;
                        }
                        aggregationFunction.combine(serializedValue);
                    }
                }
                Object output = aggregationFunction.retrieveAggregation();
                if (output == null) {
                    responder.sendString(404, "No aggregation for the given parameters", Charsets.UTF_8);
                } else {
                    responder.sendJson(200, output);
                }
            }
            catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                responder.sendString(404, String.format("No aggregations for source '%s' and field '%s' found within time range.", sourceID, fieldName), Charsets.UTF_8);
            }
            catch (ClassCastException e) {
                responder.sendString(400, "Aggregation function is not a Combinable Aggregation Function", Charsets.UTF_8);
            }
        }

        /**
         * Returns a time series of one aggregation for a field of a source: one
         * {@link TimestampValue} per scanned row that holds a value for the aggregation.
         *
         * <p>The aggregation function is the class {@code co.cask.cdap.dq.functions.<aggregationType>}.
         * Because that name comes from the request path, the class is checked to be a
         * {@link BasicAggregationFunction} <em>before</em> it is instantiated, so the constructor of
         * an unrelated class is never run. For each row that has a column named
         * {@code aggregationType}, the timestamp is read from the last 8 bytes of the row key and
         * the column value is decoded with {@code deserialize}.
         *
         * <p>Responses: 200 with the JSON-encoded list; 404 when the list is empty, or when the
         * class cannot be found or instantiated; 400 when the class is not a
         * {@link BasicAggregationFunction}.
         *
         * @param request the HTTP request; not read by this method
         * @param responder channel the response is written to
         * @param fieldName field name taken from the URL path
         * @param aggregationType simple class name of the aggregation function, also used as the
         *     column name to read
         * @param sourceID source identifier taken from the URL path
         * @param startTimestamp {@code startTimestamp} query parameter; 0 when absent
         * @param endTimestamp {@code endTimestamp} query parameter; {@link Long#MAX_VALUE} when absent
         * @throws IOException as declared on the signature; the raising call is not identifiable
         *     from this class alone
         */
        @Path(value="sources/{sourceID}/fields/{fieldName}/aggregations/{aggregationType}/timeseries")
        @GET
        public void basicAggregationGetter(HttpServiceRequest request, HttpServiceResponder responder, @PathParam(value="fieldName") String fieldName, @PathParam(value="aggregationType") String aggregationType, @PathParam(value="sourceID") String sourceID, @QueryParam(value="startTimestamp") @DefaultValue(value="0") long startTimestamp, @QueryParam(value="endTimestamp") @DefaultValue(value="9223372036854775807") long endTimestamp) throws IOException {
            ValuesRowKey valuesRowKeyStart = new ValuesRowKey(startTimestamp, fieldName, sourceID);
            // NOTE(review): the +1 presumably turns an exclusive scan bound into an inclusive end;
            // with the default Long.MAX_VALUE it wraps to Long.MIN_VALUE - confirm the row-key
            // encoding tolerates that.
            ValuesRowKey valuesRowKeyEnd = new ValuesRowKey(endTimestamp + 1L, fieldName, sourceID);
            ArrayList<TimestampValue> timestampValueList = new ArrayList<TimestampValue>();
            try {
                // asSubclass throws ClassCastException for a non-matching class, before newInstance runs.
                BasicAggregationFunction aggregationFunction = Class.forName("co.cask.cdap.dq.functions." + aggregationType)
                    .asSubclass(BasicAggregationFunction.class)
                    .newInstance();
                byte[] aggregationTypeBytes = Bytes.toBytes(aggregationType);
                try (Scanner scanner = this.dataStore.scan(valuesRowKeyStart.getTableRowKey(), valuesRowKeyEnd.getTableRowKey())) {
                    Row row;
                    while ((row = scanner.next()) != null) {
                        Map<byte[], byte[]> columnsMapBytes = row.getColumns();
                        byte[] serializedValue = columnsMapBytes.get(aggregationTypeBytes);
                        if (serializedValue == null) {
                            // This row holds no value for the requested aggregation.
                            continue;
                        }
                        // The row key ends with the 8-byte timestamp; decode it only for rows we keep.
                        byte[] rowBytes = row.getRow();
                        long timestamp = Bytes.toLong(rowBytes, rowBytes.length - 8);
                        Object deserializedOutput = aggregationFunction.deserialize(serializedValue);
                        timestampValueList.add(new TimestampValue(timestamp, deserializedOutput));
                    }
                }
                if (timestampValueList.isEmpty()) {
                    responder.sendString(404, "No aggregation for the given parameters", Charsets.UTF_8);
                } else {
                    responder.sendJson(200, timestampValueList);
                }
            }
            catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                responder.sendString(404, String.format("Aggregations '%s' for source '%s' and field '%s' could not be found within time range.", aggregationType, sourceID, fieldName), Charsets.UTF_8);
            }
            catch (ClassCastException e) {
                responder.sendString(400, "Aggregation function is not a Basic Aggregation Function", Charsets.UTF_8);
            }
        }
    }
}

