package com.ifesdjeen.cascading.cassandra;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.FieldsResolverException;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import com.ifesdjeen.cascading.cassandra.hadoop.CassandraHelper;
import com.ifesdjeen.cascading.cassandra.hadoop.ColumnFamilyInputFormat;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.UUID;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ifesdjeen/cascading/cassandra/CassandraScheme.class */
/**
 * Cascading {@link Scheme} that sources rows from, and sinks column mutations to, a single
 * Cassandra column family using {@link ColumnFamilyInputFormat} and
 * {@link ColumnFamilyOutputFormat}.
 *
 * <p>As a source, each tuple starts with the row key (decoded to a string and trimmed) followed
 * either by the whole column map (when no column names were configured) or by one value per
 * configured column name.
 *
 * <p>As a sink, each outgoing tuple entry becomes one row key plus a list of column
 * {@link Mutation}s, one per configured column that has a non-null value.
 */
public class CassandraScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
    private static final Logger logger = LoggerFactory.getLogger(CassandraScheme.class);

    /** Settings key that overrides the partitioner used when sinking. */
    private static final String OUTPUT_PARTITIONER_KEY = "cassandra.outputPartitioner";
    /** Settings key that overrides the partitioner used when sourcing. */
    private static final String INPUT_PARTITIONER_KEY = "cassandra.inputPartitioner";
    /** Partitioner class name used when the settings do not name one. */
    private static final String DEFAULT_PARTITIONER = "org.apache.cassandra.dht.RandomPartitioner";

    /** Random identifier generated per instance; backs {@link #getPath()}. */
    private String pathUUID;
    private String host;
    /** RPC port, kept as a string; parsed to an integer in {@link #sourcePrepare}. */
    private String port;
    private String keyspace;
    private String columnFamily;
    /** Name of the column that carries the row key. */
    private String keyColumnName;
    /** Cassandra column names to read or write; an empty list means "read the full row". */
    private List<String> columnFieldNames;
    /** Cassandra column name to tuple field name; may be {@code null} (identity mapping). */
    private Map<String, String> fieldMappings;
    /** Optional extra settings (e.g. partitioner overrides); may be {@code null}. */
    private Map<String, String> settings;
    /** Created in {@link #sourcePrepare}; used to look up column value types while sourcing. */
    private CassandraHelper helper;

    /**
     * Creates a scheme without field mappings and without extra settings.
     *
     * @param host             Cassandra host
     * @param port             Cassandra RPC port, as a string
     * @param keyspace         keyspace name
     * @param columnFamily     column family name
     * @param keyColumnName    name of the column that carries the row key
     * @param columnFieldNames column names to read or write
     */
    public CassandraScheme(String host, String port, String keyspace, String columnFamily, String keyColumnName, List<String> columnFieldNames) {
        this(host, port, keyspace, columnFamily, keyColumnName, columnFieldNames, null);
    }

    /**
     * Creates a scheme with field mappings but without extra settings.
     *
     * @param host             Cassandra host
     * @param port             Cassandra RPC port, as a string
     * @param keyspace         keyspace name
     * @param columnFamily     column family name
     * @param keyColumnName    name of the column that carries the row key
     * @param columnFieldNames column names to read or write
     * @param fieldMappings    Cassandra column name to tuple field name, or {@code null}
     */
    public CassandraScheme(String host, String port, String keyspace, String columnFamily, String keyColumnName, List<String> columnFieldNames, Map<String, String> fieldMappings) {
        this(host, port, keyspace, columnFamily, keyColumnName, columnFieldNames, fieldMappings, null);
    }

    /**
     * Creates a fully configured scheme and assigns it a fresh random path identifier.
     *
     * @param host             Cassandra host
     * @param port             Cassandra RPC port, as a string
     * @param keyspace         keyspace name
     * @param columnFamily     column family name
     * @param keyColumnName    name of the column that carries the row key
     * @param columnFieldNames column names to read or write
     * @param fieldMappings    Cassandra column name to tuple field name, or {@code null}
     * @param settings         optional settings such as partitioner overrides, or {@code null}
     */
    public CassandraScheme(String host, String port, String keyspace, String columnFamily, String keyColumnName, List<String> columnFieldNames, Map<String, String> fieldMappings, Map<String, String> settings) {
        this.host = host;
        this.port = port;
        this.keyspace = keyspace;
        this.columnFamily = columnFamily;
        this.columnFieldNames = columnFieldNames;
        this.keyColumnName = keyColumnName;
        this.fieldMappings = fieldMappings;
        this.settings = settings;
        this.pathUUID = UUID.randomUUID().toString();
    }

    /**
     * Creates the {@link CassandraHelper} and stores a reusable key/value pair, obtained from the
     * record reader, in the source context.
     *
     * @throws NumberFormatException if the configured port is not a valid integer
     */
    @Override
    public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
        this.helper = new CassandraHelper(this.host, Integer.valueOf(Integer.parseInt(this.port)), this.keyspace, this.columnFamily);
        RecordReader reader = sourceCall.getInput();
        ByteBuffer keyTemplate = ByteBufferUtil.clone((ByteBuffer) reader.createKey());
        SortedMap columns = (SortedMap) reader.createValue();
        sourceCall.setContext(new Object[]{keyTemplate, columns});
    }

    /** Drops the key/value pair stored by {@link #sourcePrepare}. */
    @Override
    public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
        sourceCall.setContext(null);
    }

    /**
     * Reads the next row from the record reader and publishes it as the incoming tuple.
     *
     * <p>The tuple starts with the trimmed row key. If no column names are configured the column
     * map itself is appended; otherwise one element is appended per configured column, with an
     * empty string standing in for a missing column. A missing column whose name equals the key
     * column name is skipped, since the key is already the first element.
     *
     * @return {@code true} if a row was read, {@code false} when the reader is exhausted
     * @throws IOException if reading the row or decoding the key fails
     */
    @Override
    public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        Object[] context = sourceCall.getContext();
        // A fresh clone of the stored key buffer is handed to the reader for every row.
        ByteBuffer rowKey = ByteBufferUtil.clone((ByteBuffer) context[0]);
        SortedMap columns = (SortedMap) context[1];
        if (!sourceCall.getInput().next(rowKey, columns)) {
            return false;
        }

        Tuple tuple = new Tuple();
        tuple.add(ByteBufferUtil.string(rowKey).trim());
        if (this.columnFieldNames.isEmpty()) {
            tuple.add(columns);
        } else {
            for (String columnName : this.columnFieldNames) {
                IColumn column = (IColumn) columns.get(ByteBufferUtil.bytes(columnName));
                if (column != null) {
                    tuple.add(this.helper.getTypeForColumn(column).compose(column.value()));
                } else if (!sameString(columnName, this.keyColumnName)) {
                    // Compare by value: reference comparison misses equal but distinct strings.
                    tuple.add("");
                }
            }
        }
        sourceCall.getIncomingEntry().setTuple(tuple);
        return true;
    }

    /**
     * Converts the outgoing tuple entry into a row key plus column mutations and hands them to
     * the output collector.
     *
     * <p>The row key is read from the tuple field mapped to the key column. Every other
     * configured column with a non-null value yields one mutation; fields that cannot be
     * resolved are logged and skipped.
     *
     * @throws IOException if the collector fails to accept the mutations
     */
    @Override
    public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        TupleEntry entry = sinkCall.getOutgoingEntry();
        OutputCollector collector = sinkCall.getOutput();

        String keyField = tupleFieldFor(this.keyColumnName);
        logger.info("key name {}", this.keyColumnName);
        logger.info("key mapping name {}", keyField);
        ByteBuffer rowKey = CassandraHelper.serialize(entry.selectTuple(new Fields(new Comparable[]{keyField})).get(0));

        List<Mutation> mutations = new ArrayList<Mutation>(this.columnFieldNames.size());
        for (String columnName : this.columnFieldNames) {
            String fieldName = tupleFieldFor(columnName);
            Comparable value = null;
            try {
                value = entry.get(fieldName);
            } catch (FieldsResolverException e) {
                // Best effort: an unresolvable field is reported and the column is skipped.
                logger.error("Couldn't resolve field: {}", columnName);
            }
            if (value != null && !sameString(columnName, this.keyColumnName)) {
                logger.info("Column filed name {}", columnName);
                logger.info("Mapped column name {}", fieldName);
                logger.info("Column filed value {}", value);
                mutations.add(createColumnPutMutation(CassandraHelper.serialize(columnName), CassandraHelper.serialize(value)));
            }
        }
        collector.collect(rowKey, mutations);
    }

    /**
     * Builds a mutation that sets a single column.
     *
     * @param byteBuffer  serialized column name
     * @param byteBuffer2 serialized column value
     * @return a mutation wrapping the column, timestamped with the current wall-clock time
     */
    protected Mutation createColumnPutMutation(ByteBuffer byteBuffer, ByteBuffer byteBuffer2) {
        Column column = new Column(byteBuffer);
        column.setValue(byteBuffer2);
        // NOTE(review): the timestamp is in milliseconds; confirm this matches the unit used by
        // other writers to the same column family before changing it.
        column.setTimestamp(System.currentTimeMillis());

        ColumnOrSuperColumn wrapper = new ColumnOrSuperColumn();
        wrapper.setColumn(column);

        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(wrapper);
        return mutation;
    }

    /**
     * Configures the job to write to the column family: output format, range batch size (100),
     * RPC port, initial address, partitioner, column family, maximum key size (60) and the
     * output path.
     */
    @Override
    public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
        jobConf.setOutputFormat(ColumnFamilyOutputFormat.class);
        ConfigHelper.setRangeBatchSize(jobConf, 100);
        ConfigHelper.setOutputRpcPort(jobConf, this.port);
        ConfigHelper.setOutputInitialAddress(jobConf, this.host);
        ConfigHelper.setOutputPartitioner(jobConf, settingOrDefault(OUTPUT_PARTITIONER_KEY, DEFAULT_PARTITIONER));
        ConfigHelper.setOutputColumnFamily(jobConf, this.keyspace, this.columnFamily);
        jobConf.setInt(ColumnFamilyInputFormat.CASSANDRA_HADOOP_MAX_KEY_SIZE, 60);
        FileOutputFormat.setOutputPath(jobConf, getPath());
    }

    /**
     * Configures the job to read from the column family: input path, input format, range batch
     * size (100), split size (30), RPC port, initial address, partitioner, column family,
     * maximum key size (60) and the slice predicate.
     *
     * <p>The predicate selects the configured column names, or an unbounded slice range when no
     * column names are configured.
     */
    @Override
    public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
        FileInputFormat.addInputPaths(jobConf, getPath().toString());
        jobConf.setInputFormat(ColumnFamilyInputFormat.class);
        ConfigHelper.setRangeBatchSize(jobConf, 100);
        ConfigHelper.setInputSplitSize(jobConf, 30);
        ConfigHelper.setInputRpcPort(jobConf, this.port);
        ConfigHelper.setInputInitialAddress(jobConf, this.host);
        ConfigHelper.setInputPartitioner(jobConf, settingOrDefault(INPUT_PARTITIONER_KEY, DEFAULT_PARTITIONER));
        ConfigHelper.setInputColumnFamily(jobConf, this.keyspace, this.columnFamily);
        jobConf.setInt(ColumnFamilyInputFormat.CASSANDRA_HADOOP_MAX_KEY_SIZE, 60);

        SlicePredicate predicate = new SlicePredicate();
        if (this.columnFieldNames.isEmpty()) {
            // Empty start and finish bounds: the range is not restricted to specific columns.
            SliceRange range = new SliceRange();
            range.setStart(ByteBufferUtil.bytes(""));
            range.setFinish(ByteBufferUtil.bytes(""));
            predicate.setSlice_range(range);
        } else {
            List<ByteBuffer> names = new ArrayList<ByteBuffer>();
            for (String columnName : this.columnFieldNames) {
                names.add(ByteBufferUtil.bytes(columnName));
            }
            predicate.setColumn_names(names);
        }
        ConfigHelper.setInputSlicePredicate(jobConf, predicate);
    }

    /** Returns a path built from this instance's random identifier. */
    public Path getPath() {
        return new Path(this.pathUUID);
    }

    /** Returns {@code host_port_keyspace_columnFamily}. */
    public String getIdentifier() {
        return this.host + "_" + this.port + "_" + this.keyspace + "_" + this.columnFamily;
    }

    /**
     * Two schemes are equal when the superclass considers them equal and they share the same
     * path, host, port, keyspace and column family (the same fields {@link #hashCode()} uses).
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CassandraScheme) || !super.equals(obj)) {
            return false;
        }
        CassandraScheme other = (CassandraScheme) obj;
        return getPath().toString().equals(other.getPath().toString())
                && sameString(this.host, other.host)
                && sameString(this.port, other.port)
                && sameString(this.keyspace, other.keyspace)
                && sameString(this.columnFamily, other.columnFamily);
    }

    /** Combines the superclass hash with the path, host, port, keyspace and column family. */
    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + getPath().toString().hashCode();
        result = 31 * result + hashOf(this.host);
        result = 31 * result + hashOf(this.port);
        result = 31 * result + hashOf(this.keyspace);
        result = 31 * result + hashOf(this.columnFamily);
        return result;
    }

    /** Returns the configured value for {@code key}, or {@code fallback} if the key is not set. */
    private String settingOrDefault(String key, String fallback) {
        if (this.settings != null && this.settings.containsKey(key)) {
            return this.settings.get(key);
        }
        return fallback;
    }

    /** Maps a column name to its tuple field name; without mappings the name maps to itself. */
    private String tupleFieldFor(String columnName) {
        return this.fieldMappings == null ? columnName : this.fieldMappings.get(columnName);
    }

    /** Null-safe string equality by value. */
    private static boolean sameString(String left, String right) {
        return left == null ? right == null : left.equals(right);
    }

    /** Null-safe string hash; {@code null} hashes to 0. */
    private static int hashOf(String value) {
        return value != null ? value.hashCode() : 0;
    }
}
