/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.flume.storage;

import com.datatorrent.api.Component;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.util.Slice;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import java.io.DataInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.flume.sink.Server;
import org.apache.apex.malhar.flume.storage.Storage;
import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HDFSStorage
implements Storage,
Configurable,
Component<com.datatorrent.api.Context> {
    public static final int DEFAULT_BLOCK_SIZE = 0x4000000;
    public static final String BASE_DIR_KEY = "baseDir";
    public static final String RESTORE_KEY = "restore";
    public static final String BLOCKSIZE = "blockSize";
    public static final String BLOCK_SIZE_MULTIPLE = "blockSizeMultiple";
    public static final String NUMBER_RETRY = "retryCount";
    private static final String OFFSET_SUFFIX = "-offsetFile";
    private static final String BOOK_KEEPING_FILE_OFFSET = "-bookKeepingOffsetFile";
    private static final String FLUSHED_IDENTITY_FILE = "flushedCounter";
    private static final String CLEAN_OFFSET_FILE = "cleanoffsetFile";
    private static final String FLUSHED_IDENTITY_FILE_TEMP = "flushedCounter.tmp";
    private static final String CLEAN_OFFSET_FILE_TEMP = "cleanoffsetFile.tmp";
    private static final int IDENTIFIER_SIZE = 8;
    private static final int DATA_LENGTH_BYTE_SIZE = 4;
    private int retryCount = 3;
    private int blockSizeMultiple = 1;
    @NotNull
    private String id;
    @NotNull
    private String baseDir;
    private long blockSize;
    private boolean restore = true;
    private long currentWrittenFile;
    private long flushedFileCounter;
    private Path flushedCounterFile;
    private Path flushedCounterFileTemp;
    private long cleanedFileCounter;
    private Path cleanFileOffsetFile;
    private Path cleanFileOffsetFileTemp;
    private FileSystem fs;
    private FSDataOutputStream dataStream;
    ArrayList<DataBlock> files2Commit = new ArrayList();
    private long fileWriteOffset;
    private FSDataInputStream readStream;
    private long retrievalOffset;
    private long retrievalFile;
    private int offset;
    private long flushedLong;
    private long flushedFileWriteOffset;
    private long bookKeepingFileOffset;
    private byte[] cleanedOffset = new byte[8];
    private long skipOffset;
    private long skipFile;
    private transient Path basePath;
    private ExecutorService storageExecutor;
    private byte[] currentData;
    private FSDataInputStream nextReadStream;
    private long nextFlushedLong;
    private long nextRetrievalFile;
    private byte[] nextRetrievalData;
    private static final Logger logger = LoggerFactory.getLogger(HDFSStorage.class);

    public void configure(Context ctx) {
        String tempBaseDir;
        String tempId = ctx.getString("id");
        if (tempId == null) {
            if (this.id == null) {
                throw new IllegalArgumentException("id can't be  null.");
            }
        } else {
            this.id = tempId;
        }
        if ((tempBaseDir = ctx.getString(BASE_DIR_KEY)) != null) {
            this.baseDir = tempBaseDir;
        }
        this.restore = ctx.getBoolean(RESTORE_KEY, Boolean.valueOf(this.restore));
        Long tempBlockSize = ctx.getLong(BLOCKSIZE);
        if (tempBlockSize != null) {
            this.blockSize = tempBlockSize;
        }
        this.blockSizeMultiple = ctx.getInteger(BLOCK_SIZE_MULTIPLE, Integer.valueOf(this.blockSizeMultiple));
        this.retryCount = ctx.getInteger(NUMBER_RETRY, Integer.valueOf(this.retryCount));
    }

    byte[] readData(Path path) throws IOException {
        DataInputStream is = new DataInputStream((InputStream)this.fs.open(path));
        byte[] bytes = new byte[is.available()];
        is.readFully(bytes);
        is.close();
        return bytes;
    }

    private FSDataOutputStream writeData(Path path, byte[] data) throws IOException {
        FSDataOutputStream fsOutputStream = this.fs.getScheme().equals("file") ? new FSDataOutputStream((OutputStream)new FileOutputStream(Path.getPathWithoutSchemeAndAuthority((Path)path).toString()), null) : this.fs.create(path);
        fsOutputStream.write(data);
        return fsOutputStream;
    }

    private long calculateOffset(long fileOffset, long fileCounter) {
        return fileCounter << 32 | fileOffset & 0xFFFFFFFFL;
    }

    @Override
    public byte[] store(Slice slice) {
        int bytesToWrite = slice.length + 4;
        if (this.currentWrittenFile < this.skipFile) {
            this.fileWriteOffset += (long)bytesToWrite;
            if (this.fileWriteOffset >= this.bookKeepingFileOffset) {
                this.files2Commit.add(new DataBlock(null, this.bookKeepingFileOffset, new Path(this.basePath, this.currentWrittenFile + OFFSET_SUFFIX), this.currentWrittenFile));
                ++this.currentWrittenFile;
                this.fileWriteOffset = this.fileWriteOffset > this.bookKeepingFileOffset ? (long)bytesToWrite : 0L;
                try {
                    this.bookKeepingFileOffset = this.getFlushedFileWriteOffset(new Path(this.basePath, this.currentWrittenFile + BOOK_KEEPING_FILE_OFFSET));
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            return null;
        }
        if (this.flushedFileCounter == this.currentWrittenFile && this.dataStream == null) {
            ++this.currentWrittenFile;
            this.fileWriteOffset = 0L;
        }
        if (this.flushedFileCounter == this.skipFile && this.skipFile != -1L) {
            ++this.skipFile;
        }
        if (this.fileWriteOffset + (long)bytesToWrite < this.blockSize) {
            try {
                if (this.fileWriteOffset == 0L) {
                    this.dataStream = this.writeData(new Path(this.basePath, String.valueOf(this.currentWrittenFile)), Ints.toByteArray((int)slice.length));
                    this.dataStream.write(slice.buffer, slice.offset, slice.length);
                } else {
                    this.dataStream.write(Ints.toByteArray((int)slice.length));
                    this.dataStream.write(slice.buffer, slice.offset, slice.length);
                }
                this.fileWriteOffset += (long)bytesToWrite;
                byte[] fileOffset = null;
                if (this.currentWrittenFile > this.skipFile || this.currentWrittenFile == this.skipFile && this.fileWriteOffset > this.skipOffset) {
                    this.skipFile = -1L;
                    fileOffset = new byte[8];
                    Server.writeLong(fileOffset, 0, this.calculateOffset(this.fileWriteOffset, this.currentWrittenFile));
                }
                return fileOffset;
            }
            catch (IOException ex) {
                logger.warn("Error while storing the bytes {}", (Object)ex.getMessage());
                this.closeFs();
                throw new RuntimeException(ex);
            }
        }
        DataBlock db = new DataBlock(this.dataStream, this.fileWriteOffset, new Path(this.basePath, this.currentWrittenFile + OFFSET_SUFFIX), this.currentWrittenFile);
        db.close();
        this.files2Commit.add(db);
        this.fileWriteOffset = 0L;
        ++this.currentWrittenFile;
        return this.store(slice);
    }

    long byteArrayToLong(byte[] b, int startIndex) {
        boolean b1 = false;
        return Longs.fromBytes((byte)0, (byte)0, (byte)0, (byte)0, (byte)b[3 + startIndex], (byte)b[2 + startIndex], (byte)b[1 + startIndex], (byte)b[startIndex]);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public byte[] retrieve(byte[] identifier) {
        long cleanedFile;
        this.skipFile = -1L;
        this.skipOffset = 0L;
        logger.debug("retrieve with address {}", (Object)Arrays.toString(identifier));
        this.closeUnflushedFiles();
        this.retrievalOffset = this.byteArrayToLong(identifier, 0);
        this.retrievalFile = this.byteArrayToLong(identifier, this.offset);
        if (this.retrievalFile == 0L && this.retrievalOffset == 0L && this.currentWrittenFile == 0L && this.fileWriteOffset == 0L) {
            this.skipOffset = 0L;
            return null;
        }
        if ((this.retrievalFile != 0L || this.retrievalOffset != 0L) && (this.retrievalFile < (cleanedFile = this.byteArrayToLong(this.cleanedOffset, this.offset)) || this.retrievalFile == cleanedFile && this.retrievalOffset < this.byteArrayToLong(this.cleanedOffset, 0))) {
            logger.warn("The address asked has been deleted retrievalFile={}, cleanedFile={}, retrievalOffset={}, cleanedOffset={}", new Object[]{this.retrievalFile, cleanedFile, this.retrievalOffset, this.byteArrayToLong(this.cleanedOffset, 0)});
            this.closeFs();
            throw new IllegalArgumentException(String.format("The data for address %s has already been deleted", Arrays.toString(identifier)));
        }
        if (this.retrievalFile == 0L && this.retrievalOffset == 0L) {
            this.retrievalFile = this.byteArrayToLong(this.cleanedOffset, this.offset);
            this.retrievalOffset = this.byteArrayToLong(this.cleanedOffset, 0);
        }
        if (this.retrievalFile > this.flushedFileCounter) {
            this.skipFile = this.retrievalFile;
            this.skipOffset = this.retrievalOffset;
            this.retrievalFile = -1L;
            return null;
        }
        if (this.retrievalFile == this.flushedFileCounter && this.retrievalOffset >= this.flushedFileWriteOffset) {
            this.skipFile = this.retrievalFile;
            this.skipOffset = this.retrievalOffset - this.flushedFileWriteOffset;
            this.retrievalFile = -1L;
            return null;
        }
        try {
            Path path;
            if (this.readStream != null) {
                this.readStream.close();
                this.readStream = null;
            }
            if (!this.fs.exists(path = new Path(this.basePath, String.valueOf(this.retrievalFile)))) {
                this.retrievalFile = -1L;
                this.closeFs();
                throw new RuntimeException(String.format("File %s does not exist", path.toString()));
            }
            byte[] flushedOffset = this.readData(new Path(this.basePath, this.retrievalFile + OFFSET_SUFFIX));
            this.flushedLong = Server.readLong(flushedOffset, 0);
            while (this.retrievalOffset >= this.flushedLong && this.retrievalFile < this.flushedFileCounter) {
                this.retrievalOffset -= this.flushedLong;
                ++this.retrievalFile;
                flushedOffset = this.readData(new Path(this.basePath, this.retrievalFile + OFFSET_SUFFIX));
                this.flushedLong = Server.readLong(flushedOffset, 0);
            }
            if (this.retrievalOffset >= this.flushedLong) {
                logger.warn("data not flushed for the given identifier");
                this.retrievalFile = -1L;
                return null;
            }
            HDFSStorage hDFSStorage = this;
            synchronized (hDFSStorage) {
                if (this.nextReadStream != null) {
                    this.nextReadStream.close();
                    this.nextReadStream = null;
                }
            }
            this.currentData = null;
            path = new Path(this.basePath, String.valueOf(this.retrievalFile));
            this.currentData = this.readData(path);
            this.storageExecutor.submit(this.getNextStream());
            return this.retrieveHelper();
        }
        catch (IOException e) {
            this.closeFs();
            throw new RuntimeException(e);
        }
    }

    private byte[] retrieveHelper() throws IOException {
        int tempRetrievalOffset = (int)this.retrievalOffset;
        int length = Ints.fromBytes((byte)this.currentData[tempRetrievalOffset], (byte)this.currentData[tempRetrievalOffset + 1], (byte)this.currentData[tempRetrievalOffset + 2], (byte)this.currentData[tempRetrievalOffset + 3]);
        byte[] data = new byte[length + 8];
        System.arraycopy(this.currentData, tempRetrievalOffset + 4, data, 8, length);
        this.retrievalOffset += (long)(length + 4);
        if (this.retrievalOffset >= this.flushedLong) {
            Server.writeLong(data, 0, this.calculateOffset(0L, this.retrievalFile + 1L));
        } else {
            Server.writeLong(data, 0, this.calculateOffset(this.retrievalOffset, this.retrievalFile));
        }
        return data;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public byte[] retrieveNext() {
        if (this.retrievalFile == -1L) {
            this.closeFs();
            throw new RuntimeException("Call retrieve first");
        }
        if (this.retrievalFile > this.flushedFileCounter) {
            logger.warn("data is not flushed");
            return null;
        }
        try {
            byte[] flushedOffset;
            HDFSStorage hDFSStorage;
            if (this.currentData == null) {
                hDFSStorage = this;
                synchronized (hDFSStorage) {
                    if (this.nextRetrievalData != null && this.retrievalFile == this.nextRetrievalFile) {
                        this.currentData = this.nextRetrievalData;
                        this.flushedLong = this.nextFlushedLong;
                        this.nextRetrievalData = null;
                    } else {
                        this.currentData = null;
                        this.currentData = this.readData(new Path(this.basePath, String.valueOf(this.retrievalFile)));
                        flushedOffset = this.readData(new Path(this.basePath, this.retrievalFile + OFFSET_SUFFIX));
                        this.flushedLong = Server.readLong(flushedOffset, 0);
                    }
                }
                this.storageExecutor.submit(this.getNextStream());
            }
            if (this.retrievalOffset >= this.flushedLong) {
                ++this.retrievalFile;
                this.retrievalOffset = 0L;
                if (this.retrievalFile > this.flushedFileCounter) {
                    logger.warn("data is not flushed");
                    return null;
                }
                hDFSStorage = this;
                synchronized (hDFSStorage) {
                    if (this.nextRetrievalData != null && this.retrievalFile == this.nextRetrievalFile) {
                        this.currentData = this.nextRetrievalData;
                        this.flushedLong = this.nextFlushedLong;
                        this.nextRetrievalData = null;
                    } else {
                        this.currentData = null;
                        this.currentData = this.readData(new Path(this.basePath, String.valueOf(this.retrievalFile)));
                        flushedOffset = this.readData(new Path(this.basePath, this.retrievalFile + OFFSET_SUFFIX));
                        this.flushedLong = Server.readLong(flushedOffset, 0);
                    }
                }
                this.storageExecutor.submit(this.getNextStream());
            }
            return this.retrieveHelper();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void clean(byte[] identifier) {
        logger.info("clean {}", (Object)Arrays.toString(identifier));
        long cleanFileIndex = this.byteArrayToLong(identifier, this.offset);
        long cleanFileOffset = this.byteArrayToLong(identifier, 0);
        if (this.flushedFileCounter == -1L) {
            identifier = new byte[8];
        } else if (cleanFileIndex > this.flushedFileCounter || cleanFileIndex == this.flushedFileCounter && cleanFileOffset >= this.flushedFileWriteOffset) {
            cleanFileIndex = this.flushedFileCounter;
            cleanFileOffset = this.flushedFileWriteOffset;
            Server.writeLong(identifier, 0, this.calculateOffset(cleanFileOffset, cleanFileIndex));
        }
        this.cleanedOffset = identifier;
        try {
            this.writeData(this.cleanFileOffsetFileTemp, identifier).close();
            this.fs.rename(this.cleanFileOffsetFileTemp, this.cleanFileOffsetFile);
            if (this.cleanedFileCounter >= cleanFileIndex) {
                return;
            }
            do {
                Path path;
                if (this.fs.exists(path = new Path(this.basePath, String.valueOf(this.cleanedFileCounter))) && this.fs.isFile(path)) {
                    this.fs.delete(path, false);
                }
                if (this.fs.exists(path = new Path(this.basePath, this.cleanedFileCounter + OFFSET_SUFFIX)) && this.fs.isFile(path)) {
                    this.fs.delete(path, false);
                }
                if (this.fs.exists(path = new Path(this.basePath, this.cleanedFileCounter + BOOK_KEEPING_FILE_OFFSET)) && this.fs.isFile(path)) {
                    this.fs.delete(path, false);
                }
                logger.info("deleted file {}", (Object)this.cleanedFileCounter);
                ++this.cleanedFileCounter;
            } while (this.cleanedFileCounter < cleanFileIndex);
        }
        catch (IOException e) {
            logger.warn("not able to close the streams {}", (Object)e.getMessage());
            this.closeFs();
            throw new RuntimeException(e);
        }
    }

    void cleanHelperFiles() {
        try {
            this.fs.delete(this.basePath, true);
        }
        catch (IOException e) {
            logger.warn(e.getMessage());
        }
    }

    private void closeUnflushedFiles() {
        try {
            this.files2Commit.clear();
            if (this.dataStream != null) {
                this.dataStream.close();
                this.dataStream = null;
            }
            if (!this.fs.exists(new Path(this.basePath, this.currentWrittenFile + OFFSET_SUFFIX))) {
                this.fs.delete(new Path(this.basePath, String.valueOf(this.currentWrittenFile)), false);
            }
            if (this.fs.exists(new Path(this.basePath, this.flushedFileCounter + OFFSET_SUFFIX))) {
                this.flushedFileWriteOffset = this.getFlushedFileWriteOffset(new Path(this.basePath, this.flushedFileCounter + OFFSET_SUFFIX));
                this.bookKeepingFileOffset = this.getFlushedFileWriteOffset(new Path(this.basePath, this.flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
            }
            if (this.flushedFileCounter != -1L) {
                this.currentWrittenFile = this.flushedFileCounter;
                this.fileWriteOffset = this.flushedFileWriteOffset;
            } else {
                this.currentWrittenFile = 0L;
                this.fileWriteOffset = 0L;
            }
            this.flushedLong = 0L;
        }
        catch (IOException e) {
            this.closeFs();
            throw new RuntimeException(e);
        }
    }

    @Override
    public void flush() {
        this.nextReadStream = null;
        StringBuilder builder = new StringBuilder();
        Iterator<DataBlock> itr = this.files2Commit.iterator();
        try {
            while (itr.hasNext()) {
                DataBlock db = itr.next();
                db.updateOffsets();
                builder.append(db.fileName).append(", ");
            }
            this.files2Commit.clear();
            if (this.dataStream != null) {
                this.dataStream.hflush();
                this.writeData(this.flushedCounterFileTemp, String.valueOf(this.currentWrittenFile).getBytes()).close();
                this.fs.rename(this.flushedCounterFileTemp, this.flushedCounterFile);
                this.updateFlushedOffset(new Path(this.basePath, this.currentWrittenFile + OFFSET_SUFFIX), this.fileWriteOffset);
                this.flushedFileWriteOffset = this.fileWriteOffset;
                builder.append(this.currentWrittenFile);
            }
            logger.debug("flushed files {}", (Object)builder.toString());
        }
        catch (IOException ex) {
            logger.warn("not able to close the stream {}", (Object)ex.getMessage());
            this.closeFs();
            throw new RuntimeException(ex);
        }
        this.flushedFileCounter = this.currentWrittenFile;
    }

    private void updateFlushedOffset(Path file, long bytesWritten) {
        byte[] lastStoredOffset = new byte[8];
        Server.writeLong(lastStoredOffset, 0, bytesWritten);
        try {
            this.writeData(file, lastStoredOffset).close();
        }
        catch (IOException e) {
            try {
                if (!Arrays.equals(this.readData(file), lastStoredOffset)) {
                    this.closeFs();
                    throw new RuntimeException(e);
                }
            }
            catch (Exception e1) {
                this.closeFs();
                throw new RuntimeException(e1);
            }
        }
    }

    public int getBlockSizeMultiple() {
        return this.blockSizeMultiple;
    }

    public void setBlockSizeMultiple(int blockSizeMultiple) {
        this.blockSizeMultiple = blockSizeMultiple;
    }

    public String getBaseDir() {
        return this.baseDir;
    }

    public void setBaseDir(String baseDir) {
        this.baseDir = baseDir;
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getBlockSize() {
        return this.blockSize;
    }

    public void setBlockSize(long blockSize) {
        this.blockSize = blockSize;
    }

    public boolean isRestore() {
        return this.restore;
    }

    public void setRestore(boolean restore) {
        this.restore = restore;
    }

    public void setup(com.datatorrent.api.Context context) {
        Configuration conf = new Configuration();
        if (this.baseDir == null) {
            this.baseDir = conf.get("hadoop.tmp.dir");
            if (this.baseDir == null || this.baseDir.isEmpty()) {
                throw new IllegalArgumentException("baseDir cannot be null.");
            }
        }
        this.offset = 4;
        this.skipOffset = -1L;
        this.skipFile = -1L;
        int tempRetryCount = 0;
        while (tempRetryCount < this.retryCount && this.fs == null) {
            try {
                this.fs = FileSystem.newInstance((Configuration)conf);
                ++tempRetryCount;
            }
            catch (Throwable throwable) {
                logger.warn("Not able to get file system ", throwable);
            }
        }
        try {
            Path path = new Path(this.baseDir);
            this.basePath = new Path(path, this.id);
            if (this.fs == null) {
                this.fs = FileSystem.newInstance((Configuration)conf);
            }
            if (!this.fs.exists(path)) {
                this.closeFs();
                throw new RuntimeException(String.format("baseDir passed (%s) doesn't exist.", this.baseDir));
            }
            if (!this.fs.isDirectory(path)) {
                this.closeFs();
                throw new RuntimeException(String.format("baseDir passed (%s) is not a directory.", this.baseDir));
            }
            if (!this.restore) {
                this.fs.delete(this.basePath, true);
            }
            if (!this.fs.exists(this.basePath) || !this.fs.isDirectory(this.basePath)) {
                this.fs.mkdirs(this.basePath);
            }
            if (this.blockSize == 0L) {
                this.blockSize = this.fs.getDefaultBlockSize(new Path(this.basePath, "tempData"));
            }
            if (this.blockSize == 0L) {
                this.blockSize = 0x4000000L;
            }
            this.blockSize = (long)this.blockSizeMultiple * this.blockSize;
            this.currentWrittenFile = 0L;
            this.cleanedFileCounter = -1L;
            this.retrievalFile = -1L;
            this.flushedFileCounter = -1L;
            this.cleanFileOffsetFile = new Path(this.basePath, CLEAN_OFFSET_FILE);
            this.cleanFileOffsetFileTemp = new Path(this.basePath, CLEAN_OFFSET_FILE_TEMP);
            this.flushedCounterFile = new Path(this.basePath, FLUSHED_IDENTITY_FILE);
            this.flushedCounterFileTemp = new Path(this.basePath, FLUSHED_IDENTITY_FILE_TEMP);
            if (this.restore) {
                if (this.fs.exists(this.cleanFileOffsetFile) && this.fs.isFile(this.cleanFileOffsetFile)) {
                    this.cleanedOffset = this.readData(this.cleanFileOffsetFile);
                }
                if (this.fs.exists(this.flushedCounterFile) && this.fs.isFile(this.flushedCounterFile)) {
                    String strFlushedFileCounter = new String(this.readData(this.flushedCounterFile));
                    if (strFlushedFileCounter.isEmpty()) {
                        logger.warn("empty flushed file");
                    } else {
                        this.flushedFileCounter = Long.valueOf(strFlushedFileCounter);
                        this.flushedFileWriteOffset = this.getFlushedFileWriteOffset(new Path(this.basePath, this.flushedFileCounter + OFFSET_SUFFIX));
                        this.bookKeepingFileOffset = this.getFlushedFileWriteOffset(new Path(this.basePath, this.flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
                    }
                }
            }
            this.fileWriteOffset = this.flushedFileWriteOffset;
            this.currentWrittenFile = this.flushedFileCounter;
            this.cleanedFileCounter = this.byteArrayToLong(this.cleanedOffset, this.offset) - 1L;
            if (this.currentWrittenFile == -1L) {
                ++this.currentWrittenFile;
                this.fileWriteOffset = 0L;
            }
        }
        catch (IOException io) {
            throw new RuntimeException(io);
        }
        this.storageExecutor = Executors.newSingleThreadExecutor((ThreadFactory)new NameableThreadFactory("StorageHelper"));
    }

    private void closeFs() {
        if (this.fs != null) {
            try {
                this.fs.close();
                this.fs = null;
            }
            catch (IOException e) {
                logger.debug(e.getMessage());
            }
        }
    }

    private long getFlushedFileWriteOffset(Path filePath) throws IOException {
        byte[] flushedFileOffsetByte;
        if (this.flushedFileCounter != -1L && this.fs.exists(filePath) && (flushedFileOffsetByte = this.readData(filePath)) != null && flushedFileOffsetByte.length == 8) {
            return Server.readLong(flushedFileOffsetByte, 0);
        }
        return 0L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void teardown() {
        logger.debug("called teardown");
        try {
            if (this.readStream != null) {
                this.readStream.close();
            }
            HDFSStorage hDFSStorage = this;
            synchronized (hDFSStorage) {
                if (this.nextReadStream != null) {
                    this.nextReadStream.close();
                }
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        finally {
            this.closeUnflushedFiles();
            this.storageExecutor.shutdown();
        }
    }

    private Runnable getNextStream() {
        return new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    HDFSStorage hDFSStorage = HDFSStorage.this;
                    synchronized (hDFSStorage) {
                        HDFSStorage.this.nextRetrievalFile = HDFSStorage.this.retrievalFile + 1L;
                        if (HDFSStorage.this.nextRetrievalFile > HDFSStorage.this.flushedFileCounter) {
                            HDFSStorage.access$802(HDFSStorage.this, null);
                            return;
                        }
                        Path path = new Path(HDFSStorage.this.basePath, String.valueOf(HDFSStorage.this.nextRetrievalFile));
                        Path offsetPath = new Path(HDFSStorage.this.basePath, HDFSStorage.this.nextRetrievalFile + HDFSStorage.OFFSET_SUFFIX);
                        HDFSStorage.access$802(HDFSStorage.this, null);
                        HDFSStorage.access$802(HDFSStorage.this, HDFSStorage.this.readData(path));
                        byte[] flushedOffset = HDFSStorage.this.readData(offsetPath);
                        HDFSStorage.this.nextFlushedLong = Server.readLong(flushedOffset, 0);
                    }
                }
                catch (Throwable e) {
                    logger.warn("in storage executor ", e);
                }
            }
        };
    }

    static /* synthetic */ byte[] access$802(HDFSStorage x0, byte[] x1) {
        x0.nextRetrievalData = x1;
        return x1;
    }

    class DataBlock {
        FSDataOutputStream dataStream;
        long dataOffset;
        Path path2FlushedData;
        long fileName;
        private Path bookKeepingPath;

        DataBlock(FSDataOutputStream stream, long bytesWritten, Path path2FlushedData, long fileName) {
            this.dataStream = stream;
            this.dataOffset = bytesWritten;
            this.path2FlushedData = path2FlushedData;
            this.fileName = fileName;
        }

        public void close() {
            if (this.dataStream != null) {
                try {
                    this.dataStream.close();
                    this.bookKeepingPath = new Path(HDFSStorage.this.basePath, this.fileName + HDFSStorage.BOOK_KEEPING_FILE_OFFSET);
                    HDFSStorage.this.updateFlushedOffset(this.bookKeepingPath, this.dataOffset);
                }
                catch (IOException ex) {
                    logger.warn("not able to close the stream {}", (Object)ex.getMessage());
                    HDFSStorage.this.closeFs();
                    throw new RuntimeException(ex);
                }
            }
        }

        public void updateOffsets() throws IOException {
            HDFSStorage.this.updateFlushedOffset(this.path2FlushedData, this.dataOffset);
            if (this.bookKeepingPath != null && HDFSStorage.this.fs.exists(this.bookKeepingPath)) {
                HDFSStorage.this.fs.delete(this.bookKeepingPath, false);
            }
        }
    }
}

