/*
 * Decompiled with CFR 0.152.
 */
package com.mware.ge.accumulo;

import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import com.mware.ge.GeException;
import com.mware.ge.accumulo.AccumuloGraph;
import com.mware.ge.accumulo.util.RangeUtils;
import com.mware.ge.util.ByteRingBuffer;
import com.mware.ge.values.storable.StreamingPropertyValue;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.TimestampFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.hadoop.io.Text;

public class StreamingPropertyValueTableData
extends StreamingPropertyValue {
    private static final long serialVersionUID = 1897402273830254711L;
    public static final String METADATA_COLUMN_FAMILY = "a";
    public static final String DATA_COLUMN_FAMILY = "d";
    public static final String METADATA_LENGTH_COLUMN_QUALIFIER = "length";
    private final AccumuloGraph graph;
    private final String dataRowKey;
    private Long length;
    private final Long timestamp;

    public StreamingPropertyValueTableData(AccumuloGraph graph, String dataRowKey, Class valueType, Long length, Long timestamp) {
        super(valueType);
        this.graph = graph;
        this.dataRowKey = dataRowKey;
        this.length = length;
        this.timestamp = timestamp;
    }

    public Long getLength() {
        return this.length;
    }

    public InputStream getInputStream() {
        return new DataTableInputStream();
    }

    private class DataTableInputStream
    extends InputStream {
        private final ByteRingBuffer buffer = new ByteRingBuffer(0x100000);
        private long timerStartTime;
        private Span trace;
        private ScannerBase scanner;
        private Iterator<Map.Entry<Key, Value>> scannerIterator;
        private long previousLoadedDataLength;
        private long loadedDataLength;
        private boolean closed;
        private long markRowIndex = 0L;
        private long markByteOffsetInRow = 0L;
        private long markLoadedDataLength = 0L;
        private long currentDataRowIndex = -1L;
        private long currentByteOffsetInRow;

        private DataTableInputStream() {
        }

        @Override
        public int read(byte[] dest, int off, int len) throws IOException {
            if (len == 0) {
                return 0;
            }
            len = Math.min(len, this.buffer.getSize());
            while (this.buffer.getUsed() == 0 && this.loadMoreData()) {
            }
            if (this.buffer.getUsed() == 0) {
                return -1;
            }
            int bytesRead = this.buffer.read(dest, off, len);
            this.currentByteOffsetInRow += (long)bytesRead;
            return bytesRead;
        }

        @Override
        public int read() throws IOException {
            if (this.buffer.getUsed() < 1) {
                this.loadMoreData();
                if (this.buffer.getUsed() == 0) {
                    return -1;
                }
            }
            ++this.currentByteOffsetInRow;
            return this.buffer.read();
        }

        @Override
        public void close() throws IOException {
            if (this.closed) {
                return;
            }
            this.scannerIterator = null;
            if (this.scanner != null) {
                this.scanner.close();
                this.scanner = null;
            }
            if (this.trace != null) {
                this.trace.stop();
                this.trace = null;
            }
            StreamingPropertyValueTableData.this.graph.getGraphLogger().logEndIterator(System.currentTimeMillis() - this.timerStartTime);
            super.close();
            this.closed = true;
        }

        private boolean loadMoreData() throws IOException {
            Map.Entry<Key, Value> column;
            block5: {
                if (this.closed) {
                    return false;
                }
                Iterator<Map.Entry<Key, Value>> it = this.getScannerIterator();
                while (true) {
                    if (!it.hasNext()) {
                        this.close();
                        return false;
                    }
                    column = it.next();
                    if (!column.getKey().getColumnFamily().equals((Object)new Text(StreamingPropertyValueTableData.METADATA_COLUMN_FAMILY))) break block5;
                    if (!column.getKey().getColumnQualifier().equals((Object)new Text(StreamingPropertyValueTableData.METADATA_LENGTH_COLUMN_QUALIFIER))) break;
                    StreamingPropertyValueTableData.this.length = Longs.fromByteArray((byte[])column.getValue().get());
                }
                throw new GeException("unexpected metadata column qualifier: " + column.getKey().getColumnQualifier() + " (row: " + column.getKey().getRow() + ")");
            }
            if (column.getKey().getColumnFamily().equals((Object)new Text(StreamingPropertyValueTableData.DATA_COLUMN_FAMILY))) {
                ++this.currentDataRowIndex;
                this.currentByteOffsetInRow = 0L;
                byte[] data = column.getValue().get();
                if (StreamingPropertyValueTableData.this.length == null) {
                    throw new GeException("unexpected missing length (row: " + column.getKey().getRow() + ")");
                }
                long len = Math.min((long)data.length, StreamingPropertyValueTableData.this.length - this.loadedDataLength);
                this.buffer.write(data, 0, (int)len);
                this.previousLoadedDataLength = this.loadedDataLength;
                this.loadedDataLength += len;
                return true;
            }
            throw new GeException("unexpected column family: " + column.getKey().getColumnFamily() + " (row: " + column.getKey().getRow() + ")");
        }

        private Iterator<Map.Entry<Key, Value>> getScannerIterator() throws IOException {
            if (this.closed) {
                throw new IOException("stream already closed");
            }
            if (this.scannerIterator != null) {
                return this.scannerIterator;
            }
            this.scannerIterator = this.getScanner().iterator();
            return this.scannerIterator;
        }

        private ScannerBase getScanner() throws IOException {
            if (this.closed) {
                throw new IOException("stream already closed");
            }
            if (this.scanner != null) {
                return this.scanner;
            }
            ArrayList ranges = Lists.newArrayList((Object[])new Range[]{RangeUtils.createRangeFromString(StreamingPropertyValueTableData.this.dataRowKey)});
            this.timerStartTime = System.currentTimeMillis();
            try {
                this.scanner = StreamingPropertyValueTableData.this.graph.createBatchScanner(StreamingPropertyValueTableData.this.graph.getDataTableName(), (Collection<Range>)ranges, new Authorizations());
            }
            catch (TableNotFoundException ex) {
                throw new GeException("Could not create scanner", (Throwable)ex);
            }
            if (StreamingPropertyValueTableData.this.timestamp != null && StreamingPropertyValueTableData.this.graph.getConfiguration().getMaxVersions() > 1) {
                IteratorSetting iteratorSetting = new IteratorSetting(80, TimestampFilter.class.getSimpleName(), TimestampFilter.class);
                TimestampFilter.setStart((IteratorSetting)iteratorSetting, (long)StreamingPropertyValueTableData.this.timestamp, (boolean)true);
                TimestampFilter.setEnd((IteratorSetting)iteratorSetting, (long)StreamingPropertyValueTableData.this.timestamp, (boolean)true);
                this.scanner.addScanIterator(iteratorSetting);
            }
            StreamingPropertyValueTableData.this.graph.getGraphLogger().logStartIterator(StreamingPropertyValueTableData.this.graph.getDataTableName(), this.scanner);
            this.trace = Trace.start((String)"streamingPropertyValueTableData");
            this.trace.data("dataRowKeyCount", Integer.toString(1));
            return this.scanner;
        }

        @Override
        public synchronized void mark(int readlimit) {
            this.markRowIndex = Math.max(0L, this.currentDataRowIndex);
            this.markByteOffsetInRow = this.currentByteOffsetInRow;
            this.markLoadedDataLength = this.previousLoadedDataLength;
        }

        @Override
        public synchronized void reset() throws IOException {
            Map.Entry<Key, Value> column;
            this.buffer.clear();
            if (this.scannerIterator != null) {
                this.scannerIterator = null;
            }
            this.closed = false;
            this.currentDataRowIndex = -1L;
            this.currentByteOffsetInRow = 0L;
            this.loadedDataLength = this.markLoadedDataLength;
            Iterator<Map.Entry<Key, Value>> it = this.getScannerIterator();
            while (true) {
                if (!it.hasNext()) {
                    this.close();
                    return;
                }
                column = it.next();
                if (!column.getKey().getColumnFamily().equals((Object)StreamingPropertyValueTableData.DATA_COLUMN_FAMILY)) continue;
                ++this.currentDataRowIndex;
                this.currentByteOffsetInRow = 0L;
                if (this.currentDataRowIndex == this.markRowIndex) break;
            }
            byte[] data = column.getValue().get();
            long len = Math.min((long)data.length, StreamingPropertyValueTableData.this.length - this.loadedDataLength);
            this.buffer.write(data, 0, (int)len);
            this.loadedDataLength += len;
            while (this.currentByteOffsetInRow != this.markByteOffsetInRow) {
                this.buffer.read();
                ++this.currentByteOffsetInRow;
            }
        }

        @Override
        public boolean markSupported() {
            return true;
        }
    }
}

