/*
 * Decompiled with CFR 0.152.
 */
package cascading.local.tap.kafka;

import cascading.flow.FlowProcess;
import cascading.local.tap.kafka.KafkaConsumerRecordIterator;
import cascading.local.tap.kafka.KafkaScheme;
import cascading.local.tap.kafka.KafkaTap;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TextKafkaScheme
extends KafkaScheme<String, String, Context, Context> {
    public static final Fields TOPIC_FIELDS = new Fields((Comparable)((Object)"topic"), String.class);
    public static final Fields PARTITION_FIELDS = new Fields((Comparable)((Object)"partition"), Integer.TYPE);
    public static final Fields OFFSET_FIELDS = new Fields((Comparable)((Object)"offset"), Long.TYPE);
    public static final Fields KEY_FIELDS = new Fields((Comparable)((Object)"key"), String.class);
    public static final Fields VALUE_FIELDS = new Fields((Comparable)((Object)"value"), String.class);
    public static final Fields TIMESTAMP_FIELDS = new Fields((Comparable)((Object)"timestamp"), Long.TYPE);
    public static final Fields TIMESTAMP_TYPE_FIELDS = new Fields((Comparable)((Object)"timestampType"), String.class);
    public static final Fields DEFAULT_SOURCE_FIELDS = TOPIC_FIELDS.append(PARTITION_FIELDS).append(OFFSET_FIELDS).append(KEY_FIELDS).append(VALUE_FIELDS).append(TIMESTAMP_FIELDS).append(TIMESTAMP_TYPE_FIELDS);

    public TextKafkaScheme() {
        super(DEFAULT_SOURCE_FIELDS);
    }

    public TextKafkaScheme(Fields sourceFields) {
        super(sourceFields);
        if (sourceFields.size() != 7) {
            throw new IllegalArgumentException("wrong number of source fields, requires 6, got: " + sourceFields);
        }
    }

    public void sourceConfInit(FlowProcess<? extends Properties> flowProcess, Tap<Properties, KafkaConsumerRecordIterator<String, String>, Producer<String, String>> tap, Properties conf) {
        conf.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        conf.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
    }

    public void sinkConfInit(FlowProcess<? extends Properties> flowProcess, Tap<Properties, KafkaConsumerRecordIterator<String, String>, Producer<String, String>> tap, Properties conf) {
        conf.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
        conf.setProperty("value.serializer", StringSerializer.class.getCanonicalName());
    }

    public void sourcePrepare(FlowProcess<? extends Properties> flowProcess, SourceCall<Context, KafkaConsumerRecordIterator<String, String>> sourceCall) throws IOException {
        sourceCall.setContext((Object)new Context(((KafkaTap)sourceCall.getTap()).getTopics()));
    }

    public void sinkPrepare(FlowProcess<? extends Properties> flowProcess, SinkCall<Context, Producer<String, String>> sinkCall) throws IOException {
        sinkCall.setContext((Object)new Context(((KafkaTap)sinkCall.getTap()).getTopics()));
    }

    public boolean source(FlowProcess<? extends Properties> flowProcess, SourceCall<Context, KafkaConsumerRecordIterator<String, String>> sourceCall) throws IOException {
        Iterator input = (Iterator)sourceCall.getInput();
        if (!input.hasNext()) {
            return false;
        }
        ConsumerRecord record = (ConsumerRecord)input.next();
        TupleEntry incomingEntry = sourceCall.getIncomingEntry();
        incomingEntry.setObject((Comparable)Integer.valueOf(0), (Object)record.topic());
        incomingEntry.setObject((Comparable)Integer.valueOf(1), (Object)record.partition());
        incomingEntry.setObject((Comparable)Integer.valueOf(2), (Object)record.offset());
        incomingEntry.setObject((Comparable)Integer.valueOf(3), record.key());
        incomingEntry.setObject((Comparable)Integer.valueOf(4), record.value());
        incomingEntry.setObject((Comparable)Integer.valueOf(5), (Object)record.timestamp());
        incomingEntry.setObject((Comparable)Integer.valueOf(6), (Object)record.timestampType());
        return true;
    }

    public void sink(FlowProcess<? extends Properties> flowProcess, SinkCall<Context, Producer<String, String>> sinkCall) throws IOException {
        String key = sinkCall.getOutgoingEntry().getString((Comparable)Integer.valueOf(0));
        String value = sinkCall.getOutgoingEntry().getString((Comparable)Integer.valueOf(1));
        for (String topic : ((Context)sinkCall.getContext()).topics) {
            ((Producer)sinkCall.getOutput()).send(new ProducerRecord(topic, (Object)key, (Object)value));
        }
    }

    class Context {
        String[] topics;

        public Context(String[] topics) {
            this.topics = topics;
        }
    }
}

