From 42aef913ba9a3d56a08cf83f8f5742b27ed52906 Mon Sep 17 00:00:00 2001 From: Vladimir Prus Date: Wed, 21 Jan 2026 10:32:45 +0000 Subject: [PATCH] Xor: implement stream serialization There are already methods operating on ByteBuffer, however versions taking streams allow to avoid data copying, and more directly compatible with various network APIs. Also, for reference, Guava Bloom filter implementation uses streams. There's a bit of code duplication as the result. Sadly, there's no standard class to wrap ByteBuffer in a stream, and pulling in an external dependency seems like an overkill here. --- .../src/main/java/org/fastfilter/Filter.java | 13 ++++ .../main/java/org/fastfilter/xor/Xor16.java | 28 ++++++++ .../main/java/org/fastfilter/xor/Xor8.java | 18 +++++ .../org/fastfilter/xor/XorBinaryFuse16.java | 30 ++++++++ .../org/fastfilter/xor/XorBinaryFuse32.java | 30 ++++++++ .../org/fastfilter/xor/XorBinaryFuse8.java | 26 +++++++ .../org/fastfilter/xor/SerializationTest.java | 70 +++++++++++++++++-- 7 files changed, 209 insertions(+), 6 deletions(-) diff --git a/fastfilter/src/main/java/org/fastfilter/Filter.java b/fastfilter/src/main/java/org/fastfilter/Filter.java index 05bfe43..83bd4f6 100644 --- a/fastfilter/src/main/java/org/fastfilter/Filter.java +++ b/fastfilter/src/main/java/org/fastfilter/Filter.java @@ -1,5 +1,7 @@ package org.fastfilter; +import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; /** @@ -85,4 +87,15 @@ default int getSerializedSize() { default void serialize(ByteBuffer buffer) { throw new UnsupportedOperationException(); } + + /** + * Serializes the filter state into the provided {@code OutputStream}. + * + * @param out the output stream where the serialized state of the filter will be written + * @throws IOException if writing to the stream fails + * @throws UnsupportedOperationException if the operation is not supported by the filter implementation + */ + default void serialize(OutputStream out) throws IOException { + throw new UnsupportedOperationException(); + } } diff --git a/fastfilter/src/main/java/org/fastfilter/xor/Xor16.java b/fastfilter/src/main/java/org/fastfilter/xor/Xor16.java index 603a3f1..265ec12 100644 --- a/fastfilter/src/main/java/org/fastfilter/xor/Xor16.java +++ b/fastfilter/src/main/java/org/fastfilter/xor/Xor16.java @@ -1,5 +1,10 @@ package org.fastfilter.xor; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import org.fastfilter.Filter; @@ -199,4 +204,27 @@ public static Xor16 deserialize(ByteBuffer buffer) { return new Xor16(blockLength, bitCount, seed, fingerprints); } + + public void serialize(OutputStream out) throws IOException { + DataOutputStream dout = new DataOutputStream(out); + dout.writeInt(blockLength); + dout.writeLong(seed); + dout.writeInt(fingerprints.length); + for (final short fp : fingerprints) { + dout.writeShort(fp); + } + } + + public static Xor16 deserialize(InputStream in) throws IOException { + DataInputStream din = new DataInputStream(in); + final int blockLength = din.readInt(); + final long seed = din.readLong(); + final int len = din.readInt(); + final short[] fingerprints = new short[len]; + for (int i = 0; i < len; i++) { + fingerprints[i] = din.readShort(); + } + final int bitCount = len * BITS_PER_FINGERPRINT; + return new Xor16(blockLength, bitCount, seed, fingerprints); + } } diff --git a/fastfilter/src/main/java/org/fastfilter/xor/Xor8.java b/fastfilter/src/main/java/org/fastfilter/xor/Xor8.java index 142fb87..4b7c5b6 100644 --- a/fastfilter/src/main/java/org/fastfilter/xor/Xor8.java +++ b/fastfilter/src/main/java/org/fastfilter/xor/Xor8.java @@ -241,4 +241,22 @@ public static Xor8 deserialize(ByteBuffer buffer) { return new Xor8(size, seed, fingerprints); } + + public void serialize(OutputStream out) throws IOException { + DataOutputStream dout = new DataOutputStream(out); + dout.writeInt(size); + dout.writeLong(seed); + dout.writeInt(fingerprints.length); + dout.write(fingerprints); + } + + public static Xor8 deserialize(InputStream in) throws IOException { + DataInputStream din = new DataInputStream(in); + final int size = din.readInt(); + final long seed = din.readLong(); + final int len = din.readInt(); + final byte[] fingerprints = new byte[len]; + din.readFully(fingerprints); + return new Xor8(size, seed, fingerprints); + } } diff --git a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java index b0ef76a..588e70c 100644 --- a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java +++ b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java @@ -1,5 +1,10 @@ package org.fastfilter.xor; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Arrays; import org.fastfilter.Filter; @@ -325,4 +330,29 @@ public static XorBinaryFuse16 deserialize(ByteBuffer buffer) { return new XorBinaryFuse16(segmentCount, segmentLength, seed, fingerprints); } + + public void serialize(OutputStream out) throws IOException { + DataOutputStream dout = new DataOutputStream(out); + dout.writeInt(segmentLength); + dout.writeInt(segmentCountLength); + dout.writeLong(seed); + dout.writeInt(fingerprints.length); + for (final short fp : fingerprints) { + dout.writeShort(fp); + } + } + + public static XorBinaryFuse16 deserialize(InputStream in) throws IOException { + DataInputStream din = new DataInputStream(in); + final int segmentLength = din.readInt(); + final int segmentCountLength = din.readInt(); + final long seed = din.readLong(); + final int len = din.readInt(); + final short[] fingerprints = new short[len]; + for (int i = 0; i < len; i++) { + fingerprints[i] = din.readShort(); + } + final int segmentCount = segmentCountLength / segmentLength; + return new XorBinaryFuse16(segmentCount, segmentLength, seed, fingerprints); + } } diff --git a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java index fc1a01b..d226081 100644 --- a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java +++ b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java @@ -1,5 +1,10 @@ package org.fastfilter.xor; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Arrays; @@ -313,4 +318,29 @@ public static XorBinaryFuse32 deserialize(ByteBuffer buffer) { return new XorBinaryFuse32(segmentCount, segmentLength, seed, fingerprints); } + + public void serialize(OutputStream out) throws IOException { + DataOutputStream dout = new DataOutputStream(out); + dout.writeInt(segmentLength); + dout.writeInt(segmentCountLength); + dout.writeLong(seed); + dout.writeInt(fingerprints.length); + for (final int fp : fingerprints) { + dout.writeInt(fp); + } + } + + public static XorBinaryFuse32 deserialize(InputStream in) throws IOException { + DataInputStream din = new DataInputStream(in); + final int segmentLength = din.readInt(); + final int segmentCountLength = din.readInt(); + final long seed = din.readLong(); + final int len = din.readInt(); + final int[] fingerprints = new int[len]; + for (int i = 0; i < len; i++) { + fingerprints[i] = din.readInt(); + } + final int segmentCount = segmentCountLength / segmentLength; + return new XorBinaryFuse32(segmentCount, segmentLength, seed, fingerprints); + } } diff --git a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java index e8f0337..f4e31ff 100644 --- a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java +++ b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java @@ -1,5 +1,10 @@ package org.fastfilter.xor; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Arrays; @@ -321,4 +326,25 @@ public static XorBinaryFuse8 deserialize(ByteBuffer buffer) { return new XorBinaryFuse8(segmentCount, segmentLength, seed, fingerprints); } + + public void serialize(OutputStream out) throws IOException { + DataOutputStream dout = new DataOutputStream(out); + dout.writeInt(segmentLength); + dout.writeInt(segmentCountLength); + dout.writeLong(seed); + dout.writeInt(fingerprints.length); + dout.write(fingerprints); + } + + public static XorBinaryFuse8 deserialize(InputStream in) throws IOException { + DataInputStream din = new DataInputStream(in); + final int segmentLength = din.readInt(); + final int segmentCountLength = din.readInt(); + final long seed = din.readLong(); + final int len = din.readInt(); + final byte[] fingerprints = new byte[len]; + din.readFully(fingerprints); + final int segmentCount = segmentCountLength / segmentLength; + return new XorBinaryFuse8(segmentCount, segmentLength, seed, fingerprints); + } } diff --git a/fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java b/fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java index df6a204..023475d 100644 --- a/fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java +++ b/fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java @@ -5,6 +5,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.List; import java.util.function.Function; @@ -20,28 +24,36 @@ public class SerializationTest { private final String filterName; private final Function constructor; private final Function deserializer; + private final StreamDeserializer streamDeserializer; public SerializationTest(String filterName, Function constructor, - Function deserializer) { + Function deserializer, + StreamDeserializer streamDeserializer) { this.filterName = filterName; this.constructor = constructor; this.deserializer = deserializer; + this.streamDeserializer = streamDeserializer; } @Parameters(name = "{0}") public static List filters() { return List.of( new Object[] {"Xor8", (Function) Xor8::construct, - (Function) Xor8::deserialize}, + (Function) Xor8::deserialize, + (StreamDeserializer) Xor8::deserialize}, new Object[] {"Xor16", (Function) Xor16::construct, - (Function) Xor16::deserialize}, + (Function) Xor16::deserialize, + (StreamDeserializer) Xor16::deserialize}, new Object[] {"XorBinaryFuse8", (Function) XorBinaryFuse8::construct, - (Function) XorBinaryFuse8::deserialize}, + (Function) XorBinaryFuse8::deserialize, + (StreamDeserializer) XorBinaryFuse8::deserialize}, new Object[] {"XorBinaryFuse16", (Function) XorBinaryFuse16::construct, - (Function) XorBinaryFuse16::deserialize}, + (Function) XorBinaryFuse16::deserialize, + (StreamDeserializer) XorBinaryFuse16::deserialize}, new Object[] {"XorBinaryFuse32", (Function) XorBinaryFuse32::construct, - (Function) XorBinaryFuse32::deserialize} + (Function) XorBinaryFuse32::deserialize, + (StreamDeserializer) XorBinaryFuse32::deserialize} ); } @@ -85,6 +97,52 @@ public void shouldSerializeAndDeserializeMediumFilter() { assertFalse("Key 1500L should not be in " + filterName + " filter", deserializedFilter.mayContain(1500L)); } + @Test + public void shouldSerializeAndDeserializeMediumFilterFromStream() throws IOException { + // Arrange + final var keys = new long[]{100L, 200L, 300L, 400L, 500L, 600L, 700L, 800L, 900L, 1000L}; + final var originalFilter = constructor.apply(keys); + final var buffer = ByteBuffer.allocate(originalFilter.getSerializedSize()); + + // Act + originalFilter.serialize(buffer); + final var input = new ByteArrayInputStream(buffer.array()); + final var deserializedFilter = streamDeserializer.deserialize(input); + + // Assert + for (final long key : keys) { + assertTrue("Key " + key + " should be present in deserialized " + filterName + " filter", + deserializedFilter.mayContain(key)); + } + assertFalse("Key 50L should not be in " + filterName + " filter", deserializedFilter.mayContain(50L)); + assertFalse("Key 1500L should not be in " + filterName + " filter", deserializedFilter.mayContain(1500L)); + } + + @Test + public void shouldSerializeToStreamAndDeserializeFromByteBuffer() throws IOException { + // Arrange + final var keys = new long[]{10L, 20L, 30L, 40L, 50L, 60L, 70L, 80L}; + final var originalFilter = constructor.apply(keys); + final var out = new ByteArrayOutputStream(); + + // Act + originalFilter.serialize(out); + final var buffer = ByteBuffer.wrap(out.toByteArray()); + final var deserializedFilter = deserializer.apply(buffer); + + // Assert + for (final long key : keys) { + assertTrue("Key " + key + " should be present in deserialized " + filterName + " filter", + deserializedFilter.mayContain(key)); + } + assertFalse("Key 15L should not be in " + filterName + " filter", deserializedFilter.mayContain(15L)); + } + + @FunctionalInterface + private interface StreamDeserializer { + Filter deserialize(InputStream in) throws IOException; + } + @Test public void shouldSerializeAndDeserializeLargeFilter() { // Arrange