Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions fastfilter/src/main/java/org/fastfilter/Filter.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.fastfilter;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
Expand Down Expand Up @@ -85,4 +87,15 @@ default int getSerializedSize() {
default void serialize(ByteBuffer buffer) {
throw new UnsupportedOperationException();
}

/**
* Serializes the filter state into the provided {@code OutputStream}.
*
* @param out the output stream where the serialized state of the filter will be written
* @throws IOException if writing to the stream fails
* @throws UnsupportedOperationException if the operation is not supported by the filter implementation
*/
default void serialize(OutputStream out) throws IOException {
throw new UnsupportedOperationException();
}
}
28 changes: 28 additions & 0 deletions fastfilter/src/main/java/org/fastfilter/xor/Xor16.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.fastfilter.xor;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.fastfilter.Filter;
Expand Down Expand Up @@ -199,4 +204,27 @@ public static Xor16 deserialize(ByteBuffer buffer) {

return new Xor16(blockLength, bitCount, seed, fingerprints);
}

public void serialize(OutputStream out) throws IOException {
DataOutputStream dout = new DataOutputStream(out);
dout.writeInt(blockLength);
dout.writeLong(seed);
dout.writeInt(fingerprints.length);
for (final short fp : fingerprints) {
dout.writeShort(fp);
}
}

public static Xor16 deserialize(InputStream in) throws IOException {
DataInputStream din = new DataInputStream(in);
final int blockLength = din.readInt();
final long seed = din.readLong();
final int len = din.readInt();
final short[] fingerprints = new short[len];
for (int i = 0; i < len; i++) {
fingerprints[i] = din.readShort();
}
final int bitCount = len * BITS_PER_FINGERPRINT;
return new Xor16(blockLength, bitCount, seed, fingerprints);
}
}
18 changes: 18 additions & 0 deletions fastfilter/src/main/java/org/fastfilter/xor/Xor8.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,22 @@ public static Xor8 deserialize(ByteBuffer buffer) {

return new Xor8(size, seed, fingerprints);
}

public void serialize(OutputStream out) throws IOException {
DataOutputStream dout = new DataOutputStream(out);
dout.writeInt(size);
dout.writeLong(seed);
dout.writeInt(fingerprints.length);
dout.write(fingerprints);
}

public static Xor8 deserialize(InputStream in) throws IOException {
DataInputStream din = new DataInputStream(in);
final int size = din.readInt();
final long seed = din.readLong();
final int len = din.readInt();
final byte[] fingerprints = new byte[len];
din.readFully(fingerprints);
return new Xor8(size, seed, fingerprints);
}
}
30 changes: 30 additions & 0 deletions fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.fastfilter.xor;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.fastfilter.Filter;
Expand Down Expand Up @@ -325,4 +330,29 @@ public static XorBinaryFuse16 deserialize(ByteBuffer buffer) {

return new XorBinaryFuse16(segmentCount, segmentLength, seed, fingerprints);
}

public void serialize(OutputStream out) throws IOException {
DataOutputStream dout = new DataOutputStream(out);
dout.writeInt(segmentLength);
dout.writeInt(segmentCountLength);
dout.writeLong(seed);
dout.writeInt(fingerprints.length);
for (final short fp : fingerprints) {
dout.writeShort(fp);
}
}

public static XorBinaryFuse16 deserialize(InputStream in) throws IOException {
DataInputStream din = new DataInputStream(in);
final int segmentLength = din.readInt();
final int segmentCountLength = din.readInt();
final long seed = din.readLong();
final int len = din.readInt();
final short[] fingerprints = new short[len];
for (int i = 0; i < len; i++) {
fingerprints[i] = din.readShort();
}
final int segmentCount = segmentCountLength / segmentLength;
return new XorBinaryFuse16(segmentCount, segmentLength, seed, fingerprints);
}
}
30 changes: 30 additions & 0 deletions fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.fastfilter.xor;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

Expand Down Expand Up @@ -313,4 +318,29 @@ public static XorBinaryFuse32 deserialize(ByteBuffer buffer) {

return new XorBinaryFuse32(segmentCount, segmentLength, seed, fingerprints);
}

public void serialize(OutputStream out) throws IOException {
DataOutputStream dout = new DataOutputStream(out);
dout.writeInt(segmentLength);
dout.writeInt(segmentCountLength);
dout.writeLong(seed);
dout.writeInt(fingerprints.length);
for (final int fp : fingerprints) {
dout.writeInt(fp);
}
}

public static XorBinaryFuse32 deserialize(InputStream in) throws IOException {
DataInputStream din = new DataInputStream(in);
final int segmentLength = din.readInt();
final int segmentCountLength = din.readInt();
final long seed = din.readLong();
final int len = din.readInt();
final int[] fingerprints = new int[len];
for (int i = 0; i < len; i++) {
fingerprints[i] = din.readInt();
}
final int segmentCount = segmentCountLength / segmentLength;
return new XorBinaryFuse32(segmentCount, segmentLength, seed, fingerprints);
}
}
26 changes: 26 additions & 0 deletions fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.fastfilter.xor;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

Expand Down Expand Up @@ -321,4 +326,25 @@ public static XorBinaryFuse8 deserialize(ByteBuffer buffer) {

return new XorBinaryFuse8(segmentCount, segmentLength, seed, fingerprints);
}

public void serialize(OutputStream out) throws IOException {
DataOutputStream dout = new DataOutputStream(out);
dout.writeInt(segmentLength);
dout.writeInt(segmentCountLength);
dout.writeLong(seed);
dout.writeInt(fingerprints.length);
dout.write(fingerprints);
}

public static XorBinaryFuse8 deserialize(InputStream in) throws IOException {
DataInputStream din = new DataInputStream(in);
final int segmentLength = din.readInt();
final int segmentCountLength = din.readInt();
final long seed = din.readLong();
final int len = din.readInt();
final byte[] fingerprints = new byte[len];
din.readFully(fingerprints);
final int segmentCount = segmentCountLength / segmentLength;
return new XorBinaryFuse8(segmentCount, segmentLength, seed, fingerprints);
}
}
70 changes: 64 additions & 6 deletions fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Function;
Expand All @@ -20,28 +24,36 @@ public class SerializationTest {
private final String filterName;
private final Function<long[], Filter> constructor;
private final Function<ByteBuffer, Filter> deserializer;
private final StreamDeserializer streamDeserializer;

public SerializationTest(String filterName,
Function<long[], Filter> constructor,
Function<ByteBuffer, Filter> deserializer) {
Function<ByteBuffer, Filter> deserializer,
StreamDeserializer streamDeserializer) {
this.filterName = filterName;
this.constructor = constructor;
this.deserializer = deserializer;
this.streamDeserializer = streamDeserializer;
}

@Parameters(name = "{0}")
public static List<Object[]> filters() {
return List.of(
new Object[] {"Xor8", (Function<long[], Filter>) Xor8::construct,
(Function<ByteBuffer, Filter>) Xor8::deserialize},
(Function<ByteBuffer, Filter>) Xor8::deserialize,
(StreamDeserializer) Xor8::deserialize},
new Object[] {"Xor16", (Function<long[], Filter>) Xor16::construct,
(Function<ByteBuffer, Filter>) Xor16::deserialize},
(Function<ByteBuffer, Filter>) Xor16::deserialize,
(StreamDeserializer) Xor16::deserialize},
new Object[] {"XorBinaryFuse8", (Function<long[], Filter>) XorBinaryFuse8::construct,
(Function<ByteBuffer, Filter>) XorBinaryFuse8::deserialize},
(Function<ByteBuffer, Filter>) XorBinaryFuse8::deserialize,
(StreamDeserializer) XorBinaryFuse8::deserialize},
new Object[] {"XorBinaryFuse16", (Function<long[], Filter>) XorBinaryFuse16::construct,
(Function<ByteBuffer, Filter>) XorBinaryFuse16::deserialize},
(Function<ByteBuffer, Filter>) XorBinaryFuse16::deserialize,
(StreamDeserializer) XorBinaryFuse16::deserialize},
new Object[] {"XorBinaryFuse32", (Function<long[], Filter>) XorBinaryFuse32::construct,
(Function<ByteBuffer, Filter>) XorBinaryFuse32::deserialize}
(Function<ByteBuffer, Filter>) XorBinaryFuse32::deserialize,
(StreamDeserializer) XorBinaryFuse32::deserialize}
);
}

Expand Down Expand Up @@ -85,6 +97,52 @@ public void shouldSerializeAndDeserializeMediumFilter() {
assertFalse("Key 1500L should not be in " + filterName + " filter", deserializedFilter.mayContain(1500L));
}

@Test
public void shouldSerializeAndDeserializeMediumFilterFromStream() throws IOException {
// Arrange
final var keys = new long[]{100L, 200L, 300L, 400L, 500L, 600L, 700L, 800L, 900L, 1000L};
final var originalFilter = constructor.apply(keys);
final var buffer = ByteBuffer.allocate(originalFilter.getSerializedSize());

// Act
originalFilter.serialize(buffer);
final var input = new ByteArrayInputStream(buffer.array());
final var deserializedFilter = streamDeserializer.deserialize(input);

// Assert
for (final long key : keys) {
assertTrue("Key " + key + " should be present in deserialized " + filterName + " filter",
deserializedFilter.mayContain(key));
}
assertFalse("Key 50L should not be in " + filterName + " filter", deserializedFilter.mayContain(50L));
assertFalse("Key 1500L should not be in " + filterName + " filter", deserializedFilter.mayContain(1500L));
}

@Test
public void shouldSerializeToStreamAndDeserializeFromByteBuffer() throws IOException {
// Arrange
final var keys = new long[]{10L, 20L, 30L, 40L, 50L, 60L, 70L, 80L};
final var originalFilter = constructor.apply(keys);
final var out = new ByteArrayOutputStream();

// Act
originalFilter.serialize(out);
final var buffer = ByteBuffer.wrap(out.toByteArray());
final var deserializedFilter = deserializer.apply(buffer);

// Assert
for (final long key : keys) {
assertTrue("Key " + key + " should be present in deserialized " + filterName + " filter",
deserializedFilter.mayContain(key));
}
assertFalse("Key 15L should not be in " + filterName + " filter", deserializedFilter.mayContain(15L));
}

@FunctionalInterface
private interface StreamDeserializer {
Filter deserialize(InputStream in) throws IOException;
}

@Test
public void shouldSerializeAndDeserializeLargeFilter() {
// Arrange
Expand Down