/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.kudu;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.PojoUtils;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.kudu.ApexKuduConnection;
import org.apache.apex.malhar.kudu.KuduExecutionContext;
import org.apache.apex.malhar.kudu.KuduMutationType;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.Statistics;
import org.apache.kudu.client.Update;
import org.apache.kudu.client.Upsert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for output operators that apply {@link KuduExecutionContext} tuples received on
 * {@link #input} to a Kudu table as insert, update, upsert or delete operations.
 *
 * <p>On activation the operator builds a Kudu connection from {@link #getKuduConnectionConfig()},
 * reads the table schema and creates one POJO getter per table column that can be matched to a
 * declared field of {@link #getTuplePayloadClass()} (by case-insensitive name, or through
 * {@link #getOverridingColumnNameMap()}). Each tuple is then turned into a Kudu {@link Operation}
 * whose row is filled from the tuple payload through those getters.
 *
 * <p>The Kudu session is flushed at every end of window, and the id of each completed window is
 * saved through the {@link WindowDataManager}. When the operator is activated at or before the
 * largest completed window, tuples of the affected windows are only applied when
 * {@link #isEligibleForPassivationInReconcilingWindow(KuduExecutionContext, long)} accepts them.
 */
@InterfaceStability.Evolving
public abstract class AbstractKuduOutputOperator extends BaseOperator
    implements Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduOutputOperator.class);

    /**
     * Window id value that this class treats as "no window": it disables the reconciling logic in
     * {@link #activate} and the mode switch in {@link #beginWindow}.
     * NOTE(review): presumably mirrors the Apex stateless window id - confirm.
     */
    private static final long UNSET_WINDOW_ID = -1L;

    private transient ApexKuduConnection apexKuduConnection;
    private transient KuduTable kuduTable;
    private transient KuduSession kuduSession;
    private transient KuduClient kuduClientHandle;

    /** Column definitions of the Kudu table keyed by the exact column name. */
    private transient Map<String, ColumnSchema> allColumnDefs;

    /**
     * POJO getters keyed by the exact Kudu column name. The value is one of the
     * {@code PojoUtils.Getter*} types; which one is determined by the Kudu type of the column.
     */
    private transient Map<String, Object> kuduColumnBasedGetters;

    /** Names of the primary key columns of the Kudu table; rebuilt on every activation. */
    private Set<String> primaryKeyColumnNames;

    @NotNull
    protected WindowDataManager windowDataManager;

    private transient long currentWindowId;

    /** True while the current window id is smaller than {@link #reconcilingWindowId}. */
    private transient boolean isInReplayMode;

    /** True while the current window id equals {@link #reconcilingWindowId}. */
    private transient boolean isInReconcilingMode;

    private transient long reconcilingWindowId;

    // Per-window tuple counters, reset in beginWindow.
    @AutoMetric
    transient long numInserts;
    @AutoMetric
    transient long numUpserts;
    @AutoMetric
    transient long numDeletes;
    @AutoMetric
    transient long numUpdates;

    // Per-window deltas of the Kudu client statistics, computed in endWindow.
    @AutoMetric
    transient long numOpsErrors;
    @AutoMetric
    transient long numBytesWritten;
    @AutoMetric
    transient long numRpcErrors;
    @AutoMetric
    transient long numWriteOps;
    @AutoMetric
    transient long numWriteRPCs;

    // Last observed values of the Kudu client statistics, refreshed in endWindow.
    @AutoMetric
    long totalOpsErrors = 0L;
    @AutoMetric
    long totalBytesWritten = 0L;
    @AutoMetric
    long totalRpcErrors = 0L;
    @AutoMetric
    long totalWriteOps = 0L;
    @AutoMetric
    long totalWriteRPCs = 0L;

    // Running tuple counters; unlike the per-window counters they are never reset by this class.
    @AutoMetric
    long totalInsertsSinceStart;
    @AutoMetric
    long totalUpsertsSinceStart;
    @AutoMetric
    long totalDeletesSinceStart;
    @AutoMetric
    long totalUpdatesSinceStart;

    /** Input port; every received tuple is handed to {@link #processTuple(KuduExecutionContext)}. */
    public final transient DefaultInputPort<KuduExecutionContext> input = new DefaultInputPort<KuduExecutionContext>() {
        @Override
        public void process(KuduExecutionContext kuduExecutionContext) {
            AbstractKuduOutputOperator.this.processTuple(kuduExecutionContext);
        }
    };

    /**
     * Applies one tuple to the Kudu table according to its mutation type and updates the
     * corresponding per-window and running counters.
     *
     * <p>While the operator is in replay or reconciling mode the tuple is dropped unless
     * {@link #isEligibleForPassivationInReconcilingWindow(KuduExecutionContext, long)} returns
     * {@code true} for it and the current window id.
     *
     * @param kuduExecutionContext the tuple describing the mutation and carrying the payload
     */
    public void processTuple(KuduExecutionContext kuduExecutionContext) {
        boolean needsEligibilityCheck = this.isInReconcilingMode || this.isInReplayMode;
        if (needsEligibilityCheck
            && !this.isEligibleForPassivationInReconcilingWindow(kuduExecutionContext, this.currentWindowId)) {
            return;
        }
        KuduMutationType mutationType = kuduExecutionContext.getMutationType();
        switch (mutationType) {
            case DELETE: {
                this.processForDelete(kuduExecutionContext);
                ++this.numDeletes;
                ++this.totalDeletesSinceStart;
                break;
            }
            case INSERT: {
                this.processForInsert(kuduExecutionContext);
                ++this.numInserts;
                ++this.totalInsertsSinceStart;
                break;
            }
            case UPDATE: {
                this.processForUpdate(kuduExecutionContext);
                ++this.numUpdates;
                ++this.totalUpdatesSinceStart;
                break;
            }
            case UPSERT: {
                this.processForUpsert(kuduExecutionContext);
                ++this.numUpserts;
                ++this.totalUpsertsSinceStart;
                break;
            }
            default: {
                // Any other mutation type is ignored, exactly as before.
                break;
            }
        }
    }

    /**
     * Fills the row of {@code currentOperation} from the tuple payload and applies the operation
     * on the Kudu session.
     *
     * <p>Columns listed in the tuple's do-not-write set are skipped. If reading or writing a
     * column value fails, the failure is logged and the column is set to null instead; the
     * operation is still applied.
     *
     * @param currentOperation the Kudu operation to populate and apply
     * @param kuduExecutionContext the tuple supplying payload, consistency mode and timestamp
     * @throws RuntimeException wrapping the {@link KuduException} if the session rejects the operation
     */
    private void performCommonProcessing(Operation currentOperation, KuduExecutionContext kuduExecutionContext) {
        currentOperation.setExternalConsistencyMode(kuduExecutionContext.getExternalConsistencyMode());
        Long propagatedTimeStamp = kuduExecutionContext.getPropagatedTimestamp();
        if (propagatedTimeStamp != null) {
            currentOperation.setPropagatedTimestamp(propagatedTimeStamp.longValue());
        }
        PartialRow partialRow = currentOperation.getRow();
        Object payload = kuduExecutionContext.getPayload();
        Set<String> doNotWriteColumns = kuduExecutionContext.getDoNotWriteColumns();
        if (doNotWriteColumns == null) {
            doNotWriteColumns = new HashSet<String>();
        }
        for (Map.Entry<String, Object> getterEntry : this.kuduColumnBasedGetters.entrySet()) {
            String columnName = getterEntry.getKey();
            if (doNotWriteColumns.contains(columnName)) {
                continue;
            }
            Type dataType = this.allColumnDefs.get(columnName).getType();
            try {
                writeColumnValue(partialRow, columnName, dataType, getterEntry.getValue(), payload);
            } catch (Exception ex) {
                // Deliberate best effort: a column that cannot be read is written as null.
                LOG.error(" Exception while fetching the value of " + columnName + " because " + ex.getMessage(), ex);
                partialRow.setNull(columnName);
            }
        }
        try {
            this.kuduSession.apply(currentOperation);
        } catch (KuduException e) {
            throw new RuntimeException("Could not execute operation because " + e.getMessage(), e);
        }
    }

    /**
     * Reads one column value from the payload with the given getter and writes it into the row.
     *
     * <p>A {@code null} getter, or a {@code null} STRING/BINARY value, leaves the column unset.
     *
     * @param partialRow the row being populated
     * @param columnName the Kudu column to write
     * @param dataType the Kudu type of the column; selects the getter flavour and row setter
     * @param getter the {@code PojoUtils.Getter*} instance registered for the column
     * @param payload the tuple payload the getter is invoked on
     * @throws UnsupportedOperationException if {@code dataType} is not handled by this operator
     */
    @SuppressWarnings("unchecked") // getters are stored as Object; their real type is fixed by the column type
    private static void writeColumnValue(PartialRow partialRow, String columnName, Type dataType, Object getter,
        Object payload) {
        if (getter == null) {
            return;
        }
        switch (dataType) {
            case STRING: {
                String stringValue = ((PojoUtils.Getter<Object, String>)getter).get(payload);
                if (stringValue != null) {
                    partialRow.addString(columnName, stringValue);
                }
                break;
            }
            case BINARY: {
                ByteBuffer byteBufferValue = ((PojoUtils.Getter<Object, ByteBuffer>)getter).get(payload);
                if (byteBufferValue != null) {
                    partialRow.addBinary(columnName, byteBufferValue);
                }
                break;
            }
            case BOOL: {
                partialRow.addBoolean(columnName, ((PojoUtils.GetterBoolean<Object>)getter).get(payload));
                break;
            }
            case DOUBLE: {
                partialRow.addDouble(columnName, ((PojoUtils.GetterDouble<Object>)getter).get(payload));
                break;
            }
            case FLOAT: {
                partialRow.addFloat(columnName, ((PojoUtils.GetterFloat<Object>)getter).get(payload));
                break;
            }
            case INT8: {
                partialRow.addByte(columnName, ((PojoUtils.GetterByte<Object>)getter).get(payload));
                break;
            }
            case INT16: {
                partialRow.addShort(columnName, ((PojoUtils.GetterShort<Object>)getter).get(payload));
                break;
            }
            case INT32: {
                partialRow.addInt(columnName, ((PojoUtils.GetterInt<Object>)getter).get(payload));
                break;
            }
            case INT64:
            case UNIXTIME_MICROS: {
                partialRow.addLong(columnName, ((PojoUtils.GetterLong<Object>)getter).get(payload));
                break;
            }
            default: {
                LOG.error(columnName + " is not of the supported data type");
                throw new UnsupportedOperationException("Kudu does not support data type for column " + columnName);
            }
        }
    }

    /**
     * Applies the tuple as a Kudu update.
     *
     * @param kuduExecutionContext the tuple to apply
     */
    protected void processForUpdate(KuduExecutionContext kuduExecutionContext) {
        Update thisUpdate = this.kuduTable.newUpdate();
        this.performCommonProcessing(thisUpdate, kuduExecutionContext);
    }

    /**
     * Applies the tuple as a Kudu upsert.
     *
     * @param kuduExecutionContext the tuple to apply
     */
    protected void processForUpsert(KuduExecutionContext kuduExecutionContext) {
        Upsert thisUpsert = this.kuduTable.newUpsert();
        this.performCommonProcessing(thisUpsert, kuduExecutionContext);
    }

    /**
     * Applies the tuple as a Kudu delete, writing only the primary key columns.
     *
     * <p>The tuple's do-not-write set is replaced with the set of all non primary key columns.
     * A fresh set is installed on the context rather than clearing the set supplied by the caller,
     * so a set shared between several tuples (or an unmodifiable one) is left untouched.
     *
     * @param kuduExecutionContext the tuple to apply
     */
    protected void processForDelete(KuduExecutionContext kuduExecutionContext) {
        Delete thisDelete = this.kuduTable.newDelete();
        Set<String> nonKeyColumns = new HashSet<String>(this.allColumnDefs.keySet());
        nonKeyColumns.removeAll(this.primaryKeyColumnNames);
        kuduExecutionContext.setDoNotWriteColumns(nonKeyColumns);
        this.performCommonProcessing(thisDelete, kuduExecutionContext);
    }

    /**
     * Applies the tuple as a Kudu insert.
     *
     * @param kuduExecutionContext the tuple to apply
     */
    protected void processForInsert(KuduExecutionContext kuduExecutionContext) {
        Insert thisInsert = this.kuduTable.newInsert();
        this.performCommonProcessing(thisInsert, kuduExecutionContext);
    }

    /**
     * Opens the Kudu connection, builds the column getters and decides which window, if any, has
     * to be reconciled.
     *
     * <p>With {@code A} the activation window id and {@code L} the largest completed window known
     * to the window data manager: if {@code A < L} the reconciling window is {@code L + 1}; if
     * {@code A == L} it is {@code L}; otherwise (or when {@code A} is -1) no window is reconciled.
     *
     * @param context the operator context supplying the activation window id
     */
    @Override
    public void activate(Context.OperatorContext context) {
        ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionBuilder = this.getKuduConnectionConfig();
        this.apexKuduConnection = apexKuduConnectionBuilder.build();
        Preconditions.checkNotNull(this.apexKuduConnection, "Kudu connection cannot be null");
        this.kuduTable = this.apexKuduConnection.getKuduTable();
        this.kuduSession = this.apexKuduConnection.getKuduSession();
        this.kuduClientHandle = this.apexKuduConnection.getKuduClient();
        Preconditions.checkNotNull(this.kuduTable, "Kudu Table cannot be null");
        Preconditions.checkNotNull(this.kuduSession, "Kudu session cannot be null");
        this.allColumnDefs = new HashMap<String, ColumnSchema>();
        this.primaryKeyColumnNames = new HashSet<String>();
        this.kuduColumnBasedGetters = new HashMap<String, Object>();
        this.buildGettersForPojoPayload();

        this.reconcilingWindowId = UNSET_WINDOW_ID;
        long activationWindowId = (Long)context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
        if (activationWindowId != UNSET_WINDOW_ID) {
            long largestCompletedWindow = this.windowDataManager.getLargestCompletedWindow();
            if (activationWindowId < largestCompletedWindow) {
                this.reconcilingWindowId = largestCompletedWindow + 1L;
            } else if (activationWindowId == largestCompletedWindow) {
                this.reconcilingWindowId = largestCompletedWindow;
            }
        }
    }

    /**
     * Closes the Kudu connection if one was opened. Failures are logged and not propagated.
     */
    @Override
    public void deactivate() {
        if (this.apexKuduConnection == null) {
            // Nothing to close, e.g. activation failed before the connection was built.
            return;
        }
        try {
            this.apexKuduConnection.close();
        } catch (Exception e) {
            LOG.error("Could not close kudu session and resources because " + e.getMessage(), e);
        }
    }

    /** No-op. */
    @Override
    public void beforeCheckpoint(long l) {
    }

    /** No-op. */
    @Override
    public void checkpointed(long l) {
    }

    /**
     * Forwards the committed notification to the window data manager.
     *
     * @param windowId the committed window id
     * @throws RuntimeException wrapping the {@link IOException} raised by the window data manager
     */
    @Override
    public void committed(long windowId) {
        try {
            this.windowDataManager.committed(windowId);
        } catch (IOException e) {
            throw new RuntimeException("Error while committing the window id " + windowId + " because " + e.getMessage(), e);
        }
    }

    /**
     * Sets up the window data manager, falling back to a new {@link FSWindowDataManager} when
     * {@link #getWindowDataManager()} returns {@code null}.
     *
     * @param context the operator context passed on to the window data manager
     */
    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        this.windowDataManager = this.getWindowDataManager();
        if (this.windowDataManager == null) {
            this.windowDataManager = new FSWindowDataManager();
        }
        this.windowDataManager.setup(context);
    }

    /** Tears down the window data manager. */
    @Override
    public void teardown() {
        this.windowDataManager.teardown();
    }

    /**
     * Records the current window id, derives the replay / reconciling mode from it and resets the
     * per-window tuple counters.
     *
     * <p>For a window id other than -1: the operator is in reconciling mode when the id equals the
     * reconciling window id, in replay mode when it is smaller, and in neither mode when it is
     * larger. A window id of -1 leaves both modes unchanged.
     *
     * @param windowId the id of the window that begins
     */
    @Override
    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
        this.currentWindowId = windowId;
        if (windowId != UNSET_WINDOW_ID) {
            this.isInReconcilingMode = windowId == this.reconcilingWindowId;
            this.isInReplayMode = windowId < this.reconcilingWindowId;
        }
        this.numDeletes = 0L;
        this.numInserts = 0L;
        this.numUpdates = 0L;
        this.numUpserts = 0L;
    }

    /**
     * Flushes the Kudu session, saves the window id through the window data manager when it is
     * newer than the largest completed window, and refreshes the client statistic metrics.
     *
     * <p>Each client statistic is read exactly once, so the per-window delta and the stored total
     * are always derived from the same value.
     *
     * @throws RuntimeException if flushing the session or saving the window state fails
     */
    @Override
    public void endWindow() {
        try {
            this.kuduSession.flush();
        } catch (KuduException e) {
            throw new RuntimeException("Could not flush kudu session on an end window boundary " + e.getMessage(), e);
        }
        if (this.currentWindowId > this.windowDataManager.getLargestCompletedWindow()) {
            try {
                this.windowDataManager.save(this.currentWindowId, this.currentWindowId);
            } catch (IOException e) {
                throw new RuntimeException("Error while persisting the current window state " + this.currentWindowId + " because " + e.getMessage(), e);
            }
        }

        Statistics clientStatistics = this.kuduClientHandle.getStatistics();
        long opsErrors = clientStatistics.getClientStatistic(Statistics.Statistic.OPS_ERRORS);
        long bytesWritten = clientStatistics.getClientStatistic(Statistics.Statistic.BYTES_WRITTEN);
        long rpcErrors = clientStatistics.getClientStatistic(Statistics.Statistic.RPC_ERRORS);
        long writeOps = clientStatistics.getClientStatistic(Statistics.Statistic.WRITE_OPS);
        long writeRpcs = clientStatistics.getClientStatistic(Statistics.Statistic.WRITE_RPCS);

        this.numOpsErrors = opsErrors - this.totalOpsErrors;
        this.numBytesWritten = bytesWritten - this.totalBytesWritten;
        this.numRpcErrors = rpcErrors - this.totalRpcErrors;
        this.numWriteOps = writeOps - this.totalWriteOps;
        // The RPC delta must be taken against the previous RPC total, not the write-ops total.
        this.numWriteRPCs = writeRpcs - this.totalWriteRPCs;

        this.totalOpsErrors = opsErrors;
        this.totalBytesWritten = bytesWritten;
        this.totalRpcErrors = rpcErrors;
        this.totalWriteOps = writeOps;
        this.totalWriteRPCs = writeRpcs;
    }

    /**
     * Reads the Kudu table schema and registers a getter for every declared field of the payload
     * class that maps to a table column.
     *
     * <p>A field maps to a column when, in this order: its lower-cased name equals a lower-cased
     * column name; the override map has an entry for the exact field name; or the override map has
     * an entry for the lower-cased field name. Override targets are matched to columns ignoring
     * case. Fields that map to nothing are ignored. Case folding uses {@link Locale#ROOT} so the
     * matching does not depend on the default locale of the JVM.
     *
     * @throws NullPointerException if the payload class is {@code null}, or an override maps a
     *     field to {@code null} or to a column that does not exist in the table
     */
    private void buildGettersForPojoPayload() {
        Class<?> payloadClass = this.getTuplePayloadClass();
        Preconditions.checkNotNull(payloadClass, "Payload class cannot be null");
        Schema schemaInfo = this.kuduTable.getSchema();

        // Column definitions keyed by lower-cased name, for case-insensitive matching.
        Map<String, ColumnSchema> normalizedColumns = new HashMap<String, ColumnSchema>();
        List<ColumnSchema> allColumns = schemaInfo.getColumns();
        for (ColumnSchema aColumnDef : allColumns) {
            this.allColumnDefs.put(aColumnDef.getName(), aColumnDef);
            normalizedColumns.put(aColumnDef.getName().toLowerCase(Locale.ROOT), aColumnDef);
        }
        List<ColumnSchema> primaryKeyColumns = schemaInfo.getPrimaryKeyColumns();
        for (ColumnSchema primaryKeyInfo : primaryKeyColumns) {
            this.primaryKeyColumnNames.add(primaryKeyInfo.getName());
        }

        Map<String, String> columnNameOverrides = this.getOverridingColumnNameMap();
        if (columnNameOverrides == null) {
            columnNameOverrides = new HashMap<String, String>();
        }
        for (Field aFieldDef : payloadClass.getDeclaredFields()) {
            String fieldName = aFieldDef.getName();
            String normalizedFieldName = fieldName.toLowerCase(Locale.ROOT);

            ColumnSchema directMatch = normalizedColumns.get(normalizedFieldName);
            if (directMatch != null) {
                this.extractGetterForColumn(directMatch, aFieldDef);
                continue;
            }

            String overrideKey;
            if (columnNameOverrides.containsKey(fieldName)) {
                overrideKey = fieldName;
            } else if (columnNameOverrides.containsKey(normalizedFieldName)) {
                overrideKey = normalizedFieldName;
            } else {
                continue;
            }
            String overriddenColumnName = columnNameOverrides.get(overrideKey);
            Preconditions.checkNotNull(overriddenColumnName,
                "Column name override for field " + fieldName + " cannot be null");
            ColumnSchema overriddenColumn = normalizedColumns.get(overriddenColumnName.toLowerCase(Locale.ROOT));
            Preconditions.checkNotNull(overriddenColumn,
                "Field " + fieldName + " is mapped to column " + overriddenColumnName
                    + " which does not exist in the Kudu table");
            this.extractGetterForColumn(overriddenColumn, aFieldDef);
        }
    }

    /**
     * Creates the getter matching the Kudu type of {@code columnSchema} for the given payload
     * field and registers it under the column name.
     *
     * @param columnSchema the Kudu column the field is written to
     * @param fieldDefinition the payload field the value is read from
     * @throws UnsupportedOperationException if the column type is not handled by this operator
     */
    private void extractGetterForColumn(ColumnSchema columnSchema, Field fieldDefinition) {
        Type columnType = columnSchema.getType();
        Class<?> pojoClass = this.getTuplePayloadClass();
        String fieldName = fieldDefinition.getName();
        // The getter flavours share no common supertype, hence Object.
        Object getter;
        switch (columnType) {
            case BINARY: {
                getter = PojoUtils.createGetter(pojoClass, fieldName, ByteBuffer.class);
                break;
            }
            case STRING: {
                getter = PojoUtils.createGetter(pojoClass, fieldName, String.class);
                break;
            }
            case BOOL: {
                getter = PojoUtils.createGetterBoolean(pojoClass, fieldName);
                break;
            }
            case DOUBLE: {
                getter = PojoUtils.createGetterDouble(pojoClass, fieldName);
                break;
            }
            case FLOAT: {
                getter = PojoUtils.createGetterFloat(pojoClass, fieldName);
                break;
            }
            case INT8: {
                getter = PojoUtils.createGetterByte(pojoClass, fieldName);
                break;
            }
            case INT16: {
                getter = PojoUtils.createGetterShort(pojoClass, fieldName);
                break;
            }
            case INT32: {
                getter = PojoUtils.createGetterInt(pojoClass, fieldName);
                break;
            }
            case INT64:
            case UNIXTIME_MICROS: {
                getter = PojoUtils.createGetterLong(pojoClass, fieldName);
                break;
            }
            default: {
                LOG.error(fieldName + " has a data type that is not yet supported");
                throw new UnsupportedOperationException(fieldName + " does not have a compatible data type");
            }
        }
        if (getter != null) {
            this.kuduColumnBasedGetters.put(columnSchema.getName(), getter);
        }
    }

    /**
     * Creates a new, empty Kudu connection builder.
     *
     * @return a new {@link ApexKuduConnection.ApexKuduConnectionBuilder}
     */
    public static ApexKuduConnection.ApexKuduConnectionBuilder usingKuduConnectionBuilder() {
        return new ApexKuduConnection.ApexKuduConnectionBuilder();
    }

    /**
     * @return the window data manager currently configured, possibly {@code null} before setup
     */
    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    /**
     * @param windowDataManager the window data manager to use
     */
    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    /**
     * Supplies payload field name to Kudu column name overrides for fields whose name does not
     * match a column. The default implementation returns an empty map.
     *
     * @return a map from field name to column name; {@code null} is treated as empty
     */
    protected Map<String, String> getOverridingColumnNameMap() {
        return new HashMap<String, String>();
    }

    /**
     * Supplies the builder used during activation to create the Kudu connection.
     *
     * @return the connection builder; its {@code build()} result must not be {@code null}
     */
    abstract ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig();

    /**
     * Supplies the class of the tuple payload whose declared fields are matched to Kudu columns.
     *
     * @return the payload class; must not be {@code null}
     */
    protected abstract Class getTuplePayloadClass();

    /**
     * Decides whether a tuple received while the operator is in replay or reconciling mode should
     * be applied to Kudu.
     *
     * @param executionContext the tuple under consideration
     * @param windowId the id of the window currently being processed
     * @return {@code true} to apply the tuple, {@code false} to drop it
     */
    protected abstract boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext, long windowId);
}

