Skip to content

KAFKA-6629: parameterise SegmentedCacheFunctionTest for session key schemas #19404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,15 @@ public static void writeBinary(final ByteBuffer buf,
buf.putLong(endTime);
buf.putLong(startTime);
}

static Bytes toStoreKeyBinary(final byte[] serializedKey,
final long endTime,
final long startTime) {
final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + TIMESTAMP_SIZE);
buf.put(serializedKey);
buf.putLong(endTime);
buf.putLong(startTime);

return Bytes.wrap(buf.array());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is only used in the test maybe move it to SegmentedCacheFunctionTest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me @bbejeck, change applied.

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,97 +19,139 @@

import org.apache.kafka.common.utils.Bytes;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.nio.ByteBuffer;
import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;

// TODO: this test coverage does not consider session serde yet
public class SegmentedCacheFunctionTest {
class SegmentedCacheFunctionTest {

private static final int SEGMENT_INTERVAL = 17;
private static final int TIMESTAMP = 736213517;
private static final int START_TIMESTAMP = 736213517;
private static final int END_TIMESTAMP = 800000000;

private static final Bytes THE_KEY = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, TIMESTAMP, 42);
private static final Bytes THE_CACHE_KEY = Bytes.wrap(
ByteBuffer.allocate(8 + THE_KEY.get().length)
.putLong(TIMESTAMP / SEGMENT_INTERVAL)
.put(THE_KEY.get()).array()
private static final Bytes THE_WINDOW_KEY = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, START_TIMESTAMP, 42);
private static final Bytes THE_SESSION_KEY = SessionKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, END_TIMESTAMP, START_TIMESTAMP);

private static final Bytes THE_WINDOW_CACHE_KEY = Bytes.wrap(
ByteBuffer.allocate(8 + THE_WINDOW_KEY.get().length)
.putLong(START_TIMESTAMP / SEGMENT_INTERVAL)
.put(THE_WINDOW_KEY.get()).array()
);

private final SegmentedCacheFunction cacheFunction = new SegmentedCacheFunction(new WindowKeySchema(), SEGMENT_INTERVAL);
private static final Bytes THE_SESSION_CACHE_KEY = Bytes.wrap(
ByteBuffer.allocate(8 + THE_SESSION_KEY.get().length)
.putLong(END_TIMESTAMP / SEGMENT_INTERVAL)
.put(THE_SESSION_KEY.get()).array()
);

private SegmentedCacheFunction createCacheFunction(final SegmentedBytesStore.KeySchema keySchema) {
return new SegmentedCacheFunction(keySchema, SEGMENT_INTERVAL);
}

@Test
public void key() {
assertThat(
cacheFunction.key(THE_CACHE_KEY),
equalTo(THE_KEY)
private static Stream<Arguments> provideKeysAndSchemas() {
return Stream.of(
Arguments.of(THE_WINDOW_CACHE_KEY, THE_WINDOW_KEY, new WindowKeySchema()),
Arguments.of(THE_SESSION_CACHE_KEY, THE_SESSION_KEY, new SessionKeySchema())
);
}

private static Stream<Arguments> provideKeysTimestampsAndSchemas() {
return Stream.of(
Arguments.of(THE_WINDOW_KEY, START_TIMESTAMP, new WindowKeySchema()),
Arguments.of(THE_SESSION_KEY, END_TIMESTAMP, new SessionKeySchema())
);
}

private static Stream<Arguments> provideKeysForBoundaryChecks() {
final Bytes sameKeyInPriorSegmentWindow = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 42);
final Bytes sameKeyInPriorSegmentSession = SessionKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 12345);

final Bytes lowerKeyInSameSegmentWindow = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, START_TIMESTAMP - 1, 0);
final Bytes lowerKeyInSameSegmentSession = SessionKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, END_TIMESTAMP - 1, START_TIMESTAMP + 1);

return Stream.of(
Arguments.of(THE_WINDOW_KEY, new WindowKeySchema(), sameKeyInPriorSegmentWindow, lowerKeyInSameSegmentWindow),
Arguments.of(THE_SESSION_KEY, new SessionKeySchema(), sameKeyInPriorSegmentSession, lowerKeyInSameSegmentSession)
);
}

@Test
public void cacheKey() {
final long segmentId = TIMESTAMP / SEGMENT_INTERVAL;
@ParameterizedTest
@MethodSource("provideKeysAndSchemas")
void testKey(final Bytes cacheKey, final Bytes key, final SegmentedBytesStore.KeySchema keySchema) {
assertThat(
createCacheFunction(keySchema).key(cacheKey),
equalTo(key)
);
}

final Bytes actualCacheKey = cacheFunction.cacheKey(THE_KEY);
@ParameterizedTest
@MethodSource("provideKeysTimestampsAndSchemas")
void cacheKey(final Bytes key, final int timeStamp, final SegmentedBytesStore.KeySchema keySchema) {
final long segmentId = timeStamp / SEGMENT_INTERVAL;
final Bytes actualCacheKey = createCacheFunction(keySchema).cacheKey(key);
final ByteBuffer buffer = ByteBuffer.wrap(actualCacheKey.get());

assertThat(buffer.getLong(), equalTo(segmentId));

final byte[] actualKey = new byte[buffer.remaining()];
buffer.get(actualKey);
assertThat(Bytes.wrap(actualKey), equalTo(THE_KEY));
assertThat(Bytes.wrap(actualKey), equalTo(key));
}

@Test
public void testRoundTripping() {
@ParameterizedTest
@MethodSource("provideKeysAndSchemas")
void testRoundTripping(final Bytes cacheKey, final Bytes key, final SegmentedBytesStore.KeySchema keySchema) {
final SegmentedCacheFunction cacheFunction = createCacheFunction(keySchema);

assertThat(
cacheFunction.key(cacheFunction.cacheKey(THE_KEY)),
equalTo(THE_KEY)
cacheFunction.key(cacheFunction.cacheKey(key)),
equalTo(key)
);

assertThat(
cacheFunction.cacheKey(cacheFunction.key(THE_CACHE_KEY)),
equalTo(THE_CACHE_KEY)
cacheFunction.cacheKey(cacheFunction.key(cacheKey)),
equalTo(cacheKey)
);
}

@Test
public void compareSegmentedKeys() {
@ParameterizedTest
@MethodSource("provideKeysForBoundaryChecks")
void compareSegmentedKeys(final Bytes key, final SegmentedBytesStore.KeySchema keySchema, final Bytes sameKeyInPriorSegment, final Bytes lowerKeyInSameSegment) {
final SegmentedCacheFunction cacheFunction = createCacheFunction(keySchema);
assertThat(
"same key in same segment should be ranked the same",
cacheFunction.compareSegmentedKeys(
cacheFunction.cacheKey(THE_KEY),
THE_KEY
cacheFunction.cacheKey(key),
key
) == 0
);

final Bytes sameKeyInPriorSegment = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 42);

assertThat(
"same keys in different segments should be ordered according to segment",
cacheFunction.compareSegmentedKeys(
cacheFunction.cacheKey(sameKeyInPriorSegment),
THE_KEY
key
) < 0
);

assertThat(
"same keys in different segments should be ordered according to segment",
cacheFunction.compareSegmentedKeys(
cacheFunction.cacheKey(THE_KEY),
cacheFunction.cacheKey(key),
sameKeyInPriorSegment
) > 0
);

final Bytes lowerKeyInSameSegment = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, TIMESTAMP - 1, 0);

assertThat(
"different keys in same segments should be ordered according to key",
cacheFunction.compareSegmentedKeys(
cacheFunction.cacheKey(THE_KEY),
cacheFunction.cacheKey(key),
lowerKeyInSameSegment
) > 0
);
Expand All @@ -118,9 +160,8 @@ public void compareSegmentedKeys() {
"different keys in same segments should be ordered according to key",
cacheFunction.compareSegmentedKeys(
cacheFunction.cacheKey(lowerKeyInSameSegment),
THE_KEY
key
) < 0
);
}

}