/*
 * Decompiled with CFR 0.152.
 */
package co.cask.cdap.etl.realtime;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.plugin.PluginConfigurer;
import co.cask.cdap.api.worker.AbstractWorker;
import co.cask.cdap.api.worker.WorkerContext;
import co.cask.cdap.etl.api.Destroyable;
import co.cask.cdap.etl.api.InvalidEntry;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.Transformation;
import co.cask.cdap.etl.api.realtime.RealtimeSink;
import co.cask.cdap.etl.api.realtime.RealtimeSource;
import co.cask.cdap.etl.api.realtime.SourceState;
import co.cask.cdap.etl.common.DefaultEmitter;
import co.cask.cdap.etl.common.Destroyables;
import co.cask.cdap.etl.common.ETLConfig;
import co.cask.cdap.etl.common.Pipeline;
import co.cask.cdap.etl.common.PipelineRegisterer;
import co.cask.cdap.etl.common.PluginID;
import co.cask.cdap.etl.common.SinkInfo;
import co.cask.cdap.etl.common.StageMetrics;
import co.cask.cdap.etl.common.StructuredRecordStringConverter;
import co.cask.cdap.etl.common.TransformDetail;
import co.cask.cdap.etl.common.TransformExecutor;
import co.cask.cdap.etl.common.TransformInfo;
import co.cask.cdap.etl.common.TransformResponse;
import co.cask.cdap.etl.realtime.DefaultDataWriter;
import co.cask.cdap.etl.realtime.RealtimeTransformContext;
import co.cask.cdap.etl.realtime.TrackedRealtimeSink;
import co.cask.cdap.etl.realtime.WorkerRealtimeContext;
import co.cask.cdap.etl.realtime.config.ETLRealtimeConfig;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ETLWorker
extends AbstractWorker {
    public static final String NAME = ETLWorker.class.getSimpleName();
    private static final Logger LOG = LoggerFactory.getLogger(ETLWorker.class);
    private static final Type TRANSFORMDETAILS_LIST_TYPE = new TypeToken<List<TransformInfo>>(){}.getType();
    private static final Type SINK_INFO_TYPE = new TypeToken<List<SinkInfo>>(){}.getType();
    private static final Gson GSON = new Gson();
    private static final String SEPARATOR = ":";
    private static final Schema ERROR_SCHEMA = Schema.recordOf((String)"error", (Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"errCode", (Schema)Schema.of((Schema.Type)Schema.Type.INT)), Schema.Field.of((String)"errMsg", (Schema)Schema.unionOf((Schema[])new Schema[]{Schema.of((Schema.Type)Schema.Type.STRING), Schema.of((Schema.Type)Schema.Type.NULL)})), Schema.Field.of((String)"invalidRecord", (Schema)Schema.of((Schema.Type)Schema.Type.STRING))});
    private final ETLRealtimeConfig config;
    private RealtimeSource source;
    private List<RealtimeSink> sinks;
    private List<Metrics> transformMetrics;
    private TransformExecutor transformExecutor;
    private DefaultEmitter sourceEmitter;
    private String stateStoreKey;
    private byte[] stateStoreKeyBytes;
    private String appName;
    private Metrics metrics;
    private List<TransformDetail> transformationDetailList;
    private Map<String, String> tranformIdToDatasetName;
    private volatile boolean stopped;

    public ETLWorker(ETLRealtimeConfig config) {
        this.config = config;
    }

    public void configure() {
        int instances;
        this.setName(NAME);
        this.setDescription("Worker Driver for Realtime ETL Adapters");
        int n = instances = this.config.getInstances() != null ? this.config.getInstances() : 1;
        if (instances < 1) {
            throw new IllegalArgumentException("instances must be greater than 0.");
        }
        this.setInstances(instances);
        if (this.config.getResources() != null) {
            this.setResources(this.config.getResources());
        }
        PipelineRegisterer registerer = new PipelineRegisterer((PluginConfigurer)this.getConfigurer(), "realtime");
        Pipeline pluginIDs = registerer.registerPlugins((ETLConfig)this.config, Table.class, DatasetProperties.builder().add("schema", ERROR_SCHEMA.toString()).build(), false);
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("sourceId", pluginIDs.getSource());
        properties.put("sinkIds", GSON.toJson((Object)pluginIDs.getSinks()));
        properties.put("transformIds", GSON.toJson((Object)pluginIDs.getTransforms()));
        properties.put("uniqueid", String.valueOf(System.currentTimeMillis()));
        this.setProperties(properties);
    }

    public void initialize(WorkerContext context) throws Exception {
        super.initialize(context);
        Map properties = context.getSpecification().getProperties();
        this.appName = context.getApplicationSpecification().getName();
        Preconditions.checkArgument((boolean)properties.containsKey("sourceId"));
        Preconditions.checkArgument((boolean)properties.containsKey("sinkIds"));
        Preconditions.checkArgument((boolean)properties.containsKey("transformIds"));
        Preconditions.checkArgument((boolean)properties.containsKey("uniqueid"));
        String uniqueId = (String)properties.get("uniqueid");
        final String appName = context.getApplicationSpecification().getName();
        this.stateStoreKey = String.format("%s%s%s%s%s", appName, SEPARATOR, uniqueId, SEPARATOR, context.getInstanceId());
        this.stateStoreKeyBytes = Bytes.toBytes((String)this.stateStoreKey);
        this.getContext().execute(new TxRunnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run(DatasetContext dsContext) throws Exception {
                KeyValueTable stateTable = (KeyValueTable)dsContext.getDataset("etlrealtimesourcestate");
                byte[] startKey = Bytes.toBytes((String)String.format("%s%s", appName, ETLWorker.SEPARATOR));
                try (CloseableIterator rows = stateTable.scan(startKey, Bytes.stopKeyForPrefix((byte[])startKey));){
                    while (rows.hasNext()) {
                        KeyValue row = (KeyValue)rows.next();
                        if (Bytes.compareTo((byte[])ETLWorker.this.stateStoreKeyBytes, (byte[])((byte[])row.getKey())) == 0) continue;
                        stateTable.delete((byte[])row.getKey());
                    }
                }
            }
        });
        this.initializeSource(context);
        this.transformationDetailList = this.initializeTransforms(context);
        this.initializeSinks(context);
        this.transformExecutor = new TransformExecutor(this.transformationDetailList);
    }

    private void initializeSource(WorkerContext context) throws Exception {
        String sourcePluginId = context.getSpecification().getProperty("sourceId");
        this.source = (RealtimeSource)context.newPluginInstance(sourcePluginId);
        WorkerRealtimeContext sourceContext = new WorkerRealtimeContext(context, this.metrics, sourcePluginId);
        LOG.debug("Source Class : {}", (Object)this.source.getClass().getName());
        this.source.initialize(sourceContext);
        this.sourceEmitter = new DefaultEmitter((Metrics)new StageMetrics(this.metrics, PluginID.from((String)sourcePluginId)));
    }

    private void initializeSinks(WorkerContext context) throws Exception {
        List sinkInfos = (List)GSON.fromJson(context.getSpecification().getProperty("sinkIds"), SINK_INFO_TYPE);
        this.sinks = Lists.newArrayListWithCapacity((int)sinkInfos.size());
        for (SinkInfo sinkInfo : sinkInfos) {
            TrackedRealtimeSink sink = (TrackedRealtimeSink)context.newPluginInstance(sinkInfo.getSinkId());
            WorkerRealtimeContext sinkContext = new WorkerRealtimeContext(context, this.metrics, sinkInfo.getSinkId());
            LOG.debug("Sink Class : {}", (Object)sink.getClass().getName());
            sink.initialize(sinkContext);
            sink = new TrackedRealtimeSink(sink, this.metrics, PluginID.from((String)sinkInfo.getSinkId()));
            this.sinks.add(sink);
        }
    }

    private List<TransformDetail> initializeTransforms(WorkerContext context) throws Exception {
        List transformInfos = (List)GSON.fromJson(context.getSpecification().getProperty("transformIds"), TRANSFORMDETAILS_LIST_TYPE);
        Preconditions.checkArgument((transformInfos != null ? 1 : 0) != 0);
        ArrayList<TransformDetail> transformDetailList = new ArrayList<TransformDetail>(transformInfos.size());
        this.tranformIdToDatasetName = new HashMap<String, String>(transformInfos.size());
        for (int i = 0; i < transformInfos.size(); ++i) {
            String transformId = ((TransformInfo)transformInfos.get(i)).getTransformId();
            try {
                Transform transform = (Transform)context.newPluginInstance(transformId);
                RealtimeTransformContext transformContext = new RealtimeTransformContext(context, this.metrics, transformId);
                LOG.debug("Transform Class : {}", (Object)transform.getClass().getName());
                transform.initialize(transformContext);
                StageMetrics stageMetrics = new StageMetrics(this.metrics, PluginID.from((String)transformId));
                transformDetailList.add(new TransformDetail(transformId, (Transformation)transform, stageMetrics));
                if (((TransformInfo)transformInfos.get(i)).getErrorDatasetName() == null) continue;
                this.tranformIdToDatasetName.put(transformId, ((TransformInfo)transformInfos.get(i)).getErrorDatasetName());
                continue;
            }
            catch (InstantiationException e) {
                LOG.error("Unable to instantiate Transform", (Throwable)e);
                Throwables.propagate((Throwable)e);
            }
        }
        return transformDetailList;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        final SourceState currentState = new SourceState();
        final SourceState nextState = new SourceState();
        final ArrayList dataToSink = Lists.newArrayList();
        final Map<String, List<InvalidEntry>> transformIdToErrorRecords = this.intializeTransformIdToErrorsList();
        HashSet transformErrorsWithoutDataset = Sets.newHashSet();
        this.getContext().execute(new TxRunnable(){

            public void run(DatasetContext context) throws Exception {
                KeyValueTable stateTable = (KeyValueTable)context.getDataset("etlrealtimesourcestate");
                byte[] stateBytes = stateTable.read(ETLWorker.this.stateStoreKeyBytes);
                if (stateBytes != null) {
                    SourceState state = (SourceState)GSON.fromJson(Bytes.toString((byte[])stateBytes), SourceState.class);
                    currentState.setState(state);
                }
            }
        });
        while (!this.stopped) {
            block15: {
                try {
                    SourceState newState = this.source.poll(this.sourceEmitter, new SourceState(currentState));
                    if (newState == null) break block15;
                    nextState.setState(newState);
                }
                catch (Exception e) {
                    LOG.warn("Exception thrown during polling of Source for data", (Throwable)e);
                    this.sourceEmitter.reset();
                    continue;
                }
            }
            for (Object sourceData : this.sourceEmitter) {
                try {
                    TransformResponse transformResponse = this.transformExecutor.runOneIteration(sourceData);
                    while (transformResponse.getEmittedRecords().hasNext()) {
                        dataToSink.add(transformResponse.getEmittedRecords().next());
                    }
                    Map entryMap = transformResponse.getMapTransformIdToErrorEmitter();
                    for (Map.Entry entry : entryMap.entrySet()) {
                        String transformId = (String)entry.getKey();
                        if (!this.tranformIdToDatasetName.containsKey(transformId)) {
                            if (transformErrorsWithoutDataset.contains(transformId)) continue;
                            LOG.warn("Error records were emitted in transform {}, but error dataset is not configured for this transform", (Object)transformId);
                            continue;
                        }
                        transformIdToErrorRecords.get(transformId).addAll((Collection)entry.getValue());
                    }
                }
                catch (Exception e) {
                    LOG.warn("Exception thrown while processing data {}", sourceData, (Object)e);
                }
            }
            this.sourceEmitter.reset();
            try {
                if (dataToSink.isEmpty() && nextState.equals(currentState)) continue;
                this.getContext().execute(new TxRunnable(){

                    public void run(DatasetContext context) throws Exception {
                        if (!dataToSink.isEmpty()) {
                            DefaultDataWriter defaultDataWriter = new DefaultDataWriter(ETLWorker.this.getContext(), context);
                            for (RealtimeSink sink : ETLWorker.this.sinks) {
                                sink.write(dataToSink, defaultDataWriter);
                            }
                        }
                        for (Map.Entry errorRecordEntry : transformIdToErrorRecords.entrySet()) {
                            String transformId = (String)errorRecordEntry.getKey();
                            String datasetName = (String)ETLWorker.this.tranformIdToDatasetName.get(transformId);
                            Table errorTable = (Table)context.getDataset(datasetName);
                            long timeInMillis = System.currentTimeMillis();
                            byte[] currentTime = Bytes.toBytes((long)timeInMillis);
                            String transformIdentifier = ETLWorker.this.appName + ETLWorker.SEPARATOR + transformId;
                            for (InvalidEntry invalidEntry : (List)errorRecordEntry.getValue()) {
                                byte[] rowKey = Bytes.concat((byte[][])new byte[][]{currentTime, Bytes.toBytes((String)transformIdentifier), Bytes.toBytes((UUID)UUID.randomUUID())});
                                Put errorPut = ETLWorker.this.constructErrorPut(rowKey, invalidEntry, timeInMillis);
                                errorTable.write((Object)rowKey, (Object)errorPut);
                            }
                        }
                        if (!nextState.equals(currentState)) {
                            KeyValueTable stateTable = (KeyValueTable)context.getDataset("etlrealtimesourcestate");
                            stateTable.write(ETLWorker.this.stateStoreKey, GSON.toJson((Object)nextState));
                        }
                        ETLWorker.this.transformExecutor.resetEmitters();
                    }
                });
                currentState.setState(nextState);
            }
            catch (Exception e) {
                LOG.warn("Exception thrown during persisting of data", (Throwable)e);
            }
            finally {
                dataToSink.clear();
                for (List<InvalidEntry> invalidEntryList : transformIdToErrorRecords.values()) {
                    invalidEntryList.clear();
                }
            }
        }
    }

    private Map<String, List<InvalidEntry>> intializeTransformIdToErrorsList() {
        HashMap<String, List<InvalidEntry>> transformIdToErrorListMap = new HashMap<String, List<InvalidEntry>>();
        for (String transformId : this.tranformIdToDatasetName.keySet()) {
            transformIdToErrorListMap.put(transformId, new ArrayList());
        }
        return transformIdToErrorListMap;
    }

    private Put constructErrorPut(byte[] rowKey, InvalidEntry entry, long timeInMillis) throws IOException {
        Put errorPut = new Put(rowKey);
        errorPut.add("errCode", entry.getErrorCode());
        errorPut.add("errTimestamp", timeInMillis);
        if (entry.getInvalidRecord() instanceof StructuredRecord) {
            StructuredRecord record = (StructuredRecord)entry.getInvalidRecord();
            errorPut.add("invalidRecord", StructuredRecordStringConverter.toJsonString((StructuredRecord)record));
        } else {
            errorPut.add("invalidRecord", String.format("Error Entry is of type %s, only records of type co.cask.cdap.api.data.format.StructuredRecord is supported currently", entry.getInvalidRecord().getClass().getName()));
        }
        return errorPut;
    }

    public void stop() {
        this.stopped = true;
    }

    public void destroy() {
        Destroyables.destroyQuietly((Destroyable)this.source);
        Destroyables.destroyQuietly((Destroyable)this.transformExecutor);
        for (RealtimeSink sink : this.sinks) {
            Destroyables.destroyQuietly((Destroyable)sink);
        }
    }
}

