/*
 * Decompiled with CFR 0.152.
 */
package com.mware.ge.accumulo.util;

import com.mware.ge.GeException;
import com.mware.ge.Graph;
import com.mware.ge.GraphConfiguration;
import com.mware.ge.Property;
import com.mware.ge.accumulo.AccumuloGraph;
import com.mware.ge.accumulo.AccumuloGraphConfiguration;
import com.mware.ge.accumulo.StreamingPropertyValueHdfsRef;
import com.mware.ge.accumulo.StreamingPropertyValueTable;
import com.mware.ge.accumulo.StreamingPropertyValueTableRef;
import com.mware.ge.accumulo.keys.DataTableRowKey;
import com.mware.ge.accumulo.mapreduce.ElementMapperGraph;
import com.mware.ge.accumulo.util.GeTabletServerBatchReader;
import com.mware.ge.accumulo.util.HdfsLargeDataStore;
import com.mware.ge.accumulo.util.LimitOutputStream;
import com.mware.ge.accumulo.util.RangeUtils;
import com.mware.ge.store.mutations.ElementMutationBuilder;
import com.mware.ge.store.mutations.StoreMutation;
import com.mware.ge.store.util.StreamingPropertyValueStorageStrategy;
import com.mware.ge.util.GeLogger;
import com.mware.ge.util.GeLoggerFactory;
import com.mware.ge.util.IOUtils;
import com.mware.ge.values.storable.StreamingPropertyValue;
import com.mware.ge.values.storable.StreamingPropertyValueRef;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.hadoop.fs.FileSystem;

public class OverflowIntoHdfsStreamingPropertyValueStorageStrategy
implements StreamingPropertyValueStorageStrategy {
    private static final GeLogger LOGGER = GeLoggerFactory.getLogger(OverflowIntoHdfsStreamingPropertyValueStorageStrategy.class);
    private static String EMPTY_TEXT = "";
    private final FileSystem fileSystem;
    private final long maxStreamingPropertyValueTableDataSize;
    private final String dataDir;
    private final Graph graph;
    private final AccumuloGraphConfiguration config;

    public OverflowIntoHdfsStreamingPropertyValueStorageStrategy(Graph graph, GraphConfiguration configuration) throws Exception {
        if (!(configuration instanceof AccumuloGraphConfiguration)) {
            throw new GeException("Expected " + AccumuloGraphConfiguration.class.getName() + " found " + configuration.getClass().getName());
        }
        if (!(graph instanceof AccumuloGraph) && !(graph instanceof ElementMapperGraph)) {
            throw new GeException("Expected " + AccumuloGraph.class.getName() + " or " + ElementMapperGraph.class.getName() + ", found " + graph.getClass().getName());
        }
        this.graph = graph;
        this.config = (AccumuloGraphConfiguration)configuration;
        this.fileSystem = this.config.createFileSystem();
        this.maxStreamingPropertyValueTableDataSize = this.config.getMaxStreamingPropertyValueTableDataSize();
        this.dataDir = this.config.getDataDir();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public StreamingPropertyValueRef saveStreamingPropertyValue(ElementMutationBuilder elementMutationBuilder, String rowKey, Property property, StreamingPropertyValue streamingPropertyValue) {
        try {
            HdfsLargeDataStore largeDataStore = new HdfsLargeDataStore(this.fileSystem, this.dataDir, rowKey, property);
            try (LimitOutputStream out = new LimitOutputStream(largeDataStore, this.maxStreamingPropertyValueTableDataSize);){
                InputStream is = streamingPropertyValue.getInputStream();
                if (is.markSupported()) {
                    is.reset();
                }
                IOUtils.copy((InputStream)streamingPropertyValue.getInputStream(), (OutputStream)out);
            }
            if (out.hasExceededSizeLimit()) {
                LOGGER.debug("saved large file to \"%s\" (length: %d)", new Object[]{largeDataStore.getFullHdfsPath(), out.getLength()});
                return new StreamingPropertyValueHdfsRef(largeDataStore.getRelativeFileName(), streamingPropertyValue);
            }
            return this.saveStreamingPropertyValueSmall(elementMutationBuilder, rowKey, property, out.getSmall(), streamingPropertyValue);
        }
        catch (IOException ex) {
            throw new GeException((Throwable)ex);
        }
    }

    public void close() {
        try {
            this.fileSystem.close();
        }
        catch (IOException ex) {
            throw new GeException("Could not close filesystem", (Throwable)ex);
        }
    }

    public List<InputStream> getInputStreams(List<StreamingPropertyValue> streamingPropertyValues) {
        List<StreamingPropertyValueTable> notLoadedTableSpvs = streamingPropertyValues.stream().filter(spv -> spv instanceof StreamingPropertyValueTable).map(spv -> (StreamingPropertyValueTable)((Object)spv)).filter(spv -> !spv.isDataLoaded()).collect(Collectors.toList());
        List<String> dataRowKeys = notLoadedTableSpvs.stream().map(StreamingPropertyValueTable::getDataRowKey).collect(Collectors.toList());
        Map<String, byte[]> tableInputStreams = this.streamingPropertyValueTableDatas(dataRowKeys);
        notLoadedTableSpvs.forEach(spv -> {
            String dataRowKey = spv.getDataRowKey();
            byte[] bytes = (byte[])tableInputStreams.get(dataRowKey);
            if (bytes == null) {
                throw new GeException("Could not find StreamingPropertyValue data: " + dataRowKey);
            }
            spv.setData(bytes);
        });
        return streamingPropertyValues.stream().map(StreamingPropertyValue::getInputStream).collect(Collectors.toList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Map<String, byte[]> streamingPropertyValueTableDatas(List<String> dataRowKeys) {
        Object object;
        if (dataRowKeys.size() == 0) {
            return Collections.emptyMap();
        }
        List<Range> ranges = dataRowKeys.stream().map(RangeUtils::createRangeFromString).collect(Collectors.toList());
        long timerStartTime = System.currentTimeMillis();
        String dataTableName = AccumuloGraph.getDataTableName((String)this.config.getTableNamePrefix());
        GeTabletServerBatchReader scanner = this.createBatchScanner(dataTableName, ranges, new Authorizations());
        AccumuloGraph.GRAPH_LOGGER.logStartIterator(dataTableName, (ScannerBase)scanner);
        Span trace = Trace.start((String)"streamingPropertyValueTableData");
        trace.data("dataRowKeyCount", Integer.toString(dataRowKeys.size()));
        try {
            HashMap<String, byte[]> results = new HashMap<String, byte[]>();
            object = scanner.iterator();
            while (object.hasNext()) {
                Map.Entry col = (Map.Entry)object.next();
                results.put(((Key)col.getKey()).getRow().toString(), ((Value)col.getValue()).get());
            }
            object = results;
        }
        catch (Throwable throwable) {
            try {
                scanner.close();
                trace.stop();
                AccumuloGraph.GRAPH_LOGGER.logEndIterator(System.currentTimeMillis() - timerStartTime);
                throw throwable;
            }
            catch (Exception ex) {
                throw new GeException((Throwable)ex);
            }
        }
        scanner.close();
        trace.stop();
        AccumuloGraph.GRAPH_LOGGER.logEndIterator(System.currentTimeMillis() - timerStartTime);
        return object;
    }

    private StreamingPropertyValueRef saveStreamingPropertyValueSmall(ElementMutationBuilder elementMutationBuilder, String rowKey, Property property, byte[] data, StreamingPropertyValue propertyValue) {
        String dataTableRowKey = new DataTableRowKey(rowKey, property).getRowKey();
        StoreMutation dataMutation = new StoreMutation(dataTableRowKey);
        if (property.getTimestamp() != null) {
            dataMutation.put((CharSequence)EMPTY_TEXT, (CharSequence)EMPTY_TEXT, property.getTimestamp().longValue(), data);
        } else {
            dataMutation.put((CharSequence)EMPTY_TEXT, (CharSequence)EMPTY_TEXT, data);
        }
        elementMutationBuilder.saveDataMutation(dataMutation);
        return new StreamingPropertyValueTableRef(dataTableRowKey, propertyValue, data);
    }

    public FileSystem getFileSystem() {
        return this.fileSystem;
    }

    public String getDataDir() {
        return this.dataDir;
    }

    public GeTabletServerBatchReader createBatchScanner(String tableName, Collection<Range> ranges, Authorizations accumuloAuthorizations) throws TableNotFoundException {
        Connector connector = this.config.createConnector();
        GeTabletServerBatchReader scanner = new GeTabletServerBatchReader(connector, tableName, accumuloAuthorizations, this.config.getNumberOfQueryThreads());
        scanner.setRanges(ranges);
        return scanner;
    }
}

