/*
 * Decompiled with CFR 0.152.
 */
package co.cask.cdap.template.etl.realtime;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.worker.AbstractWorker;
import co.cask.cdap.api.worker.WorkerContext;
import co.cask.cdap.template.etl.api.Destroyable;
import co.cask.cdap.template.etl.api.Transform;
import co.cask.cdap.template.etl.api.Transformation;
import co.cask.cdap.template.etl.api.realtime.RealtimeSink;
import co.cask.cdap.template.etl.api.realtime.RealtimeSource;
import co.cask.cdap.template.etl.api.realtime.SourceState;
import co.cask.cdap.template.etl.common.DefaultEmitter;
import co.cask.cdap.template.etl.common.Destroyables;
import co.cask.cdap.template.etl.common.ETLStage;
import co.cask.cdap.template.etl.common.PluginID;
import co.cask.cdap.template.etl.common.StageMetrics;
import co.cask.cdap.template.etl.common.TransformExecutor;
import co.cask.cdap.template.etl.realtime.DefaultDataWriter;
import co.cask.cdap.template.etl.realtime.RealtimeTransformContext;
import co.cask.cdap.template.etl.realtime.TrackedRealtimeSink;
import co.cask.cdap.template.etl.realtime.WorkerRealtimeContext;
import co.cask.cdap.template.etl.realtime.config.ETLRealtimeConfig;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ETLWorker
extends AbstractWorker {
    private static final Logger LOG = LoggerFactory.getLogger(ETLWorker.class);
    private static final Type STRING_LIST_TYPE = new TypeToken<List<String>>(){}.getType();
    private static final Gson GSON = new Gson();
    private static final String SEPARATOR = ":";
    private String adapterName;
    private RealtimeSource source;
    private RealtimeSink sink;
    private List<Metrics> transformMetrics;
    private TransformExecutor transformExecutor;
    private DefaultEmitter sourceEmitter;
    private String stateStoreKey;
    private byte[] stateStoreKeyBytes;
    private Metrics metrics;
    private volatile boolean running;

    public void configure() {
        this.setName(ETLWorker.class.getSimpleName());
        this.setDescription("Worker Driver for Realtime ETL Adapters");
    }

    public void initialize(WorkerContext context) throws Exception {
        super.initialize(context);
        Map runtimeArgs = context.getRuntimeArguments();
        Preconditions.checkArgument((boolean)runtimeArgs.containsKey("name"));
        Preconditions.checkArgument((boolean)runtimeArgs.containsKey("config"));
        Preconditions.checkArgument((boolean)runtimeArgs.containsKey("sourceId"));
        Preconditions.checkArgument((boolean)runtimeArgs.containsKey("sinkId"));
        Preconditions.checkArgument((boolean)runtimeArgs.containsKey("transformIds"));
        Preconditions.checkArgument((boolean)runtimeArgs.containsKey("uniqueid"));
        this.adapterName = (String)runtimeArgs.get("name");
        this.stateStoreKey = String.format("%s%s%s%s%s", this.adapterName, SEPARATOR, runtimeArgs.get("uniqueid"), SEPARATOR, context.getInstanceId());
        this.stateStoreKeyBytes = Bytes.toBytes((String)this.stateStoreKey);
        ETLRealtimeConfig config = (ETLRealtimeConfig)((Object)GSON.fromJson((String)runtimeArgs.get("config"), ETLRealtimeConfig.class));
        this.getContext().execute(new TxRunnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run(DatasetContext dsContext) throws Exception {
                KeyValueTable stateTable = (KeyValueTable)dsContext.getDataset("etlrealtimesourcestate");
                byte[] startKey = Bytes.toBytes((String)String.format("%s%s", ETLWorker.this.adapterName, ETLWorker.SEPARATOR));
                try (CloseableIterator rows = stateTable.scan(startKey, Bytes.stopKeyForPrefix((byte[])startKey));){
                    while (rows.hasNext()) {
                        KeyValue row = (KeyValue)rows.next();
                        if (Bytes.compareTo((byte[])ETLWorker.this.stateStoreKeyBytes, (byte[])((byte[])row.getKey())) == 0) continue;
                        stateTable.delete((byte[])row.getKey());
                    }
                }
            }
        });
        this.initializeSource(context, config.getSource());
        List<Transformation> transforms = this.initializeTransforms(context, config.getTransforms());
        this.initializeSink(context, config.getSink());
        this.transformExecutor = new TransformExecutor(transforms, this.transformMetrics);
    }

    private void initializeSource(WorkerContext context, ETLStage stage) throws Exception {
        String sourcePluginId = (String)context.getRuntimeArguments().get("sourceId");
        this.source = (RealtimeSource)context.newPluginInstance(sourcePluginId);
        WorkerRealtimeContext sourceContext = new WorkerRealtimeContext(context, this.metrics, sourcePluginId);
        LOG.debug("Source Stage : {}", (Object)stage.getName());
        LOG.debug("Source Class : {}", (Object)this.source.getClass().getName());
        this.source.initialize(sourceContext);
        this.sourceEmitter = new DefaultEmitter((Metrics)new StageMetrics(this.metrics, PluginID.from((String)sourcePluginId)));
    }

    private void initializeSink(WorkerContext context, ETLStage stage) throws Exception {
        String sinkPluginId = (String)context.getRuntimeArguments().get("sinkId");
        this.sink = (RealtimeSink)context.newPluginInstance(sinkPluginId);
        WorkerRealtimeContext sinkContext = new WorkerRealtimeContext(context, this.metrics, sinkPluginId);
        LOG.debug("Sink Stage : {}", (Object)stage.getName());
        LOG.debug("Sink Class : {}", (Object)this.sink.getClass().getName());
        this.sink.initialize(sinkContext);
        this.sink = new TrackedRealtimeSink(this.sink, this.metrics, PluginID.from((String)sinkPluginId));
    }

    private List<Transformation> initializeTransforms(WorkerContext context, List<ETLStage> stages) throws Exception {
        List transformIds = (List)GSON.fromJson((String)context.getRuntimeArguments().get("transformIds"), STRING_LIST_TYPE);
        ArrayList transforms = Lists.newArrayList();
        Preconditions.checkArgument((transformIds != null ? 1 : 0) != 0);
        Preconditions.checkArgument((stages.size() == transformIds.size() ? 1 : 0) != 0);
        this.transformMetrics = Lists.newArrayListWithCapacity((int)stages.size());
        for (int i = 0; i < stages.size(); ++i) {
            ETLStage stage = stages.get(i);
            String transformId = (String)transformIds.get(i);
            try {
                Transform transform = (Transform)context.newPluginInstance(transformId);
                RealtimeTransformContext transformContext = new RealtimeTransformContext(context, this.metrics, transformId);
                LOG.debug("Transform Stage : {}", (Object)stage.getName());
                LOG.debug("Transform Class : {}", (Object)transform.getClass().getName());
                transform.initialize(transformContext);
                transforms.add(transform);
                this.transformMetrics.add((Metrics)new StageMetrics(this.metrics, PluginID.from((String)transformId)));
                continue;
            }
            catch (InstantiationException e) {
                LOG.error("Unable to instantiate Transform : {}", (Object)stage.getName(), (Object)e);
                Throwables.propagate((Throwable)e);
            }
        }
        return transforms;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        final SourceState currentState = new SourceState();
        final SourceState nextState = new SourceState();
        final ArrayList dataToSink = Lists.newArrayList();
        this.running = true;
        this.getContext().execute(new TxRunnable(){

            public void run(DatasetContext context) throws Exception {
                KeyValueTable stateTable = (KeyValueTable)context.getDataset("etlrealtimesourcestate");
                byte[] stateBytes = stateTable.read(ETLWorker.this.stateStoreKeyBytes);
                if (stateBytes != null) {
                    SourceState state = (SourceState)GSON.fromJson(Bytes.toString((byte[])stateBytes), SourceState.class);
                    currentState.setState(state);
                }
            }
        });
        while (this.running) {
            block12: {
                try {
                    SourceState newState = this.source.poll(this.sourceEmitter, new SourceState(currentState));
                    if (newState == null) break block12;
                    nextState.setState(newState);
                }
                catch (Exception e) {
                    LOG.warn("Adapter {} : Exception thrown during polling of Source for data", (Object)this.adapterName, (Object)e);
                    this.sourceEmitter.reset();
                    continue;
                }
            }
            for (Object sourceData : this.sourceEmitter) {
                try {
                    for (Object object : this.transformExecutor.runOneIteration(sourceData)) {
                        dataToSink.add(object);
                    }
                }
                catch (Exception e) {
                    LOG.warn("Adapter {} : Exception thrown while processing data {}", new Object[]{this.adapterName, sourceData, e});
                }
            }
            this.sourceEmitter.reset();
            try {
                if (dataToSink.isEmpty() && nextState.equals(currentState)) continue;
                this.getContext().execute(new TxRunnable(){

                    public void run(DatasetContext context) throws Exception {
                        if (!dataToSink.isEmpty()) {
                            DefaultDataWriter defaultDataWriter = new DefaultDataWriter(ETLWorker.this.getContext(), context);
                            ETLWorker.this.sink.write(dataToSink, defaultDataWriter);
                        }
                        if (!nextState.equals(currentState)) {
                            KeyValueTable stateTable = (KeyValueTable)context.getDataset("etlrealtimesourcestate");
                            stateTable.write(ETLWorker.this.stateStoreKey, GSON.toJson((Object)nextState));
                        }
                    }
                });
                currentState.setState(nextState);
            }
            catch (Exception e) {
                LOG.warn("Adapter {} : Exception thrown during persisting of data", (Object)this.adapterName, (Object)e);
            }
            finally {
                dataToSink.clear();
            }
        }
    }

    public void stop() {
        this.running = false;
    }

    public void destroy() {
        Destroyables.destroyQuietly((Destroyable)this.source);
        Destroyables.destroyQuietly((Destroyable)this.transformExecutor);
        Destroyables.destroyQuietly((Destroyable)this.sink);
    }
}

