/*
 * Decompiled with CFR 0.152.
 */
package kafka.bridge.pig;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import kafka.bridge.hadoop.KafkaOutputFormat;
import kafka.bridge.hadoop.KafkaRecordWriter;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter;
import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;

/**
 * Pig {@link StoreFunc} that encodes each tuple with a {@link PigAvroDatumWriter}
 * through a {@link BinaryEncoder} and hands the resulting bytes to a
 * {@link KafkaRecordWriter}.
 *
 * <p>The Avro schema is supplied as a string to the constructor and may be
 * replaced by {@link #checkSchema(ResourceSchema)}. A single in-memory buffer
 * and encoder are created in {@link #prepareToWrite(RecordWriter)} and reused
 * for every tuple, so an instance must not be shared between threads that call
 * {@link #putNext(Tuple)} concurrently.
 */
public class AvroKafkaStorage
extends StoreFunc {
    /** Destination for encoded records; assigned in {@link #prepareToWrite(RecordWriter)}. */
    protected KafkaRecordWriter<Object, byte[]> writer;
    /** Avro schema used to build the datum writer. */
    protected Schema avroSchema;
    /** Writes Pig tuples into {@link #encoder}; created in {@link #prepareToWrite(RecordWriter)}. */
    protected PigAvroDatumWriter datumWriter;
    /** Encoder layered over {@link #os}; created in {@link #prepareToWrite(RecordWriter)}. */
    protected Encoder encoder;
    /** Per-record byte buffer, reset at the start of every {@link #putNext(Tuple)} call. */
    protected ByteArrayOutputStream os;

    /**
     * Creates the store function from an Avro schema definition.
     *
     * @param schema textual Avro schema, parsed with {@code Schema.parse}
     */
    public AvroKafkaStorage(String schema) {
        this.avroSchema = Schema.parse(schema);
    }

    /**
     * Returns the output format used by this store function.
     *
     * @return a new {@link KafkaOutputFormat}
     * @throws IOException declared by the signature; not thrown by this implementation
     */
    public OutputFormat getOutputFormat() throws IOException {
        return new KafkaOutputFormat();
    }

    /**
     * Returns {@code location} unchanged; {@code curDir} is ignored.
     *
     * @param location store location as given by the caller
     * @param curDir   unused
     * @return {@code location}, unmodified
     * @throws IOException declared by the signature; not thrown by this implementation
     */
    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
        // NOTE(review): presumably the location is a Kafka URI rather than a
        // filesystem path, so no resolution against curDir is wanted - confirm.
        return location;
    }

    /**
     * Registers {@code uri} as the output path of {@code job} via
     * {@link KafkaOutputFormat#setOutputPath}.
     *
     * @param uri store location
     * @param job job being configured
     * @throws IOException if the output format rejects the path
     */
    public void setStoreLocation(String uri, Job job) throws IOException {
        KafkaOutputFormat.setOutputPath(job, new Path(uri));
    }

    /**
     * Stores the record writer and creates the datum writer, byte buffer and
     * binary encoder that {@link #putNext(Tuple)} reuses.
     *
     * @param writer record writer; cast to {@link KafkaRecordWriter}
     * @throws IllegalStateException if no Avro schema is set
     * @throws ClassCastException    if {@code writer} is not a {@link KafkaRecordWriter}
     * @throws IOException           declared by the signature
     */
    public void prepareToWrite(RecordWriter writer) throws IOException {
        if (this.avroSchema == null) {
            throw new IllegalStateException("avroSchema shouldn't be null");
        }
        this.writer = (KafkaRecordWriter)writer;
        this.datumWriter = new PigAvroDatumWriter(this.avroSchema);
        this.os = new ByteArrayOutputStream();
        this.encoder = new BinaryEncoder((OutputStream)this.os);
    }

    /**
     * Does nothing; this store function performs no cleanup on failure.
     *
     * @param location unused
     * @param job      unused
     * @throws IOException declared by the signature; not thrown by this implementation
     */
    public void cleanupOnFailure(String location, Job job) throws IOException {
    }

    /**
     * Does nothing; the signature is not recorded.
     *
     * @param signature unused
     */
    public void setStoreFuncUDFContextSignature(String signature) {
    }

    /**
     * Replaces the current Avro schema with the result of
     * {@link PigSchema2Avro#validateAndConvert} applied to the current schema
     * and the given Pig schema.
     *
     * @param schema Pig schema of the data about to be stored
     * @throws IOException if validation or conversion fails
     */
    public void checkSchema(ResourceSchema schema) throws IOException {
        this.avroSchema = PigSchema2Avro.validateAndConvert(this.avroSchema, schema);
    }

    /**
     * Hook invoked by {@link #putNext(Tuple)} after the buffer is reset and
     * before the tuple is encoded. This implementation writes nothing.
     *
     * @param os  the per-record buffer
     * @param enc the encoder layered over {@code os}
     * @throws IOException declared so that overriding implementations may fail on write
     */
    protected void writeEnvelope(OutputStream os, Encoder enc) throws IOException {
    }

    /**
     * Encodes one tuple and writes the resulting bytes, with a {@code null}
     * key, to the Kafka record writer.
     *
     * @param tuple tuple to store
     * @throws IOException if encoding or writing fails, or if the write is
     *                     interrupted (the {@link InterruptedException} is the
     *                     cause and the thread's interrupt status is restored)
     */
    public void putNext(Tuple tuple) throws IOException {
        // Reuse the same buffer for every record instead of reallocating.
        this.os.reset();
        this.writeEnvelope(this.os, this.encoder);
        this.datumWriter.write((Object)tuple, this.encoder);
        // Push any bytes buffered inside the encoder into os before copying.
        this.encoder.flush();
        try {
            this.writer.write(null, this.os.toByteArray());
        }
        catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; restore
            // it so callers further up the stack can still observe the
            // interruption after it is translated to an IOException.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }
}

