Skip to content

MINOR: Refactor LockUtils and improve comments (follow up to KAFKA-19390) #20131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

/**
* A utility class providing helper methods for working with {@link Lock} objects.
Expand All @@ -35,50 +34,6 @@ public interface ThrowingRunnable<E extends Exception> {
void run() throws E;
}

/**
* Executes the given {@link Supplier} within the context of the specified {@link Lock}.
* The lock is acquired before executing the supplier and released after the execution,
* ensuring that the lock is always released, even if an exception is thrown.
*
* @param <T> the type of the result returned by the supplier
* @param lock the lock to be acquired and released
* @param supplier the supplier to be executed within the lock context
* @return the result of the supplier
* @throws NullPointerException if either {@code lock} or {@code supplier} is null
*/
public static <T> T inLock(Lock lock, Supplier<T> supplier) {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(supplier, "Supplier must not be null");

lock.lock();
try {
return supplier.get();
} finally {
lock.unlock();
}
}

/**
* Executes the given {@link Runnable} within the context of the specified {@link Lock}.
* The lock is acquired before executing the runnable and released after the execution,
* ensuring that the lock is always released, even if an exception is thrown.
*
* @param lock the lock to be acquired and released
* @param runnable the runnable to be executed within the lock context
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
*/
public static void inLock(Lock lock, Runnable runnable) {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(runnable, "Runnable must not be null");

lock.lock();
try {
runnable.run();
} finally {
lock.unlock();
}
}

/**
* Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}.
* The lock is acquired before executing the supplier and released after the execution,
Expand All @@ -92,7 +47,7 @@ public static void inLock(Lock lock, Runnable runnable) {
* @throws E if an exception occurs during the execution of the supplier
* @throws NullPointerException if either {@code lock} or {@code supplier} is null
*/
public static <T, E extends Exception> T inLockThrows(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
public static <T, E extends Exception> T inLock(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(supplier, "Supplier must not be null");

Expand All @@ -115,7 +70,7 @@ public static <T, E extends Exception> T inLockThrows(Lock lock, ThrowingSupplie
* @throws E if an exception occurs during the execution of the runnable
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
*/
public static <E extends Exception> void inLockThrows(Lock lock, ThrowingRunnable<E> runnable) throws E {
public static <E extends Exception> void inLock(Lock lock, ThrowingRunnable<E> runnable) throws E {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(runnable, "Runnable must not be null");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.OptionalInt;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/**
* The abstract index class which holds entry format agnostic methods.
Expand All @@ -48,7 +47,10 @@ private enum SearchResultType {

private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);

// Serializes all index operations that mutate internal state
// Serializes all index operations that mutate internal state.
// Clients only read committed data and are not affected by concurrent appends/truncates.
// In the rare case, when the data is truncated, the follower could read inconsistent data.
// The follower has the logic to ignore the inconsistent data through crc and leader epoch.
private final ReentrantLock lock = new ReentrantLock();
// Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed
private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -191,8 +193,8 @@ public void updateParentDir(File parentDir) {
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
return inLockThrows(() ->
inRemapWriteLockThrows(() -> {
return inLock(() ->
inRemapWriteLock(() -> {
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());

if (length == roundedNewSize) {
Expand Down Expand Up @@ -258,7 +260,7 @@ public boolean deleteIfExists() throws IOException {
* the file.
*/
public void trimToValidSize() throws IOException {
inLockThrows(() -> {
inLock(() -> {
if (mmap != null) {
resize(entrySize() * entries);
}
Expand All @@ -282,10 +284,7 @@ public void closeHandler() {
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
inLockThrows(() ->
inRemapWriteLockThrows(() -> {
safeForceUnmap();
}));
inLock(() -> inRemapWriteLock(this::safeForceUnmap));
}

/**
Expand Down Expand Up @@ -412,36 +411,28 @@ protected void truncateToEntries0(int entries) {
mmap.position(entries * entrySize());
}

protected final <T> T inLock(Supplier<T> action) {
protected final <T, E extends Exception> T inLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(lock, action);
}

protected final void inLock(Runnable action) {
protected final <E extends Exception> void inLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(lock, action);
}

protected final <T, E extends Exception> T inLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLockThrows(lock, action);
}

protected final <E extends Exception> void inLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLockThrows(lock, action);
}

protected final <T> T inRemapReadLock(Supplier<T> action) {
protected final <T, E extends Exception> T inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(remapLock.readLock(), action);
}

protected final void inRemapReadLock(Runnable action) {
protected final <E extends Exception> void inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(remapLock.readLock(), action);
}

protected final <T, E extends Exception> T inRemapWriteLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLockThrows(remapLock.writeLock(), action);
protected final <T, E extends Exception> T inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(remapLock.writeLock(), action);
}

protected final <E extends Exception> void inRemapWriteLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLockThrows(remapLock.writeLock(), action);
protected final <E extends Exception> void inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(remapLock.writeLock(), action);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {

@Override
public boolean resize(int newSize) throws IOException {
return inLockThrows(() -> {
return inLock(() -> {
if (super.resize(newSize)) {
this.lastEntry = lastEntryFromIndexFile();
return true;
Expand Down