Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.ooc.cache.BlockKey;
import org.apache.sysds.runtime.ooc.cache.OOCIOHandler;
import org.apache.sysds.runtime.ooc.cache.OOCCacheManager;
import org.apache.sysds.runtime.ooc.stream.OOCSourceStream;
import shaded.parquet.it.unimi.dsi.fastutil.ints.IntArrayList;

import java.util.HashMap;
Expand Down Expand Up @@ -89,10 +91,22 @@ public CachingStream(OOCStream<IndexedMatrixValue> source, long streamId) {
if(task != LocalTaskQueue.NO_MORE_TASKS) {
if (!_cacheInProgress)
throw new DMLRuntimeException("Stream is closed");
if (mSubscribers == null || mSubscribers.length == 0)
OOCCacheManager.put(_streamId, _numBlocks, task);
else
mCallback = OOCCacheManager.putAndPin(_streamId, _numBlocks, task);
OOCIOHandler.SourceBlockDescriptor descriptor = null;
if (_source instanceof OOCSourceStream src) {
descriptor = src.getDescriptor(task.getIndexes());
}
if (descriptor == null) {
if (mSubscribers == null || mSubscribers.length == 0)
OOCCacheManager.put(_streamId, _numBlocks, task);
else
mCallback = OOCCacheManager.putAndPin(_streamId, _numBlocks, task);
}
else {
if (mSubscribers == null || mSubscribers.length == 0)
OOCCacheManager.putSourceBacked(_streamId, _numBlocks, task, descriptor);
else
mCallback = OOCCacheManager.putAndPinSourceBacked(_streamId, _numBlocks, task, descriptor);
}
if (_index != null)
_index.put(task.getIndexes(), _numBlocks);
blk = _numBlocks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,19 @@

package org.apache.sysds.runtime.instructions.ooc;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.common.Opcodes;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.io.MatrixReader;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.ooc.cache.OOCCacheManager;
import org.apache.sysds.runtime.ooc.cache.OOCIOHandler;
import org.apache.sysds.runtime.ooc.stream.OOCSourceStream;

public class ReblockOOCInstruction extends ComputationOOCInstruction {
private int blen;
Expand Down Expand Up @@ -74,40 +69,19 @@ public void processInstruction(ExecutionContext ec) {
//TODO support other formats than binary

//create queue, spawn thread for asynchronous reading, and return
OOCStream<IndexedMatrixValue> q = createWritableStream();
submitOOCTask(() -> readBinaryBlock(q, min.getFileName()), q);
OOCStream<IndexedMatrixValue> q = new OOCSourceStream();
OOCIOHandler io = OOCCacheManager.getIOHandler();
OOCIOHandler.SourceReadRequest req = new OOCIOHandler.SourceReadRequest(
min.getFileName(), Types.FileFormat.BINARY, mc.getRows(), mc.getCols(), blen, mc.getNonZeros(),
Long.MAX_VALUE, true, q);
io.scheduleSourceRead(req).whenComplete((res, err) -> {
if (err != null) {
Exception ex = err instanceof Exception ? (Exception) err : new Exception(err);
q.propagateFailure(new DMLRuntimeException(ex));
}
});

MatrixObject mout = ec.getMatrixObject(output);
mout.setStreamHandle(q);
}

/**
 * Reads a binary-block matrix file and pushes every block it contains into the given stream.
 * All sequence-file parts found under the path are read one after another; after the last
 * part has been consumed, the stream input is closed.
 *
 * @param q     stream that receives the blocks read from the file
 * @param fname path of the input file
 * @throws DMLRuntimeException wrapping any exception raised while accessing or reading the input
 */
@SuppressWarnings("resource")
private void readBinaryBlock(OOCStream<IndexedMatrixValue> q, String fname) {
	try {
		// set up job configuration, input path, and file system handle
		final JobConf conf = new JobConf(ConfigurationManager.getCachedJobConf());
		final Path inputPath = new Path(fname);
		final FileSystem fileSystem = IOUtilFunctions.getFileSystem(inputPath, conf);

		// verify that the input exists and is not empty
		MatrixReader.checkValidInputFile(fileSystem, inputPath);

		// read each part file (1..N) directly through a sequence file reader
		for(Path partPath : IOUtilFunctions.getSequenceFilePaths(fileSystem, inputPath)) {
			try(SequenceFile.Reader partReader =
				new SequenceFile.Reader(conf, SequenceFile.Reader.file(partPath)))
			{
				// the same key/value holders are handed to the reader for every record,
				// hence the value block is copied before it is enqueued
				final MatrixIndexes ix = new MatrixIndexes();
				final MatrixBlock buffer = new MatrixBlock();
				while(partReader.next(ix, buffer)) {
					q.enqueue(new IndexedMatrixValue(ix, new MatrixBlock(buffer)));
				}
			}
		}

		// signal that no further blocks will follow
		q.closeInput();
	}
	catch(Exception ex) {
		throw new DMLRuntimeException(ex);
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ public static OOCCacheScheduler getCache() {
}
}

/**
 * Returns the I/O handler currently held by the handler reference.
 * If the reference is still empty, {@link #getCache()} is called first (which is expected to
 * trigger initialization) and the reference is read a second time.
 *
 * @return the handler read from the reference; the result of the second read is returned as-is
 */
public static OOCIOHandler getIOHandler() {
	OOCIOHandler handler = _ioHandler.get();
	if(handler == null) {
		// Ensure initialization happens
		getCache();
		handler = _ioHandler.get();
	}
	return handler;
}

/**
* Removes a block from the cache without setting its data to null.
*/
Expand All @@ -116,11 +125,28 @@ public static void put(long streamId, int blockId, IndexedMatrixValue value) {
getCache().put(key, value, ((MatrixBlock)value.getValue()).getExactSerializedSize());
}

/**
 * Stores a source-backed block in the OOC cache together with the descriptor of its source location.
 * The size passed to the cache is the exact serialized size of the block's matrix value.
 *
 * @param streamId   id of the stream the block belongs to
 * @param blockId    id of the block within the stream
 * @param value      the indexed block to store; its value is cast to {@code MatrixBlock}
 * @param descriptor the source location descriptor handed to the cache
 */
public static void putSourceBacked(long streamId, int blockId, IndexedMatrixValue value,
	OOCIOHandler.SourceBlockDescriptor descriptor) {
	final BlockKey blockKey = new BlockKey(streamId, blockId);
	final var cache = getCache();
	final long serializedSize = ((MatrixBlock) value.getValue()).getExactSerializedSize();
	cache.putSourceBacked(blockKey, value, serializedSize, descriptor);
}

/**
 * Stores a block in the OOC cache, pins it, and wraps the resulting cache entry in a queue callback.
 * The size passed to the cache is the exact serialized size of the block's matrix value.
 *
 * @param streamId id of the stream the block belongs to
 * @param blockId  id of the block within the stream
 * @param value    the indexed block to store; its value is cast to {@code MatrixBlock}
 * @return a callback wrapping the entry returned by the cache
 */
public static OOCStream.QueueCallback<IndexedMatrixValue> putAndPin(long streamId, int blockId, IndexedMatrixValue value) {
	final BlockKey blockKey = new BlockKey(streamId, blockId);
	final var cache = getCache();
	final long serializedSize = ((MatrixBlock) value.getValue()).getExactSerializedSize();
	final var pinnedEntry = cache.putAndPin(blockKey, value, serializedSize);
	return new CachedQueueCallback<>(pinnedEntry, null);
}

/**
 * Stores a source-backed block in the OOC cache, pins it, and wraps the resulting cache entry
 * in a queue callback. The size passed to the cache is the exact serialized size of the
 * block's matrix value.
 *
 * @param streamId   id of the stream the block belongs to
 * @param blockId    id of the block within the stream
 * @param value      the indexed block to store; its value is cast to {@code MatrixBlock}
 * @param descriptor the source location descriptor handed to the cache
 * @return a callback wrapping the entry returned by the cache
 */
public static OOCStream.QueueCallback<IndexedMatrixValue> putAndPinSourceBacked(long streamId, int blockId,
	IndexedMatrixValue value, OOCIOHandler.SourceBlockDescriptor descriptor) {
	final BlockKey blockKey = new BlockKey(streamId, blockId);
	final var cache = getCache();
	final long serializedSize = ((MatrixBlock) value.getValue()).getExactSerializedSize();
	final var pinnedEntry = cache.putAndPinSourceBacked(blockKey, value, serializedSize, descriptor);
	return new CachedQueueCallback<>(pinnedEntry, null);
}

public static CompletableFuture<OOCStream.QueueCallback<IndexedMatrixValue>> requestBlock(long streamId, long blockId) {
BlockKey key = new BlockKey(streamId, blockId);
return getCache().request(key).thenApply(e -> new CachedQueueCallback<>(e, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,28 @@ public interface OOCCacheScheduler {
*/
BlockEntry putAndPin(BlockKey key, Object data, long size);

/**
* Places a new source-backed block in the cache and registers the location with the IO handler. The entry is
* treated as backed by disk, so eviction does not schedule spill writes.
*
* @param key the associated key of the block
* @param data the block data
* @param size the size of the data
* @param descriptor the source location descriptor
*/
void putSourceBacked(BlockKey key, Object data, long size, OOCIOHandler.SourceBlockDescriptor descriptor);

/**
 * Places a new source-backed block in the cache and returns a pinned handle.
 *
 * @param key the associated key of the block
 * @param data the block data
 * @param size the size of the data
 * @param descriptor the source location descriptor
 * @return the pinned entry of the newly placed block
 */
BlockEntry putAndPinSourceBacked(BlockKey key, Object data, long size,
OOCIOHandler.SourceBlockDescriptor descriptor);

/**
* Forgets a block from the cache.
* @param key the associated key of the block
Expand Down
82 changes: 82 additions & 0 deletions src/main/java/org/apache/sysds/runtime/ooc/cache/OOCIOHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.sysds.runtime.ooc.cache;

import java.util.concurrent.CompletableFuture;
import java.util.List;

public interface OOCIOHandler {
void shutdown();
Expand All @@ -29,4 +30,85 @@ public interface OOCIOHandler {
CompletableFuture<BlockEntry> scheduleRead(BlockEntry block);

CompletableFuture<Boolean> scheduleDeletion(BlockEntry block);

/**
 * Registers the source location of a block for future direct reads.
 *
 * @param key the key of the block whose source location is registered
 * @param descriptor the descriptor of the block's location in the source
 */
void registerSourceLocation(BlockKey key, SourceBlockDescriptor descriptor);

/**
 * Schedule an asynchronous read from an external source into the provided target stream.
 * The returned future completes when either EOF is reached or the requested byte budget
 * is exhausted. When the budget is reached and keepOpenOnLimit is true, the target stream
 * is kept open and a continuation token is provided so the caller can resume.
 *
 * @param request the read request, carrying the source path and format, the byte budget,
 *                the keepOpenOnLimit flag, and the target stream
 * @return a future that yields the result of the read (bytes read, EOF flag, continuation, blocks)
 */
CompletableFuture<SourceReadResult> scheduleSourceRead(SourceReadRequest request);

/**
 * Continue a previously throttled source read using the provided continuation token.
 *
 * @param continuation the continuation token of the read to resume
 * @param maxBytesInFlight presumably the byte budget for the resumed read, analogous to
 *                         {@code SourceReadRequest.maxBytesInFlight} - TODO confirm
 * @return a future that yields the result of the resumed read
 */
CompletableFuture<SourceReadResult> continueSourceRead(SourceReadContinuation continuation, long maxBytesInFlight);

interface SourceReadContinuation {}

/**
 * Immutable parameter object describing an asynchronous read from an external source
 * into a target stream.
 */
class SourceReadRequest {
	/** Path of the source to read from. */
	public final String path;
	/** File format of the source. */
	public final org.apache.sysds.common.Types.FileFormat format;
	/** Number of rows of the matrix stored in the source. */
	public final long rows;
	/** Number of columns of the matrix stored in the source. */
	public final long cols;
	/** Block length (presumably the side length of a matrix block - TODO confirm). */
	public final int blen;
	/** Estimated number of non-zero values. */
	public final long estNnz;
	/** Byte budget of the read. */
	public final long maxBytesInFlight;
	/** Whether the target stream is kept open when the byte budget is reached. */
	public final boolean keepOpenOnLimit;
	/** Stream the read is directed into. */
	public final org.apache.sysds.runtime.instructions.ooc.OOCStream<org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue> target;

	/**
	 * Creates a request; every argument is stored unchanged in the field of the same name.
	 */
	public SourceReadRequest(
		String path,
		org.apache.sysds.common.Types.FileFormat format,
		long rows,
		long cols,
		int blen,
		long estNnz,
		long maxBytesInFlight,
		boolean keepOpenOnLimit,
		org.apache.sysds.runtime.instructions.ooc.OOCStream<org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue> target)
	{
		// source location and layout
		this.path = path;
		this.format = format;
		this.rows = rows;
		this.cols = cols;
		this.blen = blen;
		this.estNnz = estNnz;
		// throttling behavior and destination
		this.maxBytesInFlight = maxBytesInFlight;
		this.keepOpenOnLimit = keepOpenOnLimit;
		this.target = target;
	}
}

/**
 * Immutable outcome of a (possibly partial) source read.
 */
class SourceReadResult {
	/** Number of bytes read. */
	public final long bytesRead;
	/** Whether the end of the source was reached. */
	public final boolean eof;
	/** Token for resuming the read; presumably null when there is nothing to resume - TODO confirm. */
	public final SourceReadContinuation continuation;
	/** Descriptors of blocks; presumably those read by this call - TODO confirm. The list is stored as passed in. */
	public final List<SourceBlockDescriptor> blocks;

	/**
	 * Creates a result; every argument is stored unchanged in the field of the same name.
	 */
	public SourceReadResult(
		long bytesRead,
		boolean eof,
		SourceReadContinuation continuation,
		List<SourceBlockDescriptor> blocks)
	{
		this.bytesRead = bytesRead;
		this.eof = eof;
		this.continuation = continuation;
		this.blocks = blocks;
	}
}

/**
 * Immutable description of where a single block is located in an external source.
 */
class SourceBlockDescriptor {
	/** Path of the source that contains the block. */
	public final String path;
	/** File format of the source. */
	public final org.apache.sysds.common.Types.FileFormat format;
	/** Matrix indexes of the block. */
	public final org.apache.sysds.runtime.matrix.data.MatrixIndexes indexes;
	/** Offset of the block (presumably a byte offset within the source - TODO confirm). */
	public final long offset;
	/** Length of the block's record (presumably in bytes - TODO confirm). */
	public final int recordLength;
	/** Serialized size of the block. */
	public final long serializedSize;

	/**
	 * Creates a descriptor; every argument is stored unchanged in the field of the same name.
	 */
	public SourceBlockDescriptor(
		String path,
		org.apache.sysds.common.Types.FileFormat format,
		org.apache.sysds.runtime.matrix.data.MatrixIndexes indexes,
		long offset,
		int recordLength,
		long serializedSize)
	{
		// which source and which block
		this.path = path;
		this.format = format;
		this.indexes = indexes;
		// where the block sits and how large it is
		this.offset = offset;
		this.recordLength = recordLength;
		this.serializedSize = serializedSize;
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,36 @@ private void scheduleDeferredRead(DeferredReadRequest deferredReadRequest) {

@Override
public void put(BlockKey key, Object data, long size) {
put(key, data, size, false);
put(key, data, size, false, null);
}

@Override
public BlockEntry putAndPin(BlockKey key, Object data, long size) {
return put(key, data, size, true);
return put(key, data, size, true, null);
}

private BlockEntry put(BlockKey key, Object data, long size, boolean pin) {
/**
 * Places a source-backed block in the cache without pinning it.
 * Delegates to the shared put routine with the pin flag disabled.
 */
@Override
public void putSourceBacked(BlockKey key, Object data, long size, OOCIOHandler.SourceBlockDescriptor descriptor) {
	final boolean pin = false;
	put(key, data, size, pin, descriptor);
}

/**
 * Places a source-backed block in the cache and pins it.
 * Delegates to the shared put routine with the pin flag enabled and returns its entry.
 */
@Override
public BlockEntry putAndPinSourceBacked(BlockKey key, Object data, long size, OOCIOHandler.SourceBlockDescriptor descriptor) {
	final boolean pin = true;
	return put(key, data, size, pin, descriptor);
}

private BlockEntry put(BlockKey key, Object data, long size, boolean pin, OOCIOHandler.SourceBlockDescriptor descriptor) {
if (!this._running)
throw new IllegalStateException();
if (data == null)
throw new IllegalArgumentException();
if (descriptor != null)
_ioHandler.registerSourceLocation(key, descriptor);

Statistics.incrementOOCEvictionPut();
BlockEntry entry = new BlockEntry(key, size, data);
if (descriptor != null)
entry.setState(BlockState.WARM);
if (pin)
entry.pin();
synchronized(this) {
Expand Down Expand Up @@ -301,15 +315,15 @@ private void onCacheSizeChanged(boolean incr) {
}

private synchronized void sanityCheck() {
if (_cacheSize > _hardLimit) {
if (_cacheSize > _hardLimit * 1.1) {
if (!_warnThrottling) {
_warnThrottling = true;
System.out.println("[INFO] Throttling: " + _cacheSize/1000 + "KB - " + _bytesUpForEviction/1000 + "KB > " + _hardLimit/1000 + "KB");
System.out.println("[WARN] Cache hard limit exceeded by over 10%: " + String.format("%.2f", _cacheSize/1000000.0) + "MB (-" + String.format("%.2f", _bytesUpForEviction/1000000.0) + "MB) > " + String.format("%.2f", _hardLimit/1000000.0) + "MB");
}
}
else if (_warnThrottling) {
else if (_warnThrottling && _cacheSize < _hardLimit) {
_warnThrottling = false;
System.out.println("[INFO] No more throttling: " + _cacheSize/1000 + "KB - " + _bytesUpForEviction/1000 + "KB <= " + _hardLimit/1000 + "KB");
System.out.println("[INFO] Cache within limit: " + String.format("%.2f", _cacheSize/1000000.0) + "MB (-" + String.format("%.2f", _bytesUpForEviction/1000000.0) + "MB) <= " + String.format("%.2f", _hardLimit/1000000.0) + "MB");
}

if (!SANITY_CHECKS)
Expand Down
Loading
Loading