diff --git a/fastfilter/src/main/java/org/fastfilter/Filter.java b/fastfilter/src/main/java/org/fastfilter/Filter.java
index 05bfe43..83bd4f6 100644
--- a/fastfilter/src/main/java/org/fastfilter/Filter.java
+++ b/fastfilter/src/main/java/org/fastfilter/Filter.java
@@ -1,5 +1,7 @@
 package org.fastfilter;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 /**
@@ -85,4 +87,15 @@ default int getSerializedSize() {
     default void serialize(ByteBuffer buffer) {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Serializes the filter state into the provided {@code OutputStream}.
+     *
+     * @param out the output stream where the serialized state of the filter will be written
+     * @throws IOException if writing to the stream fails
+     * @throws UnsupportedOperationException if the operation is not supported by the filter implementation
+     */
+    default void serialize(OutputStream out) throws IOException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/fastfilter/src/main/java/org/fastfilter/xor/Xor16.java b/fastfilter/src/main/java/org/fastfilter/xor/Xor16.java
index 603a3f1..265ec12 100644
--- a/fastfilter/src/main/java/org/fastfilter/xor/Xor16.java
+++ b/fastfilter/src/main/java/org/fastfilter/xor/Xor16.java
@@ -1,5 +1,10 @@
 package org.fastfilter.xor;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.fastfilter.Filter;
@@ -199,4 +204,49 @@ public static Xor16 deserialize(ByteBuffer buffer) {
 
         return new Xor16(blockLength, bitCount, seed, fingerprints);
     }
+
+    /**
+     * Writes this filter to {@code out} as {@code blockLength} (int), {@code seed} (long), the
+     * fingerprint count (int) and then every fingerprint (short); all values are big-endian.
+     * The stream is neither flushed nor closed.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    @Override
+    public void serialize(OutputStream out) throws IOException {
+        DataOutputStream dout = new DataOutputStream(out);
+        dout.writeInt(blockLength);
+        dout.writeLong(seed);
+        dout.writeInt(fingerprints.length);
+        for (final short fp : fingerprints) {
+            dout.writeShort(fp);
+        }
+    }
+
+    /**
+     * Reads a filter in the layout written by {@link #serialize(OutputStream)}.
+     * The stream is not closed.
+     *
+     * @param in the stream to read from
+     * @return the deserialized filter
+     * @throws IOException if reading fails, the stream ends early, or the stored fingerprint
+     *         count is negative
+     */
+    public static Xor16 deserialize(InputStream in) throws IOException {
+        DataInputStream din = new DataInputStream(in);
+        final int blockLength = din.readInt();
+        final long seed = din.readLong();
+        final int len = din.readInt();
+        if (len < 0) {
+            // Report a corrupt header as an I/O error instead of NegativeArraySizeException.
+            throw new IOException("Invalid fingerprint count: " + len);
+        }
+        final short[] fingerprints = new short[len];
+        for (int i = 0; i < len; i++) {
+            fingerprints[i] = din.readShort();
+        }
+        final int bitCount = len * BITS_PER_FINGERPRINT;
+        return new Xor16(blockLength, bitCount, seed, fingerprints);
+    }
 }
diff --git a/fastfilter/src/main/java/org/fastfilter/xor/Xor8.java b/fastfilter/src/main/java/org/fastfilter/xor/Xor8.java
index 142fb87..4b7c5b6 100644
--- a/fastfilter/src/main/java/org/fastfilter/xor/Xor8.java
+++ b/fastfilter/src/main/java/org/fastfilter/xor/Xor8.java
@@ -241,4 +241,44 @@ public static Xor8 deserialize(ByteBuffer buffer) {
 
         return new Xor8(size, seed, fingerprints);
     }
+
+    /**
+     * Writes this filter to {@code out} as {@code size} (int), {@code seed} (long), the
+     * fingerprint count (int) and then the raw fingerprint bytes; multi-byte values are big-endian.
+     * The stream is neither flushed nor closed.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    @Override
+    public void serialize(OutputStream out) throws IOException {
+        DataOutputStream dout = new DataOutputStream(out);
+        dout.writeInt(size);
+        dout.writeLong(seed);
+        dout.writeInt(fingerprints.length);
+        dout.write(fingerprints);
+    }
+
+    /**
+     * Reads a filter in the layout written by {@link #serialize(OutputStream)}.
+     * The stream is not closed.
+     *
+     * @param in the stream to read from
+     * @return the deserialized filter
+     * @throws IOException if reading fails, the stream ends early, or the stored fingerprint
+     *         count is negative
+     */
+    public static Xor8 deserialize(InputStream in) throws IOException {
+        DataInputStream din = new DataInputStream(in);
+        final int size = din.readInt();
+        final long seed = din.readLong();
+        final int len = din.readInt();
+        if (len < 0) {
+            // Report a corrupt header as an I/O error instead of NegativeArraySizeException.
+            throw new IOException("Invalid fingerprint count: " + len);
+        }
+        final byte[] fingerprints = new byte[len];
+        din.readFully(fingerprints);
+        return new Xor8(size, seed, fingerprints);
+    }
 }
diff --git a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java
index b0ef76a..588e70c 100644
--- a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java
+++ b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java
@@ -1,5 +1,10 @@
 package org.fastfilter.xor;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.fastfilter.Filter;
@@ -325,4 +330,51 @@ public static XorBinaryFuse16 deserialize(ByteBuffer buffer) {
 
         return new XorBinaryFuse16(segmentCount, segmentLength, seed, fingerprints);
     }
+
+    /**
+     * Writes this filter to {@code out} as {@code segmentLength} (int), {@code segmentCountLength}
+     * (int), {@code seed} (long), the fingerprint count (int) and then every fingerprint (short);
+     * all values are big-endian. The stream is neither flushed nor closed.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    @Override
+    public void serialize(OutputStream out) throws IOException {
+        DataOutputStream dout = new DataOutputStream(out);
+        dout.writeInt(segmentLength);
+        dout.writeInt(segmentCountLength);
+        dout.writeLong(seed);
+        dout.writeInt(fingerprints.length);
+        for (final short fp : fingerprints) {
+            dout.writeShort(fp);
+        }
+    }
+
+    /**
+     * Reads a filter in the layout written by {@link #serialize(OutputStream)}.
+     * The stream is not closed.
+     *
+     * @param in the stream to read from
+     * @return the deserialized filter
+     * @throws IOException if reading fails, the stream ends early, the stored segment length
+     *         is zero, or the stored fingerprint count is negative
+     */
+    public static XorBinaryFuse16 deserialize(InputStream in) throws IOException {
+        DataInputStream din = new DataInputStream(in);
+        final int segmentLength = din.readInt();
+        final int segmentCountLength = din.readInt();
+        final long seed = din.readLong();
+        final int len = din.readInt();
+        if (segmentLength == 0 || len < 0) {
+            // Reject corrupt headers here instead of failing with an unchecked exception below.
+            throw new IOException("Invalid header: segmentLength=" + segmentLength + ", count=" + len);
+        }
+        final short[] fingerprints = new short[len];
+        for (int i = 0; i < len; i++) {
+            fingerprints[i] = din.readShort();
+        }
+        final int segmentCount = segmentCountLength / segmentLength;
+        return new XorBinaryFuse16(segmentCount, segmentLength, seed, fingerprints);
+    }
 }
diff --git a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java
index fc1a01b..d226081 100644
--- a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java
+++ b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java
@@ -1,5 +1,10 @@
 package org.fastfilter.xor;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
@@ -313,4 +318,51 @@ public static XorBinaryFuse32 deserialize(ByteBuffer buffer) {
 
         return new XorBinaryFuse32(segmentCount, segmentLength, seed, fingerprints);
     }
+
+    /**
+     * Writes this filter to {@code out} as {@code segmentLength} (int), {@code segmentCountLength}
+     * (int), {@code seed} (long), the fingerprint count (int) and then every fingerprint (int);
+     * all values are big-endian. The stream is neither flushed nor closed.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    @Override
+    public void serialize(OutputStream out) throws IOException {
+        DataOutputStream dout = new DataOutputStream(out);
+        dout.writeInt(segmentLength);
+        dout.writeInt(segmentCountLength);
+        dout.writeLong(seed);
+        dout.writeInt(fingerprints.length);
+        for (final int fp : fingerprints) {
+            dout.writeInt(fp);
+        }
+    }
+
+    /**
+     * Reads a filter in the layout written by {@link #serialize(OutputStream)}.
+     * The stream is not closed.
+     *
+     * @param in the stream to read from
+     * @return the deserialized filter
+     * @throws IOException if reading fails, the stream ends early, the stored segment length
+     *         is zero, or the stored fingerprint count is negative
+     */
+    public static XorBinaryFuse32 deserialize(InputStream in) throws IOException {
+        DataInputStream din = new DataInputStream(in);
+        final int segmentLength = din.readInt();
+        final int segmentCountLength = din.readInt();
+        final long seed = din.readLong();
+        final int len = din.readInt();
+        if (segmentLength == 0 || len < 0) {
+            // Reject corrupt headers here instead of failing with an unchecked exception below.
+            throw new IOException("Invalid header: segmentLength=" + segmentLength + ", count=" + len);
+        }
+        final int[] fingerprints = new int[len];
+        for (int i = 0; i < len; i++) {
+            fingerprints[i] = din.readInt();
+        }
+        final int segmentCount = segmentCountLength / segmentLength;
+        return new XorBinaryFuse32(segmentCount, segmentLength, seed, fingerprints);
+    }
 }
diff --git a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java
index e8f0337..f4e31ff 100644
--- a/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java
+++ b/fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java
@@ -1,5 +1,10 @@
 package org.fastfilter.xor;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
@@ -321,4 +326,47 @@ public static XorBinaryFuse8 deserialize(ByteBuffer buffer) {
 
         return new XorBinaryFuse8(segmentCount, segmentLength, seed, fingerprints);
     }
+
+    /**
+     * Writes this filter to {@code out} as {@code segmentLength} (int), {@code segmentCountLength}
+     * (int), {@code seed} (long), the fingerprint count (int) and then the raw fingerprint bytes;
+     * multi-byte values are big-endian. The stream is neither flushed nor closed.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    @Override
+    public void serialize(OutputStream out) throws IOException {
+        DataOutputStream dout = new DataOutputStream(out);
+        dout.writeInt(segmentLength);
+        dout.writeInt(segmentCountLength);
+        dout.writeLong(seed);
+        dout.writeInt(fingerprints.length);
+        dout.write(fingerprints);
+    }
+
+    /**
+     * Reads a filter in the layout written by {@link #serialize(OutputStream)}.
+     * The stream is not closed.
+     *
+     * @param in the stream to read from
+     * @return the deserialized filter
+     * @throws IOException if reading fails, the stream ends early, the stored segment length
+     *         is zero, or the stored fingerprint count is negative
+     */
+    public static XorBinaryFuse8 deserialize(InputStream in) throws IOException {
+        DataInputStream din = new DataInputStream(in);
+        final int segmentLength = din.readInt();
+        final int segmentCountLength = din.readInt();
+        final long seed = din.readLong();
+        final int len = din.readInt();
+        if (segmentLength == 0 || len < 0) {
+            // Reject corrupt headers here instead of failing with an unchecked exception below.
+            throw new IOException("Invalid header: segmentLength=" + segmentLength + ", count=" + len);
+        }
+        final byte[] fingerprints = new byte[len];
+        din.readFully(fingerprints);
+        final int segmentCount = segmentCountLength / segmentLength;
+        return new XorBinaryFuse8(segmentCount, segmentLength, seed, fingerprints);
+    }
 }
diff --git a/fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java b/fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java
index df6a204..023475d 100644
--- a/fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java
+++ b/fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java
@@ -5,6 +5,10 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.function.Function;
@@ -20,28 +24,36 @@ public class SerializationTest {
     private final String filterName;
     private final Function<long[], Filter> constructor;
     private final Function<ByteBuffer, Filter> deserializer;
+    private final StreamDeserializer streamDeserializer;
 
     public SerializationTest(String filterName,
         Function<long[], Filter> constructor,
-        Function<ByteBuffer, Filter> deserializer) {
+        Function<ByteBuffer, Filter> deserializer,
+        StreamDeserializer streamDeserializer) {
         this.filterName = filterName;
         this.constructor = constructor;
         this.deserializer = deserializer;
+        this.streamDeserializer = streamDeserializer;
     }
 
     @Parameters(name = "{0}")
     public static List<Object[]> filters() {
         return List.of(
             new Object[] {"Xor8", (Function<long[], Filter>) Xor8::construct,
-                (Function<ByteBuffer, Filter>) Xor8::deserialize},
+                (Function<ByteBuffer, Filter>) Xor8::deserialize,
+                (StreamDeserializer) Xor8::deserialize},
             new Object[] {"Xor16", (Function<long[], Filter>) Xor16::construct,
-                (Function<ByteBuffer, Filter>) Xor16::deserialize},
+                (Function<ByteBuffer, Filter>) Xor16::deserialize,
+                (StreamDeserializer) Xor16::deserialize},
             new Object[] {"XorBinaryFuse8", (Function<long[], Filter>) XorBinaryFuse8::construct,
-                (Function<ByteBuffer, Filter>) XorBinaryFuse8::deserialize},
+                (Function<ByteBuffer, Filter>) XorBinaryFuse8::deserialize,
+                (StreamDeserializer) XorBinaryFuse8::deserialize},
             new Object[] {"XorBinaryFuse16", (Function<long[], Filter>) XorBinaryFuse16::construct,
-                (Function<ByteBuffer, Filter>) XorBinaryFuse16::deserialize},
+                (Function<ByteBuffer, Filter>) XorBinaryFuse16::deserialize,
+                (StreamDeserializer) XorBinaryFuse16::deserialize},
             new Object[] {"XorBinaryFuse32", (Function<long[], Filter>) XorBinaryFuse32::construct,
-                (Function<ByteBuffer, Filter>) XorBinaryFuse32::deserialize}
+                (Function<ByteBuffer, Filter>) XorBinaryFuse32::deserialize,
+                (StreamDeserializer) XorBinaryFuse32::deserialize}
         );
     }
 
@@ -85,6 +97,52 @@ public void shouldSerializeAndDeserializeMediumFilter() {
         assertFalse("Key 1500L should not be in " + filterName + " filter", deserializedFilter.mayContain(1500L));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeMediumFilterFromStream() throws IOException {
+        // Arrange
+        final var keys = new long[]{100L, 200L, 300L, 400L, 500L, 600L, 700L, 800L, 900L, 1000L};
+        final var originalFilter = constructor.apply(keys);
+        final var buffer = ByteBuffer.allocate(originalFilter.getSerializedSize());
+
+        // Act
+        originalFilter.serialize(buffer);
+        final var input = new ByteArrayInputStream(buffer.array());
+        final var deserializedFilter = streamDeserializer.deserialize(input);
+
+        // Assert
+        for (final long key : keys) {
+            assertTrue("Key " + key + " should be present in deserialized " + filterName + " filter",
+                deserializedFilter.mayContain(key));
+        }
+        assertFalse("Key 50L should not be in " + filterName + " filter", deserializedFilter.mayContain(50L));
+        assertFalse("Key 1500L should not be in " + filterName + " filter", deserializedFilter.mayContain(1500L));
+    }
+
+    @Test
+    public void shouldSerializeToStreamAndDeserializeFromByteBuffer() throws IOException {
+        // Arrange
+        final var keys = new long[]{10L, 20L, 30L, 40L, 50L, 60L, 70L, 80L};
+        final var originalFilter = constructor.apply(keys);
+        final var out = new ByteArrayOutputStream();
+
+        // Act
+        originalFilter.serialize(out);
+        final var buffer = ByteBuffer.wrap(out.toByteArray());
+        final var deserializedFilter = deserializer.apply(buffer);
+
+        // Assert
+        for (final long key : keys) {
+            assertTrue("Key " + key + " should be present in deserialized " + filterName + " filter",
+                deserializedFilter.mayContain(key));
+        }
+        assertFalse("Key 15L should not be in " + filterName + " filter", deserializedFilter.mayContain(15L));
+    }
+
+    @FunctionalInterface
+    private interface StreamDeserializer {
+        Filter deserialize(InputStream in) throws IOException;
+    }
+
     @Test
     public void shouldSerializeAndDeserializeLargeFilter() {
         // Arrange