/*
 * Decompiled with CFR 0.152.
 */
package co.cask.tephra.persist;

import co.cask.tephra.metrics.MetricsCollector;
import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.persist.AbstractTransactionLog;
import co.cask.tephra.persist.CommitMarkerCodec;
import co.cask.tephra.persist.HDFSTransactionLog;
import co.cask.tephra.persist.TransactionEdit;
import co.cask.tephra.persist.TransactionLogReader;
import co.cask.tephra.util.TransactionEditUtil;
import com.google.common.io.Closeables;
import com.google.common.primitives.Longs;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class HDFSTransactionLogTest {
    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    private static final String LOG_FILE_PREFIX = "txlog.";
    private static MiniDFSCluster dfsCluster;
    private static Configuration conf;
    private static MetricsCollector metricsCollector;

    @BeforeClass
    public static void setupBeforeClass() throws Exception {
        Configuration hConf = new Configuration();
        hConf.set("hdfs.minidfs.basedir", TMP_FOLDER.newFolder().getAbsolutePath());
        dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
        conf = new Configuration(dfsCluster.getFileSystem().getConf());
        metricsCollector = new TxMetricsCollector();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        dfsCluster.shutdown();
    }

    private Configuration getConfiguration() throws IOException {
        conf.unset("data.tx.hdfs.user");
        conf.set("data.tx.snapshot.dir", TMP_FOLDER.newFolder().getAbsolutePath());
        return conf;
    }

    private HDFSTransactionLog getHDFSTransactionLog(Configuration conf, FileSystem fs, long timeInMillis) throws Exception {
        String snapshotDir = conf.get("data.tx.snapshot.dir");
        Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
        return new HDFSTransactionLog(fs, conf, newLog, timeInMillis, metricsCollector);
    }

    private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs, long timeInMillis, boolean withMarker) throws IOException {
        String snapshotDir = configuration.get("data.tx.snapshot.dir");
        Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
        SequenceFile.Metadata metadata = new SequenceFile.Metadata();
        if (withMarker) {
            metadata.set(new Text("version"), new Text(Byte.toString((byte)2)));
        }
        return SequenceFile.createWriter((FileSystem)fs, (Configuration)configuration, (Path)newLog, LongWritable.class, TransactionEdit.class, (SequenceFile.CompressionType)SequenceFile.CompressionType.NONE, null, null, (SequenceFile.Metadata)metadata);
    }

    private void writeNumWrites(SequenceFile.Writer writer, int size) throws Exception {
        String key = "count";
        CommitMarkerCodec.writeMarker((SequenceFile.Writer)writer, (int)size);
    }

    private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete) throws Exception {
        AbstractTransactionLog.Entry entry;
        int i;
        List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
        long timestamp = System.currentTimeMillis();
        Configuration configuration = this.getConfiguration();
        FileSystem fs = FileSystem.newInstance((URI)FileSystem.getDefaultUri((Configuration)configuration), (Configuration)configuration);
        SequenceFile.Writer writer = this.getSequenceFileWriter(configuration, fs, timestamp, withMarker);
        AtomicLong logSequence = new AtomicLong();
        HDFSTransactionLog transactionLog = this.getHDFSTransactionLog(configuration, fs, timestamp);
        for (i = 0; i < totalCount - batchSize; i += batchSize) {
            if (withMarker) {
                this.writeNumWrites(writer, batchSize);
            }
            for (int j = 0; j < batchSize; ++j) {
                entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
                writer.append((Writable)entry.getKey(), (Writable)entry.getEdit());
            }
            writer.syncFs();
        }
        if (withMarker) {
            this.writeNumWrites(writer, batchSize);
        }
        for (i = totalCount - batchSize; i < totalCount - 1; ++i) {
            entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
            writer.append((Writable)entry.getKey(), (Writable)entry.getEdit());
        }
        entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(totalCount - 1));
        if (isComplete) {
            writer.append((Writable)entry.getKey(), (Writable)entry.getEdit());
        } else {
            byte[] bytes = Longs.toByteArray((long)entry.getKey().get());
            writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes(){

                public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
                    byte[] test = new byte[]{2};
                    outStream.write(test, 0, 1);
                }

                public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
                }

                public int getSize() {
                    return 12;
                }
            });
        }
        writer.syncFs();
        Closeables.closeQuietly((Closeable)writer);
        TransactionLogReader reader = transactionLog.getReader();
        int syncedEdits = 0;
        while (reader.next() != null) {
            ++syncedEdits;
        }
        if (isComplete) {
            Assert.assertEquals((long)totalCount, (long)syncedEdits);
        } else {
            Assert.assertEquals((long)(totalCount - batchSize), (long)syncedEdits);
        }
    }

    @Test
    public void testTransactionLogNewVersion() throws Exception {
        this.testTransactionLogSync(1000, 1, true, false);
        this.testTransactionLogSync(2000, 5, true, false);
        this.testTransactionLogSync(1000, 1, true, true);
        this.testTransactionLogSync(2000, 5, true, true);
    }

    @Test
    public void testTransactionLogOldVersion() throws Exception {
        this.testTransactionLogSync(1000, 1, false, false);
        this.testTransactionLogSync(2000, 5, false, true);
    }
}

