KAFKA-19254: Add generic feature level metrics #20021
base: trunk
Conversation
@kevin-wu24 thanks for this patch. I left a couple of comments. PTAL.
private MetricName getFeatureNameTagMetricName(String name, String group, String featureName) {
    LinkedHashMap<String, String> featureNameTag = new LinkedHashMap<>();
    featureNameTag.put(FEATURE_NAME_TAG, featureName.replace(".", "-"));
It contains only one element, so it should be fine to use Map.of.
private static MetricName getFeatureNameTagMetricName(String type, String name, String featureName) {
    LinkedHashMap<String, String> featureNameTag = new LinkedHashMap<>();
    featureNameTag.put(FEATURE_NAME_TAG, sanitizeFeatureName(featureName));
ditto
The KafkaYammerMetrics.getMetricName method below expects a LinkedHashMap parameter, not a Map.
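For readers following along, a minimal standalone sketch of why the Map.of suggestion does not type-check against a LinkedHashMap-typed parameter (the tag name and value here are illustrative, not the exact constants from the patch):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TagMapSketch {
    public static void main(String[] args) {
        // Map.of builds an immutable map whose concrete class is not LinkedHashMap,
        // so it cannot be passed where a LinkedHashMap parameter is declared.
        Map<String, String> immutable = Map.of("feature-name", "metadata-version");
        System.out.println(immutable instanceof LinkedHashMap); // false

        // The single-entry LinkedHashMap the patch builds instead:
        LinkedHashMap<String, String> featureNameTag = new LinkedHashMap<>();
        featureNameTag.put("feature-name", "metadata-version");
        System.out.println(featureNameTag.size()); // 1
    }
}
```

This is why the Map.of simplification was declined even though the map only ever holds one entry.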
    finalizedFeatureLevels.put(featureName, featureLevel);
}

public short finalizedFeatureLevel(String featureName) {
This could be a private method.
Kept this public and added checks in the test mentioned here: #20021 (comment)
    return KafkaYammerMetrics.getMetricName("kafka.server", type, name, featureNameTag);
}

private static String sanitizeFeatureName(String featureName) {
Please consider adding comments to remind readers that the naming style is different from NodeMetrics, and this is expected.
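As context for the naming-style note, a standalone sketch of the dot-to-dash sanitization visible in the earlier revision of this patch (featureName.replace(".", "-")); the real sanitizeFeatureName may differ:

```java
public class SanitizeSketch {
    // Simplified stand-in for the sanitizeFeatureName helper under review:
    // feature names like "metadata.version" become tag-friendly "metadata-version".
    static String sanitizeFeatureName(String featureName) {
        return featureName.replace(".", "-");
    }

    public static void main(String[] args) {
        System.out.println(sanitizeFeatureName("metadata.version")); // metadata-version
        System.out.println(sanitizeFeatureName("kraft.version"));    // kraft-version
    }
}
```

The dashed form matches the node-metrics naming convention, which is why it intentionally differs from the CamelCase names used elsewhere.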
@kevin-wu24: Thanks for the patch!
private static MetricName metricName(String type, String name) {
    String mBeanName = String.format("kafka.server:type=%s,name=%s", type, name);
    return new MetricName("kafka.server", type, name, null, mBeanName);
}

private static MetricName metricName(String type, String name, String scope) {
    String mBeanName = String.format("kafka.server:type=%s,name=%s,%s", type, name, scope);
    return new MetricName("kafka.server", type, name, scope, mBeanName);
}
Could we combine these two similar methods?
For example:
private static MetricName metricName(String type, String... nameParts) {
if (nameParts.length == 0) {
throw new IllegalArgumentException("At least one name part is required");
}
String name = nameParts[0];
String scope = nameParts.length > 1 ? String.join(",", Arrays.copyOfRange(nameParts, 1, nameParts.length)) : null;
String mBeanName = scope == null
? String.format("kafka.server:type=%s,name=%s", type, name)
: String.format("kafka.server:type=%s,name=%s,%s", type, name, scope);
return new MetricName("kafka.server", type, name, scope, mBeanName);
}
I would like to leave the methods as is. I think combining them both into one variadic method is a bit harder to read.
Thanks for the patch, left a comment.
builder.append(word.substring(0, 1).toUpperCase(Locale.ROOT))
    .append(word.substring(1).toLowerCase(Locale.ROOT));
Could we use charAt(0) to directly get the first character? This avoids creating temporary strings via substring.
Suggested change:
- builder.append(word.substring(0, 1).toUpperCase(Locale.ROOT))
-     .append(word.substring(1).toLowerCase(Locale.ROOT));
+ builder.append(Character.toUpperCase(word.charAt(0)))
+     .append(word.substring(1).toLowerCase(Locale.ROOT));
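To make the suggestion concrete, a standalone sketch of the capitalization loop using the charAt(0) form; splitting the feature name on '.' is an assumption here, since the surrounding loop is not shown in the hunk:

```java
import java.util.Locale;

public class CapitalizeSketch {
    // Sketch of the word-capitalization loop under review, using the suggested
    // Character.toUpperCase(word.charAt(0)) instead of substring(0, 1).toUpperCase().
    static String toCamelCase(String featureName) {
        StringBuilder builder = new StringBuilder();
        for (String word : featureName.split("\\.")) {
            builder.append(Character.toUpperCase(word.charAt(0)))
                   .append(word.substring(1).toLowerCase(Locale.ROOT));
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCamelCase("metadata.version")); // MetadataVersion
    }
}
```

Both forms produce the same string; the charAt variant just avoids the one-character temporary String per word.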
Thanks for the PR. Please run ./gradlew checkstyleMain checkstyleTest spotlessCheck to fix lint issues.
);

try (NodeMetrics ignored = new NodeMetrics(metrics, true)) {
Could we create a case to test new NodeMetrics(metrics, false) as well?
@@ -42,10 +47,13 @@ public final class MetadataLoaderMetrics implements AutoCloseable {
        "MetadataLoader", "HandleLoadSnapshotCount");
    public static final MetricName CURRENT_CONTROLLER_ID = getMetricName(
        "MetadataLoader", "CurrentControllerId");
    public static final String FINALIZED_LEVEL_METRIC_NAME = "FinalizedLevel";
This variable can be private. There is no usage outside this class.
MetadataVersion metadataVersion = image.features().metadataVersionOrThrow();
metrics.setCurrentMetadataVersion(metadataVersion);
metrics.setFinalizedFeatureLevel(
    MetadataVersion.FEATURE_NAME,
    metadataVersion.featureLevel()
);
for (var finalizedFeatureEntry : image.features().finalizedVersions().entrySet()) {
    metrics.setFinalizedFeatureLevel(
        finalizedFeatureEntry.getKey(),
        finalizedFeatureEntry.getValue()
    );
}
Could you add a related test to MetadataLoaderTest as well? For example, there is an assertion about currentMetadataVersion:
kafka/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java, lines 356 to 357 in 1ca8779:

assertEquals(MINIMUM_VERSION,
    loader.metrics().currentMetadataVersion());
Yeah, for the checks of the current metadata version metric value, I will add a check for the finalized feature level metric too.
for (var featureName : Feature.PRODUCTION_FEATURE_NAMES) {
    addSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName);
    addSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName);
}
addSupportedLevelMetric(
    MAXIMUM_SUPPORTED_LEVEL_NAME,
    MetadataVersion.FEATURE_NAME
);
addSupportedLevelMetric(
    MINIMUM_SUPPORTED_LEVEL_NAME,
    MetadataVersion.FEATURE_NAME
);
Should we use supportedFeatureRanges instead of setting metrics separately? I noticed that the close method uses supportedFeatureRanges.keySet() to remove metrics.
Suggested change:
- for (var featureName : Feature.PRODUCTION_FEATURE_NAMES) {
-     addSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName);
-     addSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName);
- }
- addSupportedLevelMetric(
-     MAXIMUM_SUPPORTED_LEVEL_NAME,
-     MetadataVersion.FEATURE_NAME
- );
- addSupportedLevelMetric(
-     MINIMUM_SUPPORTED_LEVEL_NAME,
-     MetadataVersion.FEATURE_NAME
- );
+ supportedFeatureRanges.forEach((featureName, versionRange) -> {
+     addSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName);
+     addSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName);
+ });
Metrics metrics = new Metrics();
String expectedGroup = "node-metrics";

// Metric description is not use for metric name equality
The typo is the same as in BrokerServerMetricsTest.java (looks like you followed it).
Suggested change:
- // Metric description is not use for metric name equality
+ // Metric description is not used for metric name equality
LGTM.
Thanks all for the review. Pushed commits to address the comments.
        METRIC_GROUP_NAME,
        featureName
    ),
    (config, now) -> {
This will use the type Measurable, and the value type is double rather than integer, right?
I remember bringing this issue up when working on the KafkaRaftMetrics a while back: #18304 (comment). Yes, it will upcast to a double. I guess the actual values we want are short, right? Since feature levels themselves are shorts. I'll update the KIP with that so it's consistent.
    });
}

private void addSupportedLevelMetric(String metricName, String featureName) {
supportedFeatureRanges is immutable, so perhaps we can pass the value to this method?
supportedFeatureRanges.forEach((featureName, versionRange) -> {
addSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName, versionRange.max());
addSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName, versionRange.min());
});
private void addSupportedLevelMetric(String metricName, String featureName, short value) {
metrics.addMetric(
getFeatureNameTagMetricName(
metricName,
METRIC_GROUP_NAME,
featureName
),
(Gauge<Short>) (config, now) -> value
);
}
@kevin-wu24 any feedback for the above comment?
Yeah I'm good with this change. Will update the PR.
Hi @kevin-wu24, @chia7712 and all, I discovered a subtle issue with the existing metadata version metrics that I think we should fix in your new metrics. I left a comment in the discussion thread if you want to move the discussion there. https://lists.apache.org/thread/tjrzqb2hmmymshln8r816m9l3d79f605
That makes sense to me. @kevin-wu24 WDYT?
Yeah, I agree with the approach as well. I'll push a commit to get the semantics we want.
 * @param featureLevel The finalized level for the feature
 */
public void recordFinalizedFeatureLevel(String featureName, short featureLevel) {
    if (finalizedFeatureLevels.putIfAbsent(featureName, featureLevel) == null) {
Excuse me, why use putIfAbsent? I assume the code could be simplified:

var metricCreated = finalizedFeatureLevels.put(featureName, featureLevel) != null;
if (!metricCreated) addFinalizedFeatureLevelMetric(featureName);
I think the double ! is a bit confusing, but yeah, let me fix this to use put instead of putIfAbsent. How about:

final var metricNotRegistered = finalizedFeatureLevels.put(featureName, featureLevel) == null;
if (metricNotRegistered) addFinalizedFeatureLevelMetric(featureName);
Sounds great! 😃
Thanks for the implementation @kevin-wu24. Partial review.
 * @param featureLevel The finalized level for the feature
 */
public void recordFinalizedFeatureLevel(String featureName, short featureLevel) {
    final var metricNotRegistered = finalizedFeatureLevels.putIfAbsent(featureName, featureLevel) == null;
This code should always override the value with the latest value, no? What you only want to do once is register the metric since that should only be done the first time the value gets set.
This code should always override the value with the latest value, no?

Yes, the value in the map (and thus the value of the metric) is always updated with the latest value. I was using putIfAbsent before to check whether it is the first time the value is being set, to know if we should register the metric, but changed it to put as per @chia7712's comment.
I get a bit confused. That should use put instead of putIfAbsent, shouldn't it?
@kevin-wu24 please take a look at the latest code. The latest code uses putIfAbsent and not put to update the concurrent map.
Sorry, it does say that. Let me fix it.
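To summarize the idiom the thread converged on, a standalone sketch (the map field and registration counter are illustrative stand-ins, not the PR's exact code): put always stores the latest level, and its null return value signals the first write for a feature, which is exactly when the metric should be registered.

```java
import java.util.concurrent.ConcurrentHashMap;

public class PutReturnSketch {
    static final ConcurrentHashMap<String, Short> finalizedFeatureLevels = new ConcurrentHashMap<>();
    static int registrations = 0; // stand-in for addFinalizedFeatureLevelMetric calls

    static void recordFinalizedFeatureLevel(String featureName, short featureLevel) {
        // put (unlike putIfAbsent) overwrites the value every time, and returns
        // null only on the first write, so the metric is registered exactly once.
        final var metricNotRegistered = finalizedFeatureLevels.put(featureName, featureLevel) == null;
        if (metricNotRegistered) registrations++;
    }

    public static void main(String[] args) {
        recordFinalizedFeatureLevel("transaction.version", (short) 1); // registers
        recordFinalizedFeatureLevel("transaction.version", (short) 2); // updates only
        System.out.println(registrations);                                      // 1
        System.out.println(finalizedFeatureLevels.get("transaction.version"));  // 2
    }
}
```

With putIfAbsent, the second call would have left the stored level at 1, which is the bug the reviewers caught.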
);

// Set all production feature levels from the image, defaulting to their minimum production values
for (var feature : Feature.PRODUCTION_FEATURES) {
Hmm. Is the argument here that broker and controller registration would fail if there is a finalized feature version that this node doesn't know about?
Meaning it is not possible for the cluster metadata partition to have a finalized feature version that this node doesn't know about.
Is the argument here that broker and controller registration would fail if there is a finalized feature version that this node doesn't know about?

Yes, the registration fails if there is a finalized feature version the registering node does not support. ClusterControlManager#processRegistrationFeature has this code.
Perhaps we should ensure the exposed metrics are always based on the current image. The benefit is that the exposed finalized versions will be consistent with the current image, and we won't need to use minimumProduction, which puts us in a weird position.
Yeah. It would be nice to not make this assumption. The fewer assumptions you make, the more resilient the code is to future changes. How about iterating through all of the features in the delta and only updating those values?
I'm a bit confused. Doesn't this mean we won't have metrics for features that do not have a finalized level? From the discussion thread, it said:

Any missing finalized feature version can be configured to its minimum value.

which is why I made the change to iterate over all features instead of the ones in the image.

EDIT: If the idea is to add a separate maybeRegisterMissingFeatures method, I don't see how that ends up any different from this code, since we'll have to loop over the PRODUCTION_FEATURES in that method instead.
Perhaps we should ensure the exposed metrics are always based on the current image. The benefit is that the exposed finalized versions will be consistent with the current image, and we won't need to use minimumProduction, which puts us in a weird position.

IIUC, we should only expose the finalizedLevel metrics for features that actually have a finalized level? If a feature does not have a finalized level, it does not have a finalizedLevel metric? I'm okay with this approach, but I think we'll just need to update the KIP, since the specific language is:

The FinalizedLevel metric will report the finalized feature level for each production feature. If the feature level is not set, the metric will return a value of 0, since that means the feature is not enabled.
If the feature does not have a finalized level, it does not have a finalizedLevel metric?
Yes, that is what I meant
IIUC, we should only expose the finalizedLevel metrics for features that actually have a finalized level? If the feature does not have a finalized level, it does not have a finalizedLevel metric?
Yes. Let's implement this definition. Sorry for the confusion and back and forth. Please feel free to update the KIP. This also means that if the finalized feature version is "removed" (set to 0), the code needs to remove the associated metric.
This also means that if the finalized feature version is "removed" (set to 0), the code needs to remove the associated metric.
I guess this applies to all features besides metadata.version (whose minimum level is 7), and kraft.version (whose minimum is 0, but 0 does not mean that KRaft is "disabled" like other features). Since these two features are never part of the features image, I think their associated metrics should never be removed. Other features are removed from the image when their level is set to 0, so I think it makes sense to remove their metrics too. I'm going to update the KIP to document the exceptions for metadata + kraft version.
for (var feature : Feature.PRODUCTION_FEATURES) {
// Set all production feature levels from the image
for (var featureEntry : image.features().finalizedVersions().entrySet()) {
metrics.maybeRemoveFinalizedFeatureLevelMetrics(image.features().finalizedVersions());
This line should be outside the loop, right?
A quick note: I will also add unit tests to cover these cases.
@chia7712 Are you able to take another look? If there is anything else, let me know. Thanks for the reviews.
@kevin-wu24 thanks for this patch. Overall LGTM.
 * @param newFinalizedLevels The new finalized feature levels from the features image
 */
public void maybeRemoveFinalizedFeatureLevelMetrics(Map<String, Short> newFinalizedLevels) {
    finalizedFeatureLevels.keySet().stream().filter(
Perhaps we could leverage an iterator to avoid iterating through all items twice:
var iter = finalizedFeatureLevels.keySet().iterator();
while (iter.hasNext()) {
var featureName = iter.next();
if (newFinalizedLevels.containsKey(featureName) ||
featureName.equals(MetadataVersion.FEATURE_NAME) ||
featureName.equals(KRaftVersion.FEATURE_NAME)) {
continue;
}
removeFinalizedFeatureLevelMetric(featureName);
iter.remove();
}
Thanks for the PR. Overall, LGTM. Two comments left.
 */
@Test
public void testKRaftVersionFinalizedLevelMetric() throws Exception {
    MockFaultHandler faultHandler = new MockFaultHandler("testLoadEmptyBatch");
Suggested change:
- MockFaultHandler faultHandler = new MockFaultHandler("testLoadEmptyBatch");
+ MockFaultHandler faultHandler = new MockFaultHandler("testKRaftVersionFinalizedLevelMetric");
 * @throws Exception
 */
@Test
public void testKRaftVersionFinalizedLevelMetric() throws Exception {
It would be great to also cover handleLoadSnapshot (before calling handleCommit) and test whether KRaftVersion is set.
This PR adds the following metrics for each of the supported production features (metadata.version, kraft.version, transaction.version, etc.):

kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=X
kafka.server:type=node-metrics,name=maximum-supported-level,feature-name=X
kafka.server:type=node-metrics,name=minimum-supported-level,feature-name=X
Reviewers: PoAn Yang [email protected], Jhen-Yung Hsu [email protected], TengYao Chi [email protected], Ken Huang [email protected], Lan Ding [email protected], Chia-Ping Tsai [email protected]