/*
 * Decompiled with CFR 0.152.
 */
package co.cask.tephra.persist;

import co.cask.tephra.ChangeId;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionManager;
import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.persist.AbstractTransactionStateStorage;
import co.cask.tephra.persist.TransactionEdit;
import co.cask.tephra.persist.TransactionLog;
import co.cask.tephra.persist.TransactionLogReader;
import co.cask.tephra.persist.TransactionSnapshot;
import co.cask.tephra.persist.TransactionStateStorage;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTransactionStateStorageTest {
    /** Logger shared by all test cases in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionStateStorageTest.class);

    /**
     * Source of randomness for the generated snapshots, change sets and edits.
     * It is created without a seed, so the generated data differs from run to run.
     * Declared {@code final} because it is never reassigned.
     */
    private static final Random random = new Random();

    /**
     * Supplies the Hadoop configuration for one test case.
     *
     * @param var1 name of the requesting test; every test in this class passes its own method name.
     *             NOTE(review): presumably used by implementations to keep per-test state
     *             separate - confirm in the concrete subclasses.
     * @return the configuration that the test then hands to {@link #getStorage(Configuration)}
     * @throws IOException declared for implementations; the failure conditions are implementation-specific
     */
    protected abstract Configuration getConfiguration(String var1) throws IOException;

    /**
     * Creates the storage implementation under test.
     * <p>
     * Tests either start the returned instance directly or hand it to a {@code TransactionManager},
     * and always stop it in a {@code finally} block. Some tests call this method several times with
     * the same configuration.
     * NOTE(review): presumably every call must return a new, not-yet-started instance - confirm.
     *
     * @param var1 configuration obtained from {@link #getConfiguration(String)}
     * @return the storage instance to exercise
     */
    protected abstract AbstractTransactionStateStorage getStorage(Configuration var1);

    /**
     * Writes a randomly generated snapshot and verifies that the storage hands back an
     * equal snapshot as its latest one.
     *
     * @throws Exception if obtaining the configuration, starting the storage, or snapshot I/O fails
     */
    @Test
    public void testSnapshotPersistence() throws Exception {
        Configuration conf = getConfiguration("testSnapshotPersistence");
        TransactionSnapshot expected = createRandomSnapshot();
        AbstractTransactionStateStorage storage = getStorage(conf);
        try {
            storage.startAndWait();
            storage.writeSnapshot(expected);

            TransactionSnapshot actual = storage.getLatestSnapshot();
            Assert.assertNotNull(actual);
            Assert.assertEquals(expected, actual);
        } finally {
            // Shut the storage down even when an assertion above fails.
            storage.stopAndWait();
        }
    }

    /**
     * Appends 100 random edits to a newly created log, reads them back through the storage,
     * and verifies that the same edits come back in the same order.
     * <p>
     * The log and the log reader are closed in {@code finally} blocks so that a failing
     * append or read does not leave them open.
     *
     * @throws Exception if obtaining the configuration, starting the storage, or log I/O fails
     */
    @Test
    public void testLogWriteAndRead() throws Exception {
        Configuration conf = getConfiguration("testLogWriteAndRead");
        List<TransactionEdit> edits = createRandomEdits(100);
        AbstractTransactionStateStorage storage = getStorage(conf);
        try {
            long now = System.currentTimeMillis();
            storage.startAndWait();

            // Write every edit to a single log stamped with "now".
            TransactionLog log = storage.createLog(now);
            try {
                for (TransactionEdit edit : edits) {
                    log.append(edit);
                }
            } finally {
                log.close();
            }

            // Exactly one log (the one written above) is expected back.
            Collection<TransactionLog> logsToRead = storage.getLogsSince(now);
            Assert.assertNotNull(logsToRead);
            Assert.assertEquals(1, logsToRead.size());

            TransactionLogReader logReader = logsToRead.iterator().next().getReader();
            Assert.assertNotNull(logReader);
            List<TransactionEdit> readEdits = Lists.newArrayListWithExpectedSize(edits.size());
            try {
                // next() returning null ends the read loop.
                TransactionEdit nextEdit;
                while ((nextEdit = logReader.next()) != null) {
                    readEdits.add(nextEdit);
                }
            } finally {
                logReader.close();
            }

            Assert.assertEquals(edits.size(), readEdits.size());
            for (int i = 0; i < edits.size(); ++i) {
                LOG.info("Checking edit {}", i);
                Assert.assertEquals(edits.get(i), readEdits.get(i));
            }
        } finally {
            storage.stopAndWait();
        }
    }

    /**
     * Checks that the state of a {@code TransactionManager} is reproduced by a new manager
     * built on a new storage instance: first after the old manager was stopped, then again
     * when a further manager is started without stopping the previous one.
     *
     * @throws Exception if configuration, storage, or transaction manager operations fail
     */
    @Test
    public void testTransactionManagerPersistence() throws Exception {
        Configuration conf = getConfiguration("testTransactionManagerPersistence");
        // NOTE(review): 0 presumably disables the cleanup thread and 600 presumably makes periodic
        // snapshots too rare to occur during the test - confirm against TransactionManager.
        conf.setInt("data.tx.cleanup.interval", 0);
        conf.setInt("data.tx.snapshot.interval", 600);

        AbstractTransactionStateStorage firstStorage = null;
        AbstractTransactionStateStorage secondStorage = null;
        AbstractTransactionStateStorage thirdStorage = null;
        try {
            firstStorage = getStorage(conf);
            TransactionManager manager = new TransactionManager(conf, firstStorage, new TxMetricsCollector());
            manager.startAndWait();

            byte[] changeA = {'a'};
            byte[] changeB = {'b'};

            // tx1 records change A and commits.
            Transaction tx1 = manager.startShort();
            Assert.assertTrue(manager.canCommit(tx1, Collections.singleton(changeA)));
            Assert.assertTrue(manager.commit(tx1));
            // tx2 records change B but is not committed yet.
            Transaction tx2 = manager.startShort();
            Assert.assertTrue(manager.canCommit(tx2, Collections.singleton(changeB)));
            // tx3 is only started.
            Transaction tx3 = manager.startShort();

            // Stop the first manager, then capture its state for comparison.
            manager.stopAndWait();
            TransactionSnapshot savedState = manager.getCurrentState();
            LOG.info("Orig state: " + savedState);
            Thread.sleep(100L);

            // A second manager on a new storage instance must come up with the same state.
            secondStorage = getStorage(conf);
            manager = new TransactionManager(conf, secondStorage, new TxMetricsCollector());
            manager.startAndWait();
            TransactionSnapshot reloadedState = manager.getCurrentState();
            LOG.info("New state: " + reloadedState);
            Assert.assertEquals(savedState, reloadedState);

            // tx2, started under the first manager, is committed through the second one.
            Assert.assertTrue(manager.commit(tx2));
            // A transaction started now must get a write pointer beyond tx3's.
            Transaction tx4 = manager.startShort();
            Assert.assertTrue(tx4.getWritePointer() > tx3.getWritePointer());
            // tx2 sees tx1, but neither tx3 nor tx4.
            Assert.assertTrue(tx2.isVisible(tx1.getWritePointer()));
            Assert.assertFalse(tx2.isVisible(tx3.getWritePointer()));
            Assert.assertFalse(tx2.isVisible(tx4.getWritePointer()));
            // tx3 tries the change that tx2 already committed and is expected to be refused.
            Assert.assertFalse(manager.canCommit(tx3, Collections.singleton(changeB)));
            // A newer transaction sees tx1 and tx2, but not tx3 or tx4.
            Transaction tx5 = manager.startShort();
            Assert.assertTrue(tx5.isVisible(tx1.getWritePointer()));
            Assert.assertTrue(tx5.isVisible(tx2.getWritePointer()));
            Assert.assertFalse(tx5.isVisible(tx3.getWritePointer()));
            Assert.assertFalse(tx5.isVisible(tx4.getWritePointer()));

            // Abort everything still open; the next transaction must then have no excludes.
            manager.abort(tx3);
            manager.abort(tx4);
            manager.abort(tx5);
            Transaction tx6 = manager.startShort();
            Assert.assertFalse(tx6.hasExcludes());
            manager.abort(tx6);

            // Start 50 more transactions and remember the last one.
            Transaction lastTx = null;
            for (int started = 0; started < 50; started++) {
                lastTx = manager.startShort();
            }
            savedState = manager.getCurrentState();
            Thread.sleep(100L);

            // The second manager is deliberately NOT stopped before the third one is started.
            thirdStorage = getStorage(conf);
            manager = new TransactionManager(conf, thirdStorage, new TxMetricsCollector());
            manager.startAndWait();
            reloadedState = manager.getCurrentState();
            Assert.assertEquals(savedState, reloadedState);
            // Write pointers must keep increasing past the last transaction of the second manager.
            Transaction txAfter = manager.startShort();
            Assert.assertTrue(txAfter.getWritePointer() > lastTx.getWritePointer());
        } finally {
            // Stop whichever storages were created, in creation order.
            AbstractTransactionStateStorage[] created = {firstStorage, secondStorage, thirdStorage};
            for (AbstractTransactionStateStorage candidate : created) {
                if (candidate != null) {
                    candidate.stopAndWait();
                }
            }
        }
    }

    /**
     * Starts a second {@code TransactionManager} on a new storage instance while the first
     * manager is still running, and verifies that the second manager comes up with the same
     * state the first one reported.
     *
     * @throws Exception if configuration, storage, or transaction manager operations fail
     */
    @Test
    public void testCommittedSetClearing() throws Exception {
        Configuration conf = getConfiguration("testCommittedSetClearing");
        // NOTE(review): both intervals are set to 0, presumably to disable the cleanup thread
        // and periodic snapshots - confirm against TransactionManager.
        conf.setInt("data.tx.cleanup.interval", 0);
        conf.setInt("data.tx.snapshot.interval", 0);

        AbstractTransactionStateStorage firstStorage = null;
        AbstractTransactionStateStorage secondStorage = null;
        try {
            firstStorage = getStorage(conf);
            TransactionManager manager = new TransactionManager(conf, firstStorage, new TxMetricsCollector());
            manager.startAndWait();

            byte[] changeA = {'a'};
            byte[] changeB = {'b'};

            // tx1 records change A and commits.
            Transaction tx1 = manager.startShort();
            Assert.assertTrue(manager.canCommit(tx1, Collections.singleton(changeA)));
            Assert.assertTrue(manager.commit(tx1));
            // tx2 records change B but stays uncommitted.
            Transaction tx2 = manager.startShort();
            Assert.assertTrue(manager.canCommit(tx2, Collections.singleton(changeB)));
            // A third transaction is started; its handle is not needed afterwards.
            manager.startShort();

            TransactionSnapshot savedState = manager.getCurrentState();
            LOG.info("Orig state: " + savedState);

            // The first manager is deliberately NOT stopped before the second one is started.
            secondStorage = getStorage(conf);
            manager = new TransactionManager(conf, secondStorage, new TxMetricsCollector());
            manager.startAndWait();
            TransactionSnapshot reloadedState = manager.getCurrentState();
            LOG.info("New state: " + reloadedState);
            Assert.assertEquals(savedState, reloadedState);
        } finally {
            // Stop whichever storages were created, in creation order.
            AbstractTransactionStateStorage[] created = {firstStorage, secondStorage};
            for (AbstractTransactionStateStorage candidate : created) {
                if (candidate != null) {
                    candidate.stopAndWait();
                }
            }
        }
    }

    /**
     * Writes six snapshot/log pairs with consecutive timestamps ({@code now} to {@code now + 5}),
     * then verifies that {@code deleteOldSnapshots(3)} leaves three snapshots and returns
     * {@code now + 3}, and that {@code deleteLogsOlderThan} with that value leaves three logs.
     * <p>
     * Each log is closed in a {@code finally} block so that a failing append does not leave it open.
     *
     * @throws Exception if obtaining the configuration, starting the storage, or file I/O fails
     */
    @Test
    public void testOldFileRemoval() throws Exception {
        final int totalFiles = 6;
        final int filesToKeep = 3;

        Configuration conf = getConfiguration("testOldFileRemoval");
        AbstractTransactionStateStorage storage = null;
        try {
            storage = getStorage(conf);
            storage.startAndWait();
            long now = System.currentTimeMillis();
            long writePointer = 1L;

            // All snapshots share the same empty transaction collections; only the
            // timestamp and the write pointer differ between them.
            Collection<Long> invalid = Lists.newArrayList();
            NavigableMap<Long, TransactionManager.InProgressTx> inprogress = Maps.newTreeMap();
            Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
            Map<Long, Set<ChangeId>> committed = Maps.newHashMap();
            TransactionEdit dummyEdit = TransactionEdit.createStarted(1L, 0L, Long.MAX_VALUE);

            // One snapshot plus one single-edit log per timestamp.
            for (int i = 0; i < totalFiles; i++) {
                long timestamp = now + i;
                TransactionSnapshot snapshot = new TransactionSnapshot(
                    timestamp, 0L, writePointer++, invalid, inprogress, committing, committed);
                storage.writeSnapshot(snapshot);
                TransactionLog log = storage.createLog(timestamp);
                try {
                    log.append(dummyEdit);
                } finally {
                    log.close();
                }
            }

            List<?> allSnapshots = storage.listSnapshots();
            LOG.info("All snapshots: {}", allSnapshots);
            Assert.assertEquals(totalFiles, allSnapshots.size());
            List<?> allLogs = storage.listLogs();
            LOG.info("All logs: {}", allLogs);
            Assert.assertEquals(totalFiles, allLogs.size());

            // Keeping the newest three snapshots must report the timestamp of the oldest survivor.
            long oldestKept = storage.deleteOldSnapshots(filesToKeep);
            Assert.assertEquals(now + (totalFiles - filesToKeep), oldestKept);
            allSnapshots = storage.listSnapshots();
            LOG.info("All snapshots: {}", allSnapshots);
            Assert.assertEquals(filesToKeep, allSnapshots.size());

            // Removing logs older than that timestamp must leave the matching three logs.
            storage.deleteLogsOlderThan(oldestKept);
            allLogs = storage.listLogs();
            LOG.info("All logs: {}", allLogs);
            Assert.assertEquals(filesToKeep, allLogs.size());
        } finally {
            if (storage != null) {
                storage.stopAndWait();
            }
        }
    }

    /**
     * Builds a snapshot filled with generated data: a random read pointer in
     * {@code [1000000, 2000000)}, a write pointer 1000 above it, 500 in-progress entries,
     * 100 random invalid ids, 100 committing change sets and 1000 committed change sets.
     * <p>
     * Random longs are made non-negative by masking the sign bit rather than with
     * {@code Math.abs}, because {@code Math.abs(Long.MIN_VALUE)} is itself negative.
     *
     * @return a newly created snapshot stamped with the current time
     */
    private TransactionSnapshot createRandomSnapshot() {
        long readPointer = (random.nextLong() & Long.MAX_VALUE) % 1000000L + 1000000L;
        long writePointer = readPointer + 1000L;

        // In-progress entries occupy the 500 ids directly below the write pointer.
        NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
        long startPointer = writePointer - 500L;
        for (int i = 0; i < 500; ++i) {
            long currentTime = System.currentTimeMillis();
            // Every 20th entry gets the negated current time, all others a time 300 seconds ahead.
            // NOTE(review): the negative value presumably marks a long-running transaction - confirm
            // against TransactionManager.InProgressTx.
            long expiration = (i % 20 == 0) ? -currentTime : currentTime + 300000L;
            inProgress.put(startPointer + i, new TransactionManager.InProgressTx(startPointer - 1L, expiration));
        }

        // Invalid ids are drawn independently, so duplicates are possible.
        LongArrayList invalid = new LongArrayList();
        for (int i = 0; i < 100; ++i) {
            invalid.add((random.nextLong() & Long.MAX_VALUE) % 1000000L);
        }

        // Committing change sets are keyed by the 100 ids starting at the read pointer.
        Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
        for (int i = 0; i < 100; ++i) {
            committing.put(readPointer + i, generateChangeSet(10));
        }

        // Committed change sets are keyed by the 1000 ids directly below the read pointer.
        long startCommitted = readPointer - 1000L;
        Map<Long, Set<ChangeId>> committed = Maps.newTreeMap();
        for (int i = 0; i < 1000; ++i) {
            committed.put(startCommitted + i, generateChangeSet(10));
        }

        return new TransactionSnapshot(
            System.currentTimeMillis(), readPointer, writePointer, invalid, inProgress, committing, committed);
    }

    /**
     * Produces a set of change ids, each wrapping 8 freshly generated random bytes.
     *
     * @param numEntries number of change ids to generate; values of zero or less yield an empty set
     * @return a new mutable set; it can hold fewer than {@code numEntries} elements if two
     *         generated ids compare equal
     */
    private Set<ChangeId> generateChangeSet(int numEntries) {
        Set<ChangeId> changeSet = Sets.newHashSet();
        int remaining = numEntries;
        while (remaining-- > 0) {
            byte[] key = new byte[8];
            random.nextBytes(key);
            changeSet.add(new ChangeId(key));
        }
        return changeSet;
    }

    /**
     * Generates a list of transaction edits of randomly chosen kinds, each with a random
     * non-negative write pointer.
     * <p>
     * The write pointer is made non-negative by masking the sign bit rather than with
     * {@code Math.abs}, because {@code Math.abs(Long.MIN_VALUE)} is itself negative.
     *
     * @param numEntries number of edits to generate
     * @return a new mutable list of the generated edits, in generation order
     */
    private List<TransactionEdit> createRandomEdits(int numEntries) {
        List<TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries);
        for (int i = 0; i < numEntries; ++i) {
            // NOTE(review): only the first six State constants are drawn; this assumes they are
            // exactly the kinds handled by the switch below - confirm against TransactionEdit.State.
            TransactionEdit.State nextType = TransactionEdit.State.values()[random.nextInt(6)];
            long writePointer = random.nextLong() & Long.MAX_VALUE;
            switch (nextType) {
                case INPROGRESS:
                    edits.add(TransactionEdit.createStarted(
                        writePointer, writePointer - 1L, System.currentTimeMillis() + 300000L));
                    break;
                case COMMITTING:
                    edits.add(TransactionEdit.createCommitting(writePointer, generateChangeSet(10)));
                    break;
                case COMMITTED:
                    edits.add(TransactionEdit.createCommitted(
                        writePointer, generateChangeSet(10), writePointer + 1L, random.nextBoolean()));
                    break;
                case INVALID:
                    edits.add(TransactionEdit.createInvalid(writePointer));
                    break;
                case ABORTED:
                    edits.add(TransactionEdit.createAborted(writePointer));
                    break;
                case MOVE_WATERMARK:
                    edits.add(TransactionEdit.createMoveWatermark(writePointer));
                    break;
            }
        }
        return edits;
    }
}

