From c4140cd547b298837507052d0e070b065f60f839 Mon Sep 17 00:00:00 2001 From: Masahiro Mori Date: Wed, 9 Jul 2025 10:03:58 +0900 Subject: [PATCH 1/4] style: fix lamda --- .../apache/kafka/storage/internals/log/AbstractIndex.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java index eddd4ed8070be..790d475701de7 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java @@ -282,10 +282,7 @@ public void closeHandler() { // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. - inLockThrows(() -> - inRemapWriteLockThrows(() -> { - safeForceUnmap(); - })); + inLockThrows(() -> inRemapWriteLockThrows(this::safeForceUnmap)); } /** From 78abad1fb3a012cb556cf57c2d225520f12b384a Mon Sep 17 00:00:00 2001 From: Masahiro Mori Date: Wed, 9 Jul 2025 13:15:00 +0900 Subject: [PATCH 2/4] minor: improve comment --- .../apache/kafka/storage/internals/log/AbstractIndex.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java index 790d475701de7..dbfdf91a1ccb5 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java @@ -48,7 +48,12 @@ private enum SearchResultType { private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); - // Serializes all index operations that mutate internal state + // Serializes all index operations that mutate internal state. + // Readers do not need to acquire this lock because: + // 1) They only access published entries and are not affected by concurrent writes. + // 2) MappedByteBuffer provides direct access to the OS-level buffer cache, + // which allows concurrent reads in practice. + // 3) Read and remap operations are coordinated via remapLock to ensure visibility. private final ReentrantLock lock = new ReentrantLock(); // Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock(); From d3c0e706b5f7cba4f46fd482c6c0e2f15ac8513b Mon Sep 17 00:00:00 2001 From: Masahiro Mori Date: Wed, 9 Jul 2025 15:01:15 +0900 Subject: [PATCH 3/4] refactor: unify lock utils --- .../apache/kafka/server/util/LockUtils.java | 49 +------------------ .../storage/internals/log/AbstractIndex.java | 33 +++++-------- .../storage/internals/log/TimeIndex.java | 2 +- 3 files changed, 15 insertions(+), 69 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java index 568d109daf3fe..86338726d5e04 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java @@ -18,7 +18,6 @@ import java.util.Objects; import java.util.concurrent.locks.Lock; -import java.util.function.Supplier; /** * A utility class providing helper methods for working with {@link Lock} objects. @@ -35,50 +34,6 @@ public interface ThrowingRunnable { void run() throws E; } - /** - * Executes the given {@link Supplier} within the context of the specified {@link Lock}. - * The lock is acquired before executing the supplier and released after the execution, - * ensuring that the lock is always released, even if an exception is thrown. - * - * @param the type of the result returned by the supplier - * @param lock the lock to be acquired and released - * @param supplier the supplier to be executed within the lock context - * @return the result of the supplier - * @throws NullPointerException if either {@code lock} or {@code supplier} is null - */ - public static T inLock(Lock lock, Supplier supplier) { - Objects.requireNonNull(lock, "Lock must not be null"); - Objects.requireNonNull(supplier, "Supplier must not be null"); - - lock.lock(); - try { - return supplier.get(); - } finally { - lock.unlock(); - } - } - - /** - * Executes the given {@link Runnable} within the context of the specified {@link Lock}. - * The lock is acquired before executing the runnable and released after the execution, - * ensuring that the lock is always released, even if an exception is thrown. - * - * @param lock the lock to be acquired and released - * @param runnable the runnable to be executed within the lock context - * @throws NullPointerException if either {@code lock} or {@code runnable} is null - */ - public static void inLock(Lock lock, Runnable runnable) { - Objects.requireNonNull(lock, "Lock must not be null"); - Objects.requireNonNull(runnable, "Runnable must not be null"); - - lock.lock(); - try { - runnable.run(); - } finally { - lock.unlock(); - } - } - /** * Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}. * The lock is acquired before executing the supplier and released after the execution, @@ -92,7 +47,7 @@ public static void inLock(Lock lock, Runnable runnable) { * @throws E if an exception occurs during the execution of the supplier * @throws NullPointerException if either {@code lock} or {@code supplier} is null */ - public static T inLockThrows(Lock lock, ThrowingSupplier supplier) throws E { + public static T inLock(Lock lock, ThrowingSupplier supplier) throws E { Objects.requireNonNull(lock, "Lock must not be null"); Objects.requireNonNull(supplier, "Supplier must not be null"); @@ -115,7 +70,7 @@ public static T inLockThrows(Lock lock, ThrowingSupplie * @throws E if an exception occurs during the execution of the runnable * @throws NullPointerException if either {@code lock} or {@code runnable} is null */ - public static void inLockThrows(Lock lock, ThrowingRunnable runnable) throws E { + public static void inLock(Lock lock, ThrowingRunnable runnable) throws E { Objects.requireNonNull(lock, "Lock must not be null"); Objects.requireNonNull(runnable, "Runnable must not be null"); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java index dbfdf91a1ccb5..68467cd3ac1f6 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java @@ -35,7 +35,6 @@ import java.util.OptionalInt; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Supplier; /** * The abstract index class which holds entry format agnostic methods. @@ -196,8 +195,8 @@ public void updateParentDir(File parentDir) { * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not. */ public boolean resize(int newSize) throws IOException { - return inLockThrows(() -> - inRemapWriteLockThrows(() -> { + return inLock(() -> + inRemapWriteLock(() -> { int roundedNewSize = roundDownToExactMultiple(newSize, entrySize()); if (length == roundedNewSize) { @@ -263,7 +262,7 @@ public boolean deleteIfExists() throws IOException { * the file. */ public void trimToValidSize() throws IOException { - inLockThrows(() -> { + inLock(() -> { if (mmap != null) { resize(entrySize() * entries); } @@ -287,7 +286,7 @@ public void closeHandler() { // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. - inLockThrows(() -> inRemapWriteLockThrows(this::safeForceUnmap)); + inLock(() -> inRemapWriteLock(this::safeForceUnmap)); } /** @@ -414,36 +413,28 @@ protected void truncateToEntries0(int entries) { mmap.position(entries * entrySize()); } - protected final T inLock(Supplier action) { + protected final T inLock(LockUtils.ThrowingSupplier action) throws E { return LockUtils.inLock(lock, action); } - protected final void inLock(Runnable action) { + protected final void inLock(LockUtils.ThrowingRunnable action) throws E { LockUtils.inLock(lock, action); } - protected final T inLockThrows(LockUtils.ThrowingSupplier action) throws E { - return LockUtils.inLockThrows(lock, action); - } - - protected final void inLockThrows(LockUtils.ThrowingRunnable action) throws E { - LockUtils.inLockThrows(lock, action); - } - - protected final T inRemapReadLock(Supplier action) { + protected final T inRemapReadLock(LockUtils.ThrowingSupplier action) throws E { return LockUtils.inLock(remapLock.readLock(), action); } - protected final void inRemapReadLock(Runnable action) { + protected final void inRemapReadLock(LockUtils.ThrowingRunnable action) throws E { LockUtils.inLock(remapLock.readLock(), action); } - protected final T inRemapWriteLockThrows(LockUtils.ThrowingSupplier action) throws E { - return LockUtils.inLockThrows(remapLock.writeLock(), action); + protected final T inRemapWriteLock(LockUtils.ThrowingSupplier action) throws E { + return LockUtils.inLock(remapLock.writeLock(), action); } - protected final void inRemapWriteLockThrows(LockUtils.ThrowingRunnable action) throws E { - LockUtils.inLockThrows(remapLock.writeLock(), action); + protected final void inRemapWriteLock(LockUtils.ThrowingRunnable action) throws E { + LockUtils.inLock(remapLock.writeLock(), action); } /** diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java index e6f50da24eedb..3043c17cf8ac3 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java @@ -215,7 +215,7 @@ public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) { @Override public boolean resize(int newSize) throws IOException { - return inLockThrows(() -> { + return inLock(() -> { if (super.resize(newSize)) { this.lastEntry = lastEntryFromIndexFile(); return true; From 9a30206c7a7d727dcf04ea04312bdf63c68f09ce Mon Sep 17 00:00:00 2001 From: Masahiro Mori Date: Sat, 12 Jul 2025 06:10:36 +0900 Subject: [PATCH 4/4] minor: improve comment --- .../apache/kafka/storage/internals/log/AbstractIndex.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java index 68467cd3ac1f6..e6da08be59160 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java @@ -48,11 +48,9 @@ private enum SearchResultType { private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); // Serializes all index operations that mutate internal state. - // Readers do not need to acquire this lock because: - // 1) They only access published entries and are not affected by concurrent writes. - // 2) MappedByteBuffer provides direct access to the OS-level buffer cache, - // which allows concurrent reads in practice. - // 3) Read and remap operations are coordinated via remapLock to ensure visibility. + // Clients only read committed data and are not affected by concurrent appends/truncates. + // In the rare case, when the data is truncated, the follower could read inconsistent data. + // The follower has the logic to ignore the inconsistent data through crc and leader epoch. private final ReentrantLock lock = new ReentrantLock(); // Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();