diff --git a/pom.xml b/pom.xml index f4c1091..ea7e597 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,11 @@ 4.13.1 test + + org.apache.commons + commons-compress + 1.28.0 + diff --git a/src/main/java/dev/zarr/zarrjava/core/Group.java b/src/main/java/dev/zarr/zarrjava/core/Group.java index d8b9a6b..6f8a4d0 100644 --- a/src/main/java/dev/zarr/zarrjava/core/Group.java +++ b/src/main/java/dev/zarr/zarrjava/core/Group.java @@ -64,7 +64,12 @@ public static Group open(String path) throws IOException, ZarrException { } @Nullable - public abstract Node get(String key) throws ZarrException, IOException; + public abstract Node get(String[] key) throws ZarrException, IOException; + + @Nullable + public Node get(String key) throws ZarrException, IOException { + return get(new String[]{key}); + } public Stream list() { return storeHandle.list() diff --git a/src/main/java/dev/zarr/zarrjava/store/BufferedZipStore.java b/src/main/java/dev/zarr/zarrjava/store/BufferedZipStore.java new file mode 100644 index 0000000..c0b72a1 --- /dev/null +++ b/src/main/java/dev/zarr/zarrjava/store/BufferedZipStore.java @@ -0,0 +1,313 @@ +package dev.zarr.zarrjava.store; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.stream.Stream; + +import org.apache.commons.compress.archivers.zip.*; + +import java.util.zip.CRC32; +import java.util.zip.ZipEntry; // for STORED constant + + +/** A Store implementation that buffers reads and writes and flushes them to an underlying Store as a zip file. + */ +public class BufferedZipStore extends ZipStore { + + private final Store.ListableStore bufferStore; + private String archiveComment; + private final boolean flushOnWrite; + + private final Comparator zipEntryComparator = (a, b) -> { + boolean aIsZarr = a.length > 0 && a[a.length - 1].equals("zarr.json"); + boolean bIsZarr = b.length > 0 && b[b.length - 1].equals("zarr.json"); + // first all zarr.json files + if (aIsZarr && !bIsZarr) { + return -1; + } else if (!aIsZarr && bIsZarr) { + return 1; + } else if (aIsZarr && bIsZarr) { + // sort zarr.json in BFS order within same depth by lexicographical order + if (a.length != b.length) { + return Integer.compare(a.length, b.length); + } else { + return String.join("/", a).compareTo(String.join("/", b)); + } + } else { + // then all other files in lexicographical order + return String.join("/", a).compareTo(String.join("/", b)); + } + }; + + private void writeBuffer() throws IOException { + // create zip file bytes from buffer store and write to underlying store + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(baos)) { + zos.setUseZip64(Zip64Mode.AsNeeded); + if (archiveComment != null) { + zos.setComment(archiveComment); + } + bufferStore.list().sorted(zipEntryComparator).forEach(keys -> { + try { + if (keys == null || keys.length == 0) { + // skip root entry + return; + } + String entryName = String.join("/", keys); + ByteBuffer bb = bufferStore.get(keys); + if (bb == null) { + // directory entry: ensure trailing slash + if (!entryName.endsWith("/")) { + entryName = entryName + "/"; + } + ZipArchiveEntry dirEntry = new ZipArchiveEntry(entryName); + dirEntry.setMethod(ZipEntry.STORED); + dirEntry.setSize(0); + dirEntry.setCrc(0); + zos.putArchiveEntry(dirEntry); + 
zos.closeArchiveEntry(); + } else { + // read bytes from ByteBuffer without modifying original + ByteBuffer dup = bb.duplicate(); + int len = dup.remaining(); + byte[] bytes = new byte[len]; + dup.get(bytes); + + // compute CRC and set size for STORED (no compression) + CRC32 crc = new CRC32(); + crc.update(bytes, 0, bytes.length); + ZipArchiveEntry fileEntry = new ZipArchiveEntry(entryName); + fileEntry.setMethod(ZipEntry.STORED); + fileEntry.setSize(bytes.length); + fileEntry.setCrc(crc.getValue()); + + zos.putArchiveEntry(fileEntry); + zos.write(bytes); + zos.closeArchiveEntry(); + } + } catch (IOException e) { + // wrap checked exception so it can be rethrown from stream for handling below + throw new RuntimeException(e); + } + }); + zos.finish(); + } catch (RuntimeException e) { + // unwrap and rethrow IOExceptions thrown inside the lambda + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw e; + } + + byte[] zipBytes = baos.toByteArray(); + // write zip bytes back to underlying store + underlyingStore.set(ByteBuffer.wrap(zipBytes)); + } + + public void setArchiveComment(@Nullable String archiveComment) throws IOException { + this.archiveComment = archiveComment; + if (flushOnWrite) { + writeBuffer(); + } + } + + public void deleteArchiveComment() throws IOException { + this.setArchiveComment(null); + } + + /** + * Loads the buffer from the underlying store zip file. + */ + private void loadBuffer() throws IOException { + String loadedArchiveComment = super.getArchiveComment(); + if (loadedArchiveComment != null && this.archiveComment == null) { + // don't overwrite existing archiveComment + this.archiveComment = loadedArchiveComment; + } + + InputStream inputStream = underlyingStore.getInputStream(); + if (inputStream == null) { + return; + } + try (ZipArchiveInputStream zis = new ZipArchiveInputStream(inputStream)) { + ZipArchiveEntry entry; + while ((entry = zis.getNextEntry()) != null) { + if (entry.isDirectory()) { + continue; + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] tmp = new byte[8192]; + int read; + while ((read = zis.read(tmp)) != -1) { + baos.write(tmp, 0, read); + } + byte[] bytes = baos.toByteArray(); + bufferStore.set(new String[]{entry.getName()}, ByteBuffer.wrap(bytes)); + } + } + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore, @Nullable String archiveComment, boolean flushOnWrite) { + super(underlyingStore); + this.bufferStore = bufferStore; + this.archiveComment = archiveComment; + this.flushOnWrite = flushOnWrite; + try { + loadBuffer(); + } catch (IOException e) { + throw new RuntimeException("Failed to load buffer from underlying store", e); + } + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore, @Nullable String archiveComment) { + this(underlyingStore, bufferStore, archiveComment, false); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore) { + this(underlyingStore, bufferStore, null); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, String archiveComment) { + this(underlyingStore, new MemoryStore(), archiveComment); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore) { + this(underlyingStore, (String) null); + } + + public BufferedZipStore(@Nonnull Path underlyingStore, String archiveComment) { + this(new 
FilesystemStore(underlyingStore.getParent()).resolve(underlyingStore.getFileName().toString()), archiveComment); + } + + public BufferedZipStore(@Nonnull Path underlyingStore) { + this(underlyingStore, null); + } + + public BufferedZipStore(@Nonnull String underlyingStorePath, String archiveComment) { + this(Paths.get(underlyingStorePath), archiveComment); + } + + public BufferedZipStore(@Nonnull String underlyingStorePath) { + this(underlyingStorePath, null); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore, boolean flushOnWrite) { + this(underlyingStore, bufferStore, null, flushOnWrite); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, String archiveComment, boolean flushOnWrite) { + this(underlyingStore, new MemoryStore(), archiveComment, flushOnWrite); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, boolean flushOnWrite) { + this(underlyingStore, (String) null, flushOnWrite); + } + + public BufferedZipStore(@Nonnull Path underlyingStore, String archiveComment, boolean flushOnWrite) { + this(new FilesystemStore(underlyingStore.getParent()).resolve(underlyingStore.getFileName().toString()), archiveComment, flushOnWrite); + } + + public BufferedZipStore(@Nonnull Path underlyingStore, boolean flushOnWrite) { + this(underlyingStore, null, flushOnWrite); + } + + public BufferedZipStore(@Nonnull String underlyingStorePath, String archiveComment, boolean flushOnWrite) { + this(Paths.get(underlyingStorePath), archiveComment, flushOnWrite); + } + + public BufferedZipStore(@Nonnull String underlyingStorePath, boolean flushOnWrite) { + this(underlyingStorePath, null, flushOnWrite); + } + + + /** + * Flushes the buffer and archiveComment to the underlying store as a zip file. + */ + public void flush() throws IOException { + writeBuffer(); + } + + @Override + public String getArchiveComment() { + return archiveComment; + } + + @Override + public Stream list(String[] keys) { + return bufferStore.list(keys); + } + + @Override + public boolean exists(String[] keys) { + return bufferStore.exists(keys); + } + + @Nullable + @Override + public ByteBuffer get(String[] keys) { + return bufferStore.get(keys); + } + + @Nullable + @Override + public ByteBuffer get(String[] keys, long start) { + return bufferStore.get(keys, start); + } + + @Nullable + @Override + public ByteBuffer get(String[] keys, long start, long end) { + return bufferStore.get(keys, start, end); + } + + @Override + public void set(String[] keys, ByteBuffer bytes) { + bufferStore.set(keys, bytes); + if (flushOnWrite) { + try { + writeBuffer(); + } catch (IOException e) { + throw new RuntimeException("Failed to flush buffer to underlying store after set operation", e); + } + } + } + + @Override + public void delete(String[] keys) { + bufferStore.delete(keys); + if (flushOnWrite) { + try { + writeBuffer(); + } catch (IOException e) { + throw new RuntimeException("Failed to flush buffer to underlying store after delete operation", e); + } + } + } + + @Nonnull + @Override + public StoreHandle resolve(String... 
keys) { + return new StoreHandle(this, keys); + } + + @Override + public InputStream getInputStream(String[] keys, long start, long end) { + return bufferStore.getInputStream(keys, start, end); + } + + public long getSize(String[] keys) { + return bufferStore.getSize(keys); + } + + @Override + public String toString() { + return "BufferedZipStore(" + underlyingStore.toString() + ")"; + } +} diff --git a/src/main/java/dev/zarr/zarrjava/store/FilesystemStore.java b/src/main/java/dev/zarr/zarrjava/store/FilesystemStore.java index f0b01cc..d8d992d 100644 --- a/src/main/java/dev/zarr/zarrjava/store/FilesystemStore.java +++ b/src/main/java/dev/zarr/zarrjava/store/FilesystemStore.java @@ -1,7 +1,10 @@ package dev.zarr.zarrjava.store; import dev.zarr.zarrjava.utils.Utils; +import org.apache.commons.io.input.BoundedInputStream; + import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; @@ -120,10 +123,16 @@ public void delete(String[] keys) { throw new RuntimeException(e); } } - - public Stream list(String[] keys) { + public Stream list(String[] keys) { try { - return Files.list(resolveKeys(keys)).map(p -> p.toFile().getName()); + return Files.list(resolveKeys(keys)).map(path -> { + Path relativePath = resolveKeys(keys).relativize(path); + String[] parts = new String[relativePath.getNameCount()]; + for (int i = 0; i < relativePath.getNameCount(); i++) { + parts[i] = relativePath.getName(i).toString(); + } + return parts; + }); } catch (IOException e) { throw new RuntimeException(e); } @@ -140,4 +149,36 @@ public String toString() { return this.path.toUri().toString().replaceAll("\\/$", ""); } + @Override + public InputStream getInputStream(String[] keys, long start, long end) { + Path keyPath = resolveKeys(keys); + try { + if (!Files.exists(keyPath)) { + return null; + } + InputStream inputStream = Files.newInputStream(keyPath); + if (start > 0) { + long skipped = inputStream.skip(start); + if (skipped < start) { + throw new IOException("Unable to skip to the desired start position."); + } + } + if (end != -1) { + long bytesToRead = end - start; + return new BoundedInputStream(inputStream, bytesToRead); + } else { + return inputStream; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public long getSize(String[] keys) { + try { + return Files.size(resolveKeys(keys)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/src/main/java/dev/zarr/zarrjava/store/HttpStore.java b/src/main/java/dev/zarr/zarrjava/store/HttpStore.java index 343d251..8dcd75b 100644 --- a/src/main/java/dev/zarr/zarrjava/store/HttpStore.java +++ b/src/main/java/dev/zarr/zarrjava/store/HttpStore.java @@ -5,7 +5,10 @@ import com.squareup.okhttp.Request; import com.squareup.okhttp.Response; import com.squareup.okhttp.ResponseBody; + +import java.io.FilterInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -101,6 +104,60 @@ public StoreHandle resolve(String... 
keys) { @Override public String toString() { - return uri; + return uri; } + + @Override + @Nullable + public InputStream getInputStream(String[] keys, long start, long end) { + if (start < 0) { + throw new IllegalArgumentException("Argument 'start' needs to be non-negative."); + } + Request request = new Request.Builder().url(resolveKeys(keys)).header( + "Range", String.format("Bytes=%d-%d", start, end - 1)).build(); + Call call = httpClient.newCall(request); + try { + Response response = call.execute(); + ResponseBody body = response.body(); + if (body == null) return null; + InputStream stream = body.byteStream(); + + // Ensure closing the stream also closes the response + return new FilterInputStream(stream) { + @Override + public void close() throws IOException { + super.close(); + body.close(); + } + }; + } catch (IOException e) { + return null; + } + } + @Override + public long getSize(String[] keys) { + // Explicitly request "identity" encoding to prevent OkHttp from adding "gzip" + // and subsequently stripping the Content-Length header. + Request request = new Request.Builder() + .head() + .url(resolveKeys(keys)) + .header("Accept-Encoding", "identity") + .build(); + + Call call = httpClient.newCall(request); + try { + Response response = call.execute(); + if (!response.isSuccessful()) { + throw new IOException("Failed to get size: " + response.code()); + } + + String contentLength = response.header("Content-Length"); + if (contentLength != null) { + return Long.parseLong(contentLength); + } + return -1; + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/src/main/java/dev/zarr/zarrjava/store/MemoryStore.java b/src/main/java/dev/zarr/zarrjava/store/MemoryStore.java index c1bbb9d..09ee39b 100644 --- a/src/main/java/dev/zarr/zarrjava/store/MemoryStore.java +++ b/src/main/java/dev/zarr/zarrjava/store/MemoryStore.java @@ -2,6 +2,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -45,7 +46,7 @@ public ByteBuffer get(String[] keys, long start, long end) { if (bytes == null) return null; if (end < 0) end = bytes.length; if (end > Integer.MAX_VALUE) throw new IllegalArgumentException("End index too large"); - return ByteBuffer.wrap(bytes, (int) start, (int) end); + return ByteBuffer.wrap(bytes, (int) start, (int) (end - start)); } @@ -59,19 +60,18 @@ public void delete(String[] keys) { map.remove(resolveKeys(keys)); } - public Stream list(String[] keys) { - List prefix = resolveKeys(keys); - Set allKeys = new HashSet<>(); + public Stream list(String[] keys) { + List prefix = resolveKeys(keys); + Set> allKeys = new HashSet<>(); - for (List k : map.keySet()) { - if (k.size() <= prefix.size() || ! k.subList(0, prefix.size()).equals(prefix)) - continue; - for (int i = 0; i < k.size(); i++) { - List subKey = k.subList(0, i+1); - allKeys.add(String.join("/", subKey)); + for (List k : map.keySet()) { + if (k.size() <= prefix.size() || ! k.subList(0, prefix.size()).equals(prefix)) + continue; + for (int i = prefix.size(); i < k.size(); i++) { + allKeys.add(k.subList(0, i+1)); + } } - } - return allKeys.stream(); + return allKeys.stream().map(k -> k.toArray(new String[0])); } @Nonnull @@ -84,5 +84,22 @@ public StoreHandle resolve(String... 
keys) { public String toString() { return String.format("", hashCode()); } -} + @Override + public InputStream getInputStream(String[] keys, long start, long end) { + byte[] bytes = map.get(resolveKeys(keys)); + if (bytes == null) return null; + if (end < 0) end = bytes.length; + if (end > Integer.MAX_VALUE) throw new IllegalArgumentException("End index too large"); + return new java.io.ByteArrayInputStream(bytes, (int) start, (int)(end - start)); + } + + @Override + public long getSize(String[] keys) { + byte[] bytes = map.get(resolveKeys(keys)); + if (bytes == null) { + throw new RuntimeException(new java.io.FileNotFoundException("Key not found: " + String.join("/", keys))); + } + return bytes.length; + } +} diff --git a/src/main/java/dev/zarr/zarrjava/store/ReadOnlyZipStore.java b/src/main/java/dev/zarr/zarrjava/store/ReadOnlyZipStore.java new file mode 100644 index 0000000..7fa2bed --- /dev/null +++ b/src/main/java/dev/zarr/zarrjava/store/ReadOnlyZipStore.java @@ -0,0 +1,221 @@ +package dev.zarr.zarrjava.store; + +import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; +import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; +import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream; +import org.apache.commons.io.input.BoundedInputStream; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.stream.Stream; + + +/** A Store implementation that provides read-only access to a zip archive stored in an underlying Store. + * Compared to BufferedZipStore, this implementation reads directly from the zip archive without parsing + * its contents into a buffer store first making it more efficient for read-only access to large zip archives. 
+ */ +public class ReadOnlyZipStore extends ZipStore { + + String resolveKeys(String[] keys) { + return String.join("/", keys); + } + + String[] resolveEntryKeys(String entryKey) { + return entryKey.split("/"); + } + + @Override + public boolean exists(String[] keys) { + return get(keys, 0, 0) != null; + } + + @Nullable + @Override + public ByteBuffer get(String[] keys) { + return get(keys, 0); + } + + @Nullable + @Override + public ByteBuffer get(String[] keys, long start) { + return get(keys, start, -1); + } + + @Nullable + @Override + public ByteBuffer get(String[] keys, long start, long end) { + InputStream inputStream = underlyingStore.getInputStream(); + if (inputStream == null) { + return null; + } + try (ZipArchiveInputStream zis = new ZipArchiveInputStream(inputStream)) { + ZipArchiveEntry entry; + while ((entry = zis.getNextEntry()) != null) { + String entryName = entry.getName(); + + if (entryName.startsWith("/")) { + entryName = entryName.substring(1); + } + if (entry.isDirectory() || !entryName.equals(resolveKeys(keys))) { + continue; + } + + long skipResult = zis.skip(start); + if (skipResult != start) { + throw new IOException("Failed to skip to start position " + start + " in zip entry " + entryName); + } + + long bytesToRead; + if (end != -1) bytesToRead = end - start; + else bytesToRead = Long.MAX_VALUE; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] bufferArray = new byte[8192]; + int len; + while (bytesToRead > 0 && (len = zis.read(bufferArray, 0, (int) Math.min(bufferArray.length, bytesToRead))) != -1) { + baos.write(bufferArray, 0, len); + bytesToRead -= len; + } + byte[] bytes = baos.toByteArray(); + return ByteBuffer.wrap(bytes); + } + } catch (IOException e) { + return null; + } + return null; + } + + @Override + public void set(String[] keys, ByteBuffer bytes) { + throw new UnsupportedOperationException("ReadOnlyZipStore does not support set operation."); + } + + @Override + public void delete(String[] keys) { + throw new UnsupportedOperationException("ReadOnlyZipStore does not support delete operation."); + } + + @Nonnull + @Override + public StoreHandle resolve(String... 
keys) { + return new StoreHandle(this, keys); + } + + @Override + public String toString() { + return "ReadOnlyZipStore(" + underlyingStore.toString() + ")"; + } + + public ReadOnlyZipStore(@Nonnull StoreHandle underlyingStore) { + super(underlyingStore); + } + + public ReadOnlyZipStore(@Nonnull Path underlyingStore) { + this(new FilesystemStore(underlyingStore.getParent()).resolve(underlyingStore.getFileName().toString())); + } + + public ReadOnlyZipStore(@Nonnull String underlyingStorePath) { + this(Paths.get(underlyingStorePath)); + } + + @Override + public Stream list(String[] keys) { + Stream.Builder builder = Stream.builder(); + + InputStream inputStream = underlyingStore.getInputStream(); + if (inputStream == null) { + return builder.build(); + } + try (ZipArchiveInputStream zis = new ZipArchiveInputStream(inputStream)) { + ZipArchiveEntry entry; + String prefix = resolveKeys(keys); + while ((entry = zis.getNextEntry()) != null) { + String entryName = entry.getName(); + if (entryName.startsWith("/")) { + entryName = entryName.substring(1); + } + + if (!entryName.startsWith(prefix) || entryName.equals(prefix)) { + continue; + } + String[] entryKeys = resolveEntryKeys(entryName.substring(prefix.length())); + builder.add(entryKeys); + } + } catch (IOException ignored) {} + return builder.build(); + } + + @Override + public InputStream getInputStream(String[] keys, long start, long end) { + InputStream baseStream = underlyingStore.getInputStream(); + + try { + ZipArchiveInputStream zis = new ZipArchiveInputStream(baseStream); + ZipArchiveEntry entry; + while ((entry = zis.getNextEntry()) != null) { + String entryName = entry.getName(); + + if (entryName.startsWith("/")) { + entryName = entryName.substring(1); + } + if (entry.isDirectory() || !entryName.equals(resolveKeys(keys))) { + continue; + } + + long skipResult = zis.skip(start); + if (skipResult != start) { + throw new IOException("Failed to skip to start position " + start + " in zip entry " + entryName); + } + + long bytesToRead; + if (end != -1) bytesToRead = end - start; + else bytesToRead = Long.MAX_VALUE; + + return new BoundedInputStream(zis, bytesToRead); + } + return null; + } catch (IOException ignored) {} + return null; + } + + @Override + public long getSize(String[] keys) { + InputStream inputStream = underlyingStore.getInputStream(); + if (inputStream == null) { + throw new RuntimeException(new IOException("Underlying store input stream is null")); + } + try (ZipArchiveInputStream zis = new ZipArchiveInputStream(inputStream)) { + ZipArchiveEntry entry; + while ((entry = zis.getNextEntry()) != null) { + String entryName = entry.getName(); + + if (entryName.startsWith("/")) { + entryName = entryName.substring(1); + } + if (entry.isDirectory() || !entryName.equals(resolveKeys(keys))) { + continue; + } + long size = entry.getSize(); + if (size < 0) { + // read the entire entry to determine size + size = 0; + byte[] bufferArray = new byte[8192]; + int len; + while ((len = zis.read(bufferArray)) != -1) { + size += len; + } + } + return size; + } + throw new RuntimeException(new java.io.FileNotFoundException("Key not found: " + resolveKeys(keys))); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/dev/zarr/zarrjava/store/S3Store.java b/src/main/java/dev/zarr/zarrjava/store/S3Store.java index 27aef77..d112db0 100644 --- a/src/main/java/dev/zarr/zarrjava/store/S3Store.java +++ b/src/main/java/dev/zarr/zarrjava/store/S3Store.java @@ -71,7 +71,7 @@ public ByteBuffer get(String[] 
keys, long start) { GetObjectRequest req = GetObjectRequest.builder() .bucket(bucketName) .key(resolveKeys(keys)) - .range(String.valueOf(start)) + .range(String.format("bytes=%d-", start)) .build(); return get(req); } @@ -82,7 +82,7 @@ public ByteBuffer get(String[] keys, long start, long end) { GetObjectRequest req = GetObjectRequest.builder() .bucket(bucketName) .key(resolveKeys(keys)) - .range(start +"-"+ end) + .range(String.format("bytes=%d-%d", start, end-1)) // S3 range is inclusive .build(); return get(req); } @@ -104,7 +104,7 @@ public void delete(String[] keys) { } @Override - public Stream list(String[] keys) { + public Stream list(String[] keys) { final String fullKey = resolveKeys(keys); ListObjectsRequest req = ListObjectsRequest.builder() .bucket(bucketName).prefix(fullKey) @@ -112,7 +112,7 @@ public Stream list(String[] keys) { ListObjectsResponse res = s3client.listObjects(req); return res.contents() .stream() - .map(p -> p.key().substring(fullKey.length() + 1)); + .map(p -> p.key().substring(fullKey.length() + 1).split("/")); } @Nonnull @@ -121,6 +121,30 @@ public StoreHandle resolve(String... keys) { return new StoreHandle(this, keys); } + @Override + public InputStream getInputStream(String[] keys, long start, long end) { + GetObjectRequest req = GetObjectRequest.builder() + .bucket(bucketName) + .key(resolveKeys(keys)) + .range(String.format("bytes=%d-%d", start, end-1)) // S3 range is inclusive + .build(); + ResponseInputStream responseInputStream = s3client.getObject(req); + return responseInputStream; + } + + @Override + public long getSize(String[] keys) { + HeadObjectRequest req = HeadObjectRequest.builder() + .bucket(bucketName) + .key(resolveKeys(keys)) + .build(); + try { + return s3client.headObject(req).contentLength(); + } catch (NoSuchKeyException e) { + throw new RuntimeException(e); + } + } + @Override public String toString() { return "s3://" + bucketName + "/" + prefix; diff --git a/src/main/java/dev/zarr/zarrjava/store/Store.java b/src/main/java/dev/zarr/zarrjava/store/Store.java index c92906d..3923bde 100644 --- a/src/main/java/dev/zarr/zarrjava/store/Store.java +++ b/src/main/java/dev/zarr/zarrjava/store/Store.java @@ -1,5 +1,6 @@ package dev.zarr.zarrjava.store; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.stream.Stream; import javax.annotation.Nonnull; @@ -27,6 +28,27 @@ public interface Store { interface ListableStore extends Store { - Stream list(String[] keys); + /** + * Lists all keys in the store that match the given prefix keys. Keys are represented as arrays of strings, + * where each string is a segment of the key path. + * Keys that are exactly equal to the prefix are not included in the results. + * Keys that do not contain data (i.e. "directories") are included in the results. + * + * @param keys The prefix keys to match. + * @return A stream of key arrays that match the given prefix. Prefixed keys are not included in the results. 
+ */ + Stream list(String[] keys); + + default Stream list() { + return list(new String[]{}); + } } + + InputStream getInputStream(String[] keys, long start, long end); + + default InputStream getInputStream(String[] keys) { + return getInputStream(keys, 0, -1); + } + + long getSize(String[] keys); } diff --git a/src/main/java/dev/zarr/zarrjava/store/StoreHandle.java b/src/main/java/dev/zarr/zarrjava/store/StoreHandle.java index b82424f..e2c9273 100644 --- a/src/main/java/dev/zarr/zarrjava/store/StoreHandle.java +++ b/src/main/java/dev/zarr/zarrjava/store/StoreHandle.java @@ -1,6 +1,8 @@ package dev.zarr.zarrjava.store; import dev.zarr.zarrjava.utils.Utils; + +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -44,6 +46,14 @@ public ByteBuffer read(long start, long end) { return store.get(keys, start, end); } + public InputStream getInputStream(int start, int end) { + return store.getInputStream(keys, start, end); + } + + public InputStream getInputStream() { + return store.getInputStream(keys); + } + public void set(ByteBuffer bytes) { store.set(keys, bytes); } @@ -56,13 +66,17 @@ public boolean exists() { return store.exists(keys); } - public Stream list() { + public Stream list() { if (!(store instanceof Store.ListableStore)) { throw new UnsupportedOperationException("The underlying store does not support listing."); } return ((Store.ListableStore) store).list(keys); } + public long getSize() { + return store.getSize(keys); + } + @Override public String toString() { return store + "/" + String.join("/", keys); diff --git a/src/main/java/dev/zarr/zarrjava/store/ZipStore.java b/src/main/java/dev/zarr/zarrjava/store/ZipStore.java new file mode 100644 index 0000000..5865456 --- /dev/null +++ b/src/main/java/dev/zarr/zarrjava/store/ZipStore.java @@ -0,0 +1,81 @@ +package dev.zarr.zarrjava.store; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; + +public abstract class ZipStore implements Store, Store.ListableStore { + protected final StoreHandle underlyingStore; + + public ZipStore(@Nonnull StoreHandle underlyingStore) { + this.underlyingStore = underlyingStore; + } + + public String getArchiveComment() throws IOException { + // Attempt to read from the end of the file to find the EOCD record. + // We try a small chunk first (1KB) which covers most short comments (or no comment), + // then the maximum possible EOCD size (approx 65KB). 
+ if (!underlyingStore.exists()) { + return null; + } + int[] readSizes = {1024, 65535 + 22}; + + for (int size : readSizes) { + ByteBuffer buffer; + long fileSize = underlyingStore.getSize(); + + if (fileSize < size){ + buffer = underlyingStore.read(); + } + else { + buffer = underlyingStore.read(fileSize - size); + } + + if (buffer == null) { + return null; + } + + byte[] bufArray; + if (buffer.hasArray()) { + bufArray = buffer.array(); + } else { + bufArray = new byte[buffer.remaining()]; + buffer.duplicate().get(bufArray); + } + + String comment = getZipCommentFromBuffer(bufArray); + if (comment != null) { + return comment; + } + } + return null; + } + + // adopted from https://stackoverflow.com/a/9918966 + @Nullable + public static String getZipCommentFromBuffer(byte[] bufArray) throws IOException { + // End of Central Directory (EOCD) record magic number + byte[] EOCD = {0x50, 0x4b, 0x05, 0x06}; + int buffLen = bufArray.length; + // Check the buffer from the end + search: + for (int i = buffLen - EOCD.length - 22; i >= 0; i--) { + for (int k = 0; k < EOCD.length; k++) { + if (bufArray[i + k] != EOCD[k]) { + continue search; + } + } + // End of Central Directory found! + int commentLen = bufArray[i + 20] + bufArray[i + 21] * 256; + int realLen = buffLen - i - 22; + if (commentLen != realLen) { + throw new IOException("ZIP comment size mismatch: " + + "directory says len is " + commentLen + + ", but file ends after " + realLen + " bytes!"); + } + return new String(bufArray, i + 22, commentLen); + } + return null; + } +} \ No newline at end of file diff --git a/src/main/java/dev/zarr/zarrjava/v2/Group.java b/src/main/java/dev/zarr/zarrjava/v2/Group.java index c568294..8551a76 100644 --- a/src/main/java/dev/zarr/zarrjava/v2/Group.java +++ b/src/main/java/dev/zarr/zarrjava/v2/Group.java @@ -169,7 +169,7 @@ public static Group create(String path, Attributes attributes) throws IOExceptio * @throws IOException if there is an error accessing the storage */ @Nullable - public Node get(String key) throws ZarrException, IOException { + public Node get(String[] key) throws ZarrException, IOException { StoreHandle keyHandle = storeHandle.resolve(key); try { return Node.open(keyHandle); diff --git a/src/main/java/dev/zarr/zarrjava/v3/Group.java b/src/main/java/dev/zarr/zarrjava/v3/Group.java index d17eb77..2436051 100644 --- a/src/main/java/dev/zarr/zarrjava/v3/Group.java +++ b/src/main/java/dev/zarr/zarrjava/v3/Group.java @@ -182,7 +182,7 @@ public static Group create(String path) throws IOException, ZarrException { * @throws IOException if there is an error accessing the storage */ @Nullable - public Node get(String key) throws ZarrException, IOException{ + public Node get(String[] key) throws ZarrException, IOException{ StoreHandle keyHandle = storeHandle.resolve(key); try { return Node.open(keyHandle); diff --git a/src/test/java/dev/zarr/zarrjava/Utils.java b/src/test/java/dev/zarr/zarrjava/Utils.java new file mode 100644 index 0000000..da57f0d --- /dev/null +++ b/src/test/java/dev/zarr/zarrjava/Utils.java @@ -0,0 +1,88 @@ +package dev.zarr.zarrjava; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.BufferedOutputStream; +import java.nio.file.Path; +import java.nio.file.Files; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import java.util.zip.ZipInputStream; + +public class Utils { + + static void zipFile(Path sourceDir, Path targetDir) throws IOException { + FileOutputStream fos = 
new FileOutputStream(targetDir.toFile()); + ZipOutputStream zipOut = new ZipOutputStream(fos); + + File fileToZip = new File(sourceDir.toUri()); + + zipFile(fileToZip, "", zipOut); + zipOut.close(); + fos.close(); + } + + static void zipFile(File fileToZip, String fileName, ZipOutputStream zipOut) throws IOException { + if (fileToZip.isHidden()) { + return; + } + if (fileToZip.isDirectory()) { + if (fileName.endsWith("/")) { + zipOut.putNextEntry(new ZipEntry(fileName)); + zipOut.closeEntry(); + } else { + zipOut.putNextEntry(new ZipEntry(fileName + "/")); + zipOut.closeEntry(); + } + File[] children = fileToZip.listFiles(); + for (File childFile : children) { + zipFile(childFile, fileName + "/" + childFile.getName(), zipOut); + } + return; + } + FileInputStream fis = new FileInputStream(fileToZip); + ZipEntry zipEntry = new ZipEntry(fileName); + zipOut.putNextEntry(zipEntry); + byte[] bytes = new byte[1024]; + int length; + while ((length = fis.read(bytes)) >= 0) { + zipOut.write(bytes, 0, length); + } + fis.close(); + } + + /** + * Unzip sourceZip into targetDir. + * Protects against Zip Slip by ensuring extracted paths remain inside targetDir. + */ + static void unzipFile(Path sourceZip, Path targetDir) throws IOException { + Files.createDirectories(targetDir); + try (FileInputStream fis = new FileInputStream(sourceZip.toFile()); + ZipInputStream zis = new ZipInputStream(fis)) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + Path outPath = targetDir.resolve(entry.getName()).normalize(); + Path targetDirNorm = targetDir.normalize(); + if (!outPath.startsWith(targetDirNorm)) { + throw new IOException("Zip entry is outside of the target dir: " + entry.getName()); + } + if (entry.isDirectory() || entry.getName().endsWith("/")) { + Files.createDirectories(outPath); + } else { + Files.createDirectories(outPath.getParent()); + try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(outPath.toFile()))) { + byte[] buffer = new byte[1024]; + int len; + while ((len = zis.read(buffer)) > 0) { + bos.write(buffer, 0, len); + } + } + } + zis.closeEntry(); + } + } + } + +} diff --git a/src/test/java/dev/zarr/zarrjava/ZarrStoreTest.java b/src/test/java/dev/zarr/zarrjava/ZarrStoreTest.java index 4a369c9..7b12c75 100644 --- a/src/test/java/dev/zarr/zarrjava/ZarrStoreTest.java +++ b/src/test/java/dev/zarr/zarrjava/ZarrStoreTest.java @@ -3,19 +3,33 @@ import com.fasterxml.jackson.databind.ObjectMapper; import dev.zarr.zarrjava.core.Attributes; import dev.zarr.zarrjava.store.*; -import dev.zarr.zarrjava.v3.*; +import dev.zarr.zarrjava.core.*; +import org.apache.commons.compress.archivers.zip.*; + +import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; +import org.apache.commons.compress.archivers.zip.ZipFile; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.CsvSource; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.stream.Stream; +import java.nio.file.Path; +import java.util.zip.ZipEntry; + +import static dev.zarr.zarrjava.Utils.unzipFile; 
+import static dev.zarr.zarrjava.Utils.zipFile; import static dev.zarr.zarrjava.v3.Node.makeObjectMapper; @@ -27,7 +41,7 @@ public void testFileSystemStores() throws IOException, ZarrException { GroupMetadata groupMetadata = objectMapper.readValue( Files.readAllBytes(TESTDATA.resolve("l4_sample").resolve("zarr.json")), - GroupMetadata.class + dev.zarr.zarrjava.v3.GroupMetadata.class ); String groupMetadataString = objectMapper.writeValueAsString(groupMetadata); @@ -36,7 +50,7 @@ public void testFileSystemStores() throws IOException, ZarrException { ArrayMetadata arrayMetadata = objectMapper.readValue(Files.readAllBytes(TESTDATA.resolve( "l4_sample").resolve("color").resolve("1").resolve("zarr.json")), - ArrayMetadata.class); + dev.zarr.zarrjava.v3.ArrayMetadata.class); String arrayMetadataString = objectMapper.writeValueAsString(arrayMetadata); Assertions.assertTrue(arrayMetadataString.contains("\"zarr_format\":3")); @@ -74,6 +88,81 @@ public void testS3Store() throws IOException, ZarrException { Assertions.assertEquals(0, arrayCore.read(new long[]{0,0,0,0}, new int[]{1,1,1,1}).getInt(0)); } + @Test + public void testS3StoreGet() throws IOException, ZarrException { + S3Store s3Store = new S3Store(S3Client.builder() + .region(Region.of("eu-west-1")) + .credentialsProvider(AnonymousCredentialsProvider.create()) + .build(), "static.webknossos.org", "data"); + String[] keys = new String[]{"zarr_v3", "l4_sample", "color", "1", "zarr.json"}; + + ByteBuffer buffer = s3Store.get(keys); + ByteBuffer bufferWithStart = s3Store.get(keys, 10); + Assertions.assertEquals(10, buffer.remaining()-bufferWithStart.remaining()); + + ByteBuffer bufferWithStartAndEnd = s3Store.get(keys, 0, 10); + Assertions.assertEquals(10, bufferWithStartAndEnd.remaining()); + + } + + static Stream inputStreamStores() throws IOException { + String[] s3StoreKeys = new String[]{"zarr_v3", "l4_sample", "color", "1", "zarr.json"}; + StoreHandle s3StoreHandle = new S3Store(S3Client.builder() + .region(Region.of("eu-west-1")) + .credentialsProvider(AnonymousCredentialsProvider.create()) + .build(), "static.webknossos.org", "data") + .resolve(s3StoreKeys); + + byte[] testData = new byte[100]; + for (int i = 0; i < testData.length; i++) { + testData[i] = (byte) i; + } + + StoreHandle memoryStoreHandle = new MemoryStore().resolve(); + memoryStoreHandle.set(ByteBuffer.wrap(testData)); + + StoreHandle fsStoreHandle = new FilesystemStore(TESTOUTPUT.resolve("testInputStreamFS")).resolve("testfile"); + fsStoreHandle.set(ByteBuffer.wrap(testData)); + + zipFile(TESTOUTPUT.resolve("testInputStreamFS"), TESTOUTPUT.resolve("testInputStreamZIP.zip")); + StoreHandle bufferedZipStoreHandle = new BufferedZipStore(TESTOUTPUT.resolve("testInputStreamZIP.zip"), true) + .resolve("testfile"); + + StoreHandle readOnlyZipStoreHandle = new ReadOnlyZipStore(TESTOUTPUT.resolve("testInputStreamZIP.zip")) + .resolve("testfile"); + + StoreHandle httpStoreHandle = new HttpStore("https://static.webknossos.org/data/zarr_v3/l4_sample") + .resolve("color", "1", "zarr.json"); + return Stream.of( + memoryStoreHandle, + s3StoreHandle, + fsStoreHandle, + bufferedZipStoreHandle, + readOnlyZipStoreHandle, + httpStoreHandle + ); + } + + @ParameterizedTest + @MethodSource("inputStreamStores") + public void testStoreInputStream(StoreHandle storeHandle) throws IOException, ZarrException { + InputStream is = storeHandle.getInputStream(10, 20); + byte[] buffer = new byte[10]; + int bytesRead = is.read(buffer); + Assertions.assertEquals(10, bytesRead); + byte[] expectedBuffer = 
new byte[10]; + storeHandle.read(10, 20).get(expectedBuffer); + Assertions.assertArrayEquals(expectedBuffer, buffer); + } + + @ParameterizedTest + @MethodSource("inputStreamStores") + public void testStoreGetSize(StoreHandle storeHandle) throws IOException, ZarrException { + long size = storeHandle.getSize(); + long actual_size = storeHandle.read().remaining(); + Assertions.assertEquals(actual_size, size); + } + @Test public void testHttpStore() throws IOException, ZarrException { HttpStore httpStore = new dev.zarr.zarrjava.store.HttpStore("https://static.webknossos.org/data/zarr_v3/l4_sample"); @@ -85,13 +174,12 @@ public void testHttpStore() throws IOException, ZarrException { @ParameterizedTest @CsvSource({"false", "true",}) public void testMemoryStoreV3(boolean useParallel) throws ZarrException, IOException { - int[] testData = new int[1024 * 1024]; - Arrays.setAll(testData, p -> p); + int[] testData = testData(); - Group group = Group.create(new MemoryStore().resolve()); + dev.zarr.zarrjava.v3.Group group = dev.zarr.zarrjava.v3.Group.create(new MemoryStore().resolve()); Array array = group.createArray("array", b -> b .withShape(1024, 1024) - .withDataType(DataType.UINT32) + .withDataType(dev.zarr.zarrjava.v3.DataType.UINT32) .withChunkShape(5, 5) ); array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData), useParallel); @@ -110,14 +198,13 @@ public void testMemoryStoreV3(boolean useParallel) throws ZarrException, IOExcep @ParameterizedTest @CsvSource({"false", "true",}) public void testMemoryStoreV2(boolean useParallel) throws ZarrException, IOException { - int[] testData = new int[1024 * 1024]; - Arrays.setAll(testData, p -> p); + int[] testData = testData(); dev.zarr.zarrjava.v2.Group group = dev.zarr.zarrjava.v2.Group.create(new MemoryStore().resolve()); dev.zarr.zarrjava.v2.Array array = group.createArray("array", b -> b .withShape(1024, 1024) .withDataType(dev.zarr.zarrjava.v2.DataType.UINT32) - .withChunks(5, 5) + .withChunks(512, 512) ); array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData), useParallel); group.createGroup("subgroup"); @@ -132,4 +219,220 @@ public void testMemoryStoreV2(boolean useParallel) throws ZarrException, IOExcep Assertions.assertEquals("test group", attrs.getString("description")); } + + @Test + public void testOpenZipStore() throws ZarrException, IOException { + Path sourceDir = TESTOUTPUT.resolve("testZipStore"); + Path targetDir = TESTOUTPUT.resolve("testZipStore.zip"); + FilesystemStore fsStore = new FilesystemStore(sourceDir); + writeTestGroupV3(fsStore, true); + + zipFile(sourceDir, targetDir); + + BufferedZipStore zipStore = new BufferedZipStore(targetDir); + assertIsTestGroupV3(Group.open(zipStore.resolve()), true); + + ReadOnlyZipStore readOnlyZipStore = new ReadOnlyZipStore(targetDir); + assertIsTestGroupV3(Group.open(readOnlyZipStore.resolve()), true); + } + + @ParameterizedTest + @CsvSource({"false", "true",}) + public void testWriteZipStore(boolean flushOnWrite) throws ZarrException, IOException { + Path path = TESTOUTPUT.resolve("testWriteZipStore" + (flushOnWrite ? "Flush" : "NoFlush") + ".zip"); + BufferedZipStore zipStore = new BufferedZipStore(path, flushOnWrite); + writeTestGroupV3(zipStore, true); + if(!flushOnWrite) zipStore.flush(); + + BufferedZipStore zipStoreRead = new BufferedZipStore(path); + assertIsTestGroupV3(Group.open(zipStoreRead.resolve()), true); + + Path unzippedPath = TESTOUTPUT.resolve("testWriteZipStoreUnzipped" + (flushOnWrite ? 
"Flush" : "NoFlush")); + + unzipFile(path, unzippedPath); + FilesystemStore fsStore = new FilesystemStore(unzippedPath); + assertIsTestGroupV3(Group.open(fsStore.resolve()), true); + } + + @ParameterizedTest + @CsvSource({"false", "true",}) + public void testZipStoreWithComment(boolean flushOnWrite) throws ZarrException, IOException { + Path path = TESTOUTPUT.resolve("testZipStoreWithComment"+ (flushOnWrite ? "Flush" : "NoFlush") + ".zip"); + String comment = "{\"ome\": { \"version\": \"XX.YY\" }}"; + BufferedZipStore zipStore = new BufferedZipStore(path, comment, flushOnWrite); + writeTestGroupV3(zipStore, true); + if(!flushOnWrite) zipStore.flush(); + + try (java.util.zip.ZipFile zipFile = new java.util.zip.ZipFile(path.toFile())) { + String retrievedComment = zipFile.getComment(); + Assertions.assertEquals(comment, retrievedComment, "ZIP archive comment does not match expected value."); + } + + Assertions.assertEquals(comment, new BufferedZipStore(path).getArchiveComment(), "ZIP archive comment from store does not match expected value."); + } + + /** + * Test that ZipStore meets requirements for underlying store of Zipped OME-Zarr + * @see RFC-9: Zipped OME-Zarr + */ + @Test + public void testZipStoreRequirements() throws ZarrException, IOException { + Path path = TESTOUTPUT.resolve("testZipStoreRequirements.zip"); + BufferedZipStore zipStore = new BufferedZipStore(path); + + dev.zarr.zarrjava.v3.Group group = dev.zarr.zarrjava.v3.Group.create(zipStore.resolve()); + Array array = group.createArray("a1", b -> b + .withShape(1024, 1024) + .withDataType(dev.zarr.zarrjava.v3.DataType.UINT32) + .withChunkShape(512, 512) + ); + array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData()), true); + + dev.zarr.zarrjava.v3.Group g1 = group.createGroup("g1"); + g1.createGroup("g1_1").createGroup("g1_1_1"); + g1.createGroup("g1_2"); + group.createGroup("g2").createGroup("g2_1"); + group.createGroup("g3"); + + zipStore.flush(); + + try (ZipFile zip = new ZipFile(path.toFile())) { + ArrayList entries = Collections.list(zip.getEntries()); + + // no compression + for (ZipArchiveEntry e : entries) { + Assertions.assertEquals(ZipEntry.STORED, e.getMethod(), "Entry " + e.getName() + " is compressed"); + } + + // correct order of zarr.json files + String[] expectedFirstEntries = new String[]{ + "zarr.json", + "a1/zarr.json", + "g1/zarr.json", + "g2/zarr.json", + "g3/zarr.json", + "g1/g1_1/zarr.json", + "g1/g1_2/zarr.json", + "g2/g2_1/zarr.json", + "g1/g1_1/g1_1_1/zarr.json" + }; + String[] actualFirstEntries = entries.stream() + .map(ZipArchiveEntry::getName) + .limit(expectedFirstEntries.length) + .toArray(String[]::new); + + Assertions.assertArrayEquals(expectedFirstEntries, actualFirstEntries, "zarr.json files are not in the expected breadth-first order"); + } + } + + + @ParameterizedTest + @CsvSource({"false", "true",}) + public void testZipStoreV2(boolean flushOnWrite) throws ZarrException, IOException { + Path path = TESTOUTPUT.resolve("testZipStoreV2" + (flushOnWrite ? 
"Flush" : "NoFlush") + ".zip"); + BufferedZipStore zipStore = new BufferedZipStore(path, flushOnWrite); + writeTestGroupV2(zipStore, true); + if(!flushOnWrite) zipStore.flush(); + + BufferedZipStore zipStoreRead = new BufferedZipStore(path); + assertIsTestGroupV2(dev.zarr.zarrjava.core.Group.open(zipStoreRead.resolve()), true); + + Path unzippedPath = TESTOUTPUT.resolve("testZipStoreV2Unzipped"); + + unzipFile(path, unzippedPath); + FilesystemStore fsStore = new FilesystemStore(unzippedPath); + assertIsTestGroupV2(dev.zarr.zarrjava.core.Group.open(fsStore.resolve()), true); + } + + @Test + public void testReadOnlyZipStore() throws ZarrException, IOException { + Path path = TESTOUTPUT.resolve("testReadOnlyZipStore.zip"); + String archiveComment = "This is a test ZIP archive comment."; + BufferedZipStore zipStore = new BufferedZipStore(path, archiveComment); + writeTestGroupV3(zipStore, true); + zipStore.flush(); + + ReadOnlyZipStore readOnlyZipStore = new ReadOnlyZipStore(path); + Assertions.assertEquals(archiveComment, readOnlyZipStore.getArchiveComment(), "ZIP archive comment from ReadOnlyZipStore does not match expected value."); + assertIsTestGroupV3(Group.open(readOnlyZipStore.resolve()), true); + } + + + static Stream localStores() { + return Stream.of( + new MemoryStore(), + new FilesystemStore(TESTOUTPUT.resolve("testLocalStoresFS")), + new BufferedZipStore(TESTOUTPUT.resolve("testLocalStoresZIP.zip"), true) + ); + } + + @ParameterizedTest + @MethodSource("localStores") + public void testLocalStores(Store store) throws IOException, ZarrException { + boolean useParallel = true; + Group group = writeTestGroupV3(store, useParallel); + assertIsTestGroupV3(group, useParallel); + } + + + int[] testData(){ + int[] testData = new int[1024 * 1024]; + Arrays.setAll(testData, p -> p); + return testData; + } + + Group writeTestGroupV3(Store store, boolean useParallel) throws ZarrException, IOException { + StoreHandle storeHandle = store.resolve(); + + dev.zarr.zarrjava.v3.Group group = dev.zarr.zarrjava.v3.Group.create(storeHandle); + dev.zarr.zarrjava.v3.Array array = group.createArray("array", b -> b + .withShape(1024, 1024) + .withDataType(dev.zarr.zarrjava.v3.DataType.UINT32) + .withChunkShape(512, 512) + ); + array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData()), useParallel); + group.createGroup("subgroup"); + group.setAttributes(new Attributes(b -> b.set("some", "value"))); + return group; + } + + void assertIsTestGroupV3(Group group, boolean useParallel) throws ZarrException, IOException { + Stream nodes = group.list(); + Assertions.assertEquals(2, nodes.count()); + Array array = (Array) group.get("array"); + Assertions.assertNotNull(array); + ucar.ma2.Array result = array.read(useParallel); + Assertions.assertArrayEquals(testData(), (int[]) result.get1DJavaArray(ucar.ma2.DataType.UINT)); + Attributes attrs = group.metadata().attributes(); + Assertions.assertNotNull(attrs); + Assertions.assertEquals("value", attrs.getString("some")); + } + + + dev.zarr.zarrjava.v2.Group writeTestGroupV2(Store store, boolean useParallel) throws ZarrException, IOException { + StoreHandle storeHandle = store.resolve(); + + dev.zarr.zarrjava.v2.Group group = dev.zarr.zarrjava.v2.Group.create(storeHandle); + dev.zarr.zarrjava.v2.Array array = group.createArray("array", b -> b + .withShape(1024, 1024) + .withDataType(dev.zarr.zarrjava.v2.DataType.UINT32) + .withChunks(512, 512) + ); + array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, 
testData()), useParallel); + group.createGroup("subgroup"); + group.setAttributes(new Attributes().set("some", "value")); + return group; + } + + void assertIsTestGroupV2(dev.zarr.zarrjava.core.Group group, boolean useParallel) throws ZarrException, IOException { + Stream nodes = group.list(); + Assertions.assertEquals(2, nodes.count()); + dev.zarr.zarrjava.v2.Array array = (dev.zarr.zarrjava.v2.Array) group.get("array"); + Assertions.assertNotNull(array); + ucar.ma2.Array result = array.read(useParallel); + Assertions.assertArrayEquals(testData(), (int[]) result.get1DJavaArray(ucar.ma2.DataType.UINT)); + Attributes attrs = group.metadata().attributes(); + Assertions.assertNotNull(attrs); + Assertions.assertEquals("value", attrs.getString("some")); + } } diff --git a/src/test/java/dev/zarr/zarrjava/ZarrV2Test.java b/src/test/java/dev/zarr/zarrjava/ZarrV2Test.java index 346fd2a..6522fae 100644 --- a/src/test/java/dev/zarr/zarrjava/ZarrV2Test.java +++ b/src/test/java/dev/zarr/zarrjava/ZarrV2Test.java @@ -406,8 +406,6 @@ public void testMemoryStore() throws ZarrException, IOException { ); group.createGroup("subgroup"); Assertions.assertEquals(2, group.list().count()); - for(String s: storeHandle.list().toArray(String[]::new)) - System.out.println(s); } } \ No newline at end of file