MINOR: Cleanup metadata module #20115

Open · wants to merge 2 commits into trunk
@@ -23,14 +23,15 @@
  * A record was encountered where the number of directories does not match the number of replicas.
  */
 public class InvalidReplicaDirectoriesException extends InvalidMetadataException {
+    private static final String ERR_MSG = "The lengths for replicas and directories do not match: ";
 
     private static final long serialVersionUID = 1L;
 
     public InvalidReplicaDirectoriesException(PartitionRecord record) {
-        super("The lengths for replicas and directories do not match: " + record);
+        super(ERR_MSG + record);
     }
 
     public InvalidReplicaDirectoriesException(PartitionChangeRecord record) {
-        super("The lengths for replicas and directories do not match: " + record);
+        super(ERR_MSG + record);
     }
 }
@@ -26,7 +26,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * The BrokerheartbeatTracker stores the last time each broker sent a heartbeat to us.
+ * The BrokerHeartbeatTracker stores the last time each broker sent a heartbeat to us.
  * This class will be present only on the active controller.
  *
  * UNLIKE MOST OF THE KAFKA CONTROLLER, THIS CLASS CAN BE ACCESSED FROM MULTIPLE THREADS.
@@ -261,32 +261,32 @@ static ApiError validateQuotaKeyValue(
         }
 
         // Ensure the quota value is valid
-        switch (configKey.type()) {
-            case DOUBLE:
-                return ApiError.NONE;
-            case SHORT:
+        return switch (configKey.type()) {
+            case DOUBLE -> ApiError.NONE;
+            case SHORT -> {
                 if (value > Short.MAX_VALUE) {
-                    return new ApiError(Errors.INVALID_REQUEST,
+                    yield new ApiError(Errors.INVALID_REQUEST,
                         "Proposed value for " + key + " is too large for a SHORT.");
                 }
-                return getErrorForIntegralQuotaValue(value, key);
-            case INT:
+                yield getErrorForIntegralQuotaValue(value, key);
+            }
+            case INT -> {
                 if (value > Integer.MAX_VALUE) {
-                    return new ApiError(Errors.INVALID_REQUEST,
+                    yield new ApiError(Errors.INVALID_REQUEST,
                         "Proposed value for " + key + " is too large for an INT.");
                 }
-                return getErrorForIntegralQuotaValue(value, key);
-            case LONG: {
+                yield getErrorForIntegralQuotaValue(value, key);
+            }
+            case LONG -> {
                 if (value > Long.MAX_VALUE) {
-                    return new ApiError(Errors.INVALID_REQUEST,
+                    yield new ApiError(Errors.INVALID_REQUEST,
                         "Proposed value for " + key + " is too large for a LONG.");
                 }
-                return getErrorForIntegralQuotaValue(value, key);
+                yield getErrorForIntegralQuotaValue(value, key);
             }
-            default:
-                return new ApiError(Errors.UNKNOWN_SERVER_ERROR,
-                    "Unexpected config type " + configKey.type() + " should be Long or Double");
-        }
+            default -> new ApiError(Errors.UNKNOWN_SERVER_ERROR,
+                "Unexpected config type " + configKey.type() + " should be Long or Double");
+        };
     }
 
     static ApiError getErrorForIntegralQuotaValue(double value, String key) {
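Note for reviewers: the hunk above converts a classic switch statement into a Java 14+ switch expression, where each arm produces a value (`->` for a single expression, `yield` inside a block) and the compiler checks that every path yields something. A minimal standalone sketch of the same pattern; `ConfigType` and `describe` are illustrative names, not the actual Kafka classes:

```java
public class SwitchExpressionDemo {
    enum ConfigType { DOUBLE, SHORT, INT, LONG }

    // Old style: each branch returns separately; a forgotten return or
    // accidental fall-through is easy to miss.
    static String describeOld(ConfigType type) {
        switch (type) {
            case DOUBLE:
                return "floating point";
            case SHORT:
            case INT:
            case LONG:
                return "integral";
            default:
                return "unknown";
        }
    }

    // New style: the switch itself is an expression; block bodies use yield.
    // Covering every enum constant means no default arm is required.
    static String describeNew(ConfigType type) {
        return switch (type) {
            case DOUBLE -> "floating point";
            case SHORT, INT, LONG -> {
                yield "integral";
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(describeNew(ConfigType.INT)); // prints "integral"
    }
}
```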
@@ -823,7 +823,7 @@ void updateDirectories(int brokerId, List<Uuid> dirsToRemove, List<Uuid> dirsToA
     }
 
     Iterator<Entry<Integer, Map<String, VersionRange>>> brokerSupportedFeatures() {
-        return new Iterator<Entry<Integer, Map<String, VersionRange>>>() {
+        return new Iterator<>() {
             private final Iterator<BrokerRegistration> iter = brokerRegistrations.values().iterator();
 
             @Override
@@ -845,7 +845,7 @@ Iterator<Entry<Integer, Map<String, VersionRange>>> controllerSupportedFeatures(
             throw new UnsupportedVersionException("The current MetadataVersion is too old to " +
                 "support controller registrations.");
         }
-        return new Iterator<Entry<Integer, Map<String, VersionRange>>>() {
+        return new Iterator<>() {
             private final Iterator<ControllerRegistration> iter = controllerRegistrations.values().iterator();
 
             @Override
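The `new Iterator<>()` change relies on Java 9's extension of the diamond operator to anonymous inner classes: the type argument is inferred from the method's return type, so the long `Entry<Integer, Map<String, VersionRange>>` repetition can be dropped. A small sketch of the before/after, with illustrative names:

```java
import java.util.Iterator;
import java.util.List;

public class DiamondDemo {
    // Pre-Java 9: anonymous classes required an explicit type argument.
    static Iterator<String> verbose(List<String> items) {
        return new Iterator<String>() {
            private final Iterator<String> iter = items.iterator();
            @Override public boolean hasNext() { return iter.hasNext(); }
            @Override public String next() { return iter.next(); }
        };
    }

    // Java 9+: the compiler infers <String> from the declared return type.
    static Iterator<String> concise(List<String> items) {
        return new Iterator<>() {
            private final Iterator<String> iter = items.iterator();
            @Override public boolean hasNext() { return iter.hasNext(); }
            @Override public String next() { return iter.next(); }
        };
    }

    public static void main(String[] args) {
        concise(List.of("a", "b")).forEachRemaining(System.out::println);
    }
}
```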
@@ -364,9 +364,7 @@ private ApiError validateAlterConfig(
             if (!newlyCreatedResource) {
                 existenceChecker.accept(configResource);
             }
-            if (alterConfigPolicy.isPresent()) {
-                alterConfigPolicy.get().validate(new RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck));
-            }
+            alterConfigPolicy.ifPresent(policy -> policy.validate(new RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck)));
         } catch (ConfigException e) {
             return new ApiError(INVALID_CONFIG, e.getMessage());
         } catch (Throwable e) {
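Several hunks in this PR replace the `isPresent()`/`get()` pair with `Optional.ifPresent` and a lambda, which removes a level of nesting and cannot throw `NoSuchElementException` if the check and the `get()` ever drift apart. A minimal sketch of the pattern; the `Policy` interface here is illustrative, not the Kafka `AlterConfigPolicy`:

```java
import java.util.Optional;

public class IfPresentDemo {
    interface Policy {
        void validate(String request);
    }

    public static void main(String[] args) {
        Policy logging = request -> System.out.println("validated: " + request);
        Optional<Policy> policy = Optional.of(logging);

        // Before: explicit presence check plus get().
        if (policy.isPresent()) {
            policy.get().validate("alter-config");
        }

        // After: the lambda runs only when a value is present.
        policy.ifPresent(p -> p.validate("alter-config"));
    }
}
```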
@@ -74,12 +74,12 @@ EventPerformanceMonitor build() {
     /**
      * The period in nanoseconds.
      */
-    private long periodNs;
+    private final long periodNs;
 
     /**
      * The always-log threshold in nanoseconds.
      */
-    private long alwaysLogThresholdNs;
+    private final long alwaysLogThresholdNs;
 
     /**
      * The name of the slowest event we've seen so far, or null if none has been seen.
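Marking fields such as `periodNs` as `final` documents that they are assigned exactly once, in the constructor, and lets the compiler reject any later reassignment; for objects shared across threads, final fields also get safe-publication guarantees from the Java memory model. A tiny sketch with hypothetical names:

```java
public class Monitor {
    // final: must be assigned exactly once, here in the constructor.
    private final long periodNs;
    private final long alwaysLogThresholdNs;

    Monitor(long periodNs, long alwaysLogThresholdNs) {
        this.periodNs = periodNs;
        this.alwaysLogThresholdNs = alwaysLogThresholdNs;
    }

    long periodNs() {
        return periodNs;
    }

    public static void main(String[] args) {
        Monitor m = new Monitor(1_000_000L, 5_000_000L);
        System.out.println(m.periodNs()); // 1000000
        // m.periodNs = 2; // would not compile: cannot assign to a final field
    }
}
```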
@@ -36,7 +36,6 @@ interface KRaftVersionAccessor {
      * @param epoch the current epoch
      * @param newVersion the new kraft version to upgrade to
      * @param validateOnly whether to just validate the change and not persist it
-     * @throws ApiException when the upgrade fails to validate
      */
     void upgradeKRaftVersion(int epoch, KRaftVersion newVersion, boolean validateOnly);
 }
@@ -676,11 +676,6 @@ ClusterControlManager clusterControl() {
         return clusterControl;
     }
 
-    // Visible for testing
-    FeatureControlManager featureControl() {
-        return featureControl;
-    }
-
     // Visible for testing
     ConfigurationControlManager configurationControl() {
         return configurationControl;
@@ -1697,7 +1692,7 @@ private void registerGeneratePeriodicPerformanceMessage() {
      *
      * This task periodically expires delegation tokens.
      *
-     * @param checkIntervalNs
+     * @param checkIntervalNs The check interval in nanoseconds.
      */
     private void registerExpireDelegationTokens(long checkIntervalNs) {
         periodicControl.registerTask(new PeriodicTask("expireDelegationTokens",
@@ -1109,7 +1109,6 @@ ControllerResult<AlterPartitionResponseData> alterPartition(
                 topic,
                 partitionId,
                 partition,
-                context.requestHeader().requestApiVersion(),
                 partitionData);
 
             if (validationError != Errors.NONE) {
@@ -1239,7 +1238,6 @@ private Errors validateAlterPartitionData(
         TopicControlInfo topic,
         int partitionId,
         PartitionRegistration partition,
-        short requestApiVersion,
         AlterPartitionRequestData.PartitionData partitionData
     ) {
         if (partition == null) {
@@ -167,10 +167,8 @@ public String failureMessage(
             bld.append("event unable to start processing because of ");
         }
         bld.append(internalException.getClass().getSimpleName());
-        if (externalException.isPresent()) {
-            bld.append(" (treated as ").
-                append(externalException.get().getClass().getSimpleName()).append(")");
-        }
+        externalException.ifPresent(e -> bld.append(" (treated as ")
+            .append(e.getClass().getSimpleName()).append(")"));
         if (causesFailover()) {
             bld.append(" at epoch ").append(epoch);
         }
@@ -23,7 +23,6 @@
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.image.node.ClientQuotaImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -58,8 +57,7 @@ public Map<String, Double> quotaMap() {
 
     public void write(
         ClientQuotaEntity entity,
-        ImageWriter writer,
-        ImageWriterOptions options
+        ImageWriter writer
     ) {
         for (Entry<String, Double> entry : quotas.entrySet()) {
             writer.write(0, new ClientQuotaRecord().
@@ -26,7 +26,6 @@
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.image.node.ClientQuotasImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -67,11 +66,11 @@ public Map<ClientQuotaEntity, ClientQuotaImage> entities() {
         return entities;
     }
 
-    public void write(ImageWriter writer, ImageWriterOptions options) {
+    public void write(ImageWriter writer) {
         for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
             ClientQuotaEntity entity = entry.getKey();
             ClientQuotaImage clientQuotaImage = entry.getValue();
-            clientQuotaImage.write(entity, writer, options);
+            clientQuotaImage.write(entity, writer);
         }
     }
 
@@ -165,9 +165,7 @@ public ClusterImage apply() {
             int nodeId = entry.getKey();
             Optional<BrokerRegistration> brokerRegistration = entry.getValue();
             if (!newBrokers.containsKey(nodeId)) {
-                if (brokerRegistration.isPresent()) {
-                    newBrokers.put(nodeId, brokerRegistration.get());
-                }
+                brokerRegistration.ifPresent(registration -> newBrokers.put(nodeId, registration));
             }
         }
         Map<Integer, ControllerRegistration> newControllers = new HashMap<>(image.controllers().size());
@@ -184,9 +182,7 @@ public ClusterImage apply() {
             int nodeId = entry.getKey();
             Optional<ControllerRegistration> controllerRegistration = entry.getValue();
             if (!newControllers.containsKey(nodeId)) {
-                if (controllerRegistration.isPresent()) {
-                    newControllers.put(nodeId, controllerRegistration.get());
-                }
+                controllerRegistration.ifPresent(registration -> newControllers.put(nodeId, registration));
             }
         }
         return new ClusterImage(newBrokers, newControllers);
@@ -21,7 +21,6 @@
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.image.node.ConfigurationImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 
 import java.util.Collections;
 import java.util.Map;
@@ -71,8 +70,7 @@ public Map<String, String> toMap() {
 
     public void write(
         ConfigResource configResource,
-        ImageWriter writer,
-        ImageWriterOptions options
+        ImageWriter writer
     ) {
         for (Map.Entry<String, String> entry : data.entrySet()) {
             writer.write(0, new ConfigRecord().
@@ -20,7 +20,6 @@
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.image.node.ConfigurationsImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 
 import java.util.Collections;
 import java.util.Map;
@@ -74,11 +73,11 @@ public Map<String, String> configMapForResource(ConfigResource configResource) {
         }
     }
 
-    public void write(ImageWriter writer, ImageWriterOptions options) {
+    public void write(ImageWriter writer) {
         for (Entry<ConfigResource, ConfigurationImage> entry : data.entrySet()) {
             ConfigResource configResource = entry.getKey();
             ConfigurationImage configImage = entry.getValue();
-            configImage.write(configResource, writer, options);
+            configImage.write(configResource, writer);
         }
     }
 
@@ -153,9 +153,9 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
         features.write(writer, options);
         cluster.write(writer, options);
         topics.write(writer, options);
-        configs.write(writer, options);
-        clientQuotas.write(writer, options);
-        producerIds.write(writer, options);
+        configs.write(writer);
+        clientQuotas.write(writer);
+        producerIds.write(writer);
         acls.write(writer);
         scram.write(writer, options);
         delegationTokens.write(writer, options);
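A recurring cleanup across the image classes: `ImageWriterOptions` was threaded through several `write` methods that never read it, so the parameter and its import can be dropped, with `MetadataImage.write` above adjusted at the call sites. A generic sketch of the same dead-parameter cleanup, using made-up names rather than the Kafka types:

```java
import java.util.List;

public class DeadParameterDemo {
    record Options(boolean verbose) {}

    // Before: options is accepted but never read inside the method.
    static void writeOld(List<String> records, StringBuilder out, Options options) {
        records.forEach(r -> out.append(r).append('\n'));
    }

    // After: the unused parameter is gone, so callers no longer have to
    // construct and pass an Options value they know is ignored.
    static void writeNew(List<String> records, StringBuilder out) {
        records.forEach(r -> out.append(r).append('\n'));
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        writeNew(List.of("a", "b"), out);
        System.out.print(out); // prints "a" and "b" on separate lines
    }
}
```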
@@ -20,7 +20,6 @@
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.image.node.ProducerIdsImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 
 import java.util.Objects;
 
@@ -46,7 +45,7 @@ public long nextProducerId() {
         return nextProducerId;
     }
 
-    public void write(ImageWriter writer, ImageWriterOptions options) {
+    public void write(ImageWriter writer) {
         if (nextProducerId >= 0) {
             writer.write(0, new ProducerIdsRecord().
                 setBrokerId(-1).
6 changes: 3 additions & 3 deletions metadata/src/main/java/org/apache/kafka/image/ScramImage.java
@@ -109,17 +109,17 @@ public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCreden
             DescribeUserScramCredentialsResult result = new DescribeUserScramCredentialsResult().setUser(user.getKey());
 
             if (!user.getValue()) {
-                boolean datafound = false;
+                boolean dataFound = false;
                 List<CredentialInfo> credentialInfos = new ArrayList<>();
                 for (Map.Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismsEntry : mechanisms.entrySet()) {
                     Map<String, ScramCredentialData> credentialDataSet = mechanismsEntry.getValue();
                     if (credentialDataSet.containsKey(user.getKey())) {
                         credentialInfos.add(new CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
                             .setIterations(credentialDataSet.get(user.getKey()).iterations()));
-                        datafound = true;
+                        dataFound = true;
                     }
                 }
-                if (datafound) {
+                if (dataFound) {
                     result.setCredentialInfos(credentialInfos);
                 } else {
                     result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())
11 changes: 4 additions & 7 deletions metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -20,7 +20,6 @@
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.metadata.ClearElrRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -40,8 +39,8 @@
 public final class TopicDelta {
     private final TopicImage image;
     private final Map<Integer, PartitionRegistration> partitionChanges = new HashMap<>();
-    private Map<Integer, Integer> partitionToUncleanLeaderElectionCount = new HashMap<>();
-    private Map<Integer, Integer> partitionToElrElectionCount = new HashMap<>();
+    private final Map<Integer, Integer> partitionToUncleanLeaderElectionCount = new HashMap<>();
+    private final Map<Integer, Integer> partitionToElrElectionCount = new HashMap<>();
 
     public TopicDelta(TopicImage image) {
         this.image = image;
@@ -113,11 +112,9 @@ private void updateElectionStats(int partitionId, PartitionRegistration prevPart
         }
     }
 
-    public void replay(ClearElrRecord record) {
+    public void replay() {
         // Some partitions are not added to the image yet, let's check the partitionChanges first.
-        partitionChanges.forEach((partitionId, partition) -> {
-            maybeClearElr(partitionId, partition);
-        });
+        partitionChanges.forEach(this::maybeClearElr);
 
         image.partitions().forEach((partitionId, partition) -> {
             if (!partitionChanges.containsKey(partitionId)) {
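The `partitionChanges.forEach(this::maybeClearElr)` form works because `Map.forEach` takes a `BiConsumer<K, V>`, and a lambda that only forwards its two arguments to an instance method with the matching `(key, value)` signature can be replaced by a method reference. A toy sketch of the equivalence, with illustrative types:

```java
import java.util.HashMap;
import java.util.Map;

public class MethodRefDemo {
    private final Map<Integer, String> partitionChanges = new HashMap<>();

    // Matches BiConsumer<Integer, String>: (partitionId, partition) -> ...
    private void maybeClearElr(int partitionId, String partition) {
        System.out.println("checking partition " + partitionId + " -> " + partition);
    }

    void replay() {
        // Lambda form:
        partitionChanges.forEach((partitionId, partition) -> maybeClearElr(partitionId, partition));
        // Equivalent method-reference form, as used in this PR:
        partitionChanges.forEach(this::maybeClearElr);
    }

    public static void main(String[] args) {
        MethodRefDemo demo = new MethodRefDemo();
        demo.partitionChanges.put(0, "p0");
        demo.replay(); // prints the check line twice, once per form
    }
}
```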
14 changes: 5 additions & 9 deletions metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -95,11 +95,11 @@ public void replay(PartitionChangeRecord record) {
         topicDelta.replay(record);
     }
 
-    private void maybeReplayClearElrRecord(Uuid topicId, ClearElrRecord record) {
+    private void maybeReplayClearElrRecord(Uuid topicId) {
         // Only apply the record if the topic is not deleted.
         if (!deletedTopicIds.contains(topicId)) {
             TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
-            topicDelta.replay(record);
+            topicDelta.replay();
         }
     }
 
@@ -123,15 +123,11 @@ public void replay(ClearElrRecord record) {
                 record.topicName() + ": no such topic found.");
             }
 
-            maybeReplayClearElrRecord(topicId, record);
+            maybeReplayClearElrRecord(topicId);
         } else {
             // Update all the existing topics
-            image.topicsById().forEach((topicId, image) -> {
-                maybeReplayClearElrRecord(topicId, record);
-            });
-            createdTopicIds().forEach((topicId -> {
-                maybeReplayClearElrRecord(topicId, record);
-            }));
+            image.topicsById().forEach((topicId, image) -> maybeReplayClearElrRecord(topicId));
+            createdTopicIds().forEach((this::maybeReplayClearElrRecord));
         }
     }
 