Skip to content

KAFKA-19254: Add generic feature level metrics #20021

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: trunk
Choose a base branch
from

Conversation

kevin-wu24
Copy link
Contributor

@kevin-wu24 kevin-wu24 commented Jun 23, 2025

This PR adds the following metrics for each of the supported production
features (metadata.version, kraft.version, transaction.version,
etc.):

kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=X
kafka.server:type=node-metrics,name=maximum-supported-level,feature-name=X
kafka.server:type=node-metrics,name=minimum-supported-level,feature-name=X

Reviewers: PoAn Yang [email protected], Jhen-Yung Hsu
[email protected], TengYao Chi [email protected], Ken Huang
[email protected], Lan Ding [email protected], Chia-Ping Tsai
[email protected]

@github-actions github-actions bot added triage PRs from the community core Kafka Broker kraft labels Jun 23, 2025
Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevin-wu24 thanks for this patch. a couple of comments are left. PTAL


private MetricName getFeatureNameTagMetricName(String name, String group, String featureName) {
LinkedHashMap<String, String> featureNameTag = new LinkedHashMap<>();
featureNameTag.put(FEATURE_NAME_TAG, featureName.replace(".", "-"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it contains only one element, so it should be fine to use Map.of


private static MetricName getFeatureNameTagMetricName(String type, String name, String featureName) {
LinkedHashMap<String, String> featureNameTag = new LinkedHashMap<>();
featureNameTag.put(FEATURE_NAME_TAG, sanitizeFeatureName(featureName));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The KafkaYammerMetrics.getMetricName method below expects a LinkedHashMap parameter, not Map.

finalizedFeatureLevels.put(featureName, featureLevel);
}

public short finalizedFeatureLevel(String featureName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be private method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept this public and added checks in the test mentioned here: #20021 (comment)

return KafkaYammerMetrics.getMetricName("kafka.server", type, name, featureNameTag);
}

private static String sanitizeFeatureName(String featureName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider adding comments to remind readers that the naming style is different to NodeMetrics, and this is expected.

@github-actions github-actions bot removed the triage PRs from the community label Jun 24, 2025
Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevin-wu24: Thanks for the patch!

Comment on lines 194 to +202
private static MetricName metricName(String type, String name) {
String mBeanName = String.format("kafka.server:type=%s,name=%s", type, name);
return new MetricName("kafka.server", type, name, null, mBeanName);
}

private static MetricName metricName(String type, String name, String scope) {
String mBeanName = String.format("kafka.server:type=%s,name=%s,%s", type, name, scope);
return new MetricName("kafka.server", type, name, scope, mBeanName);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we combine these two similar methods?
For example:

    private static MetricName metricName(String type, String... nameParts) {
        if (nameParts.length == 0) {
            throw new IllegalArgumentException("At least one name part is required");
        }

        String name = nameParts[0];
        String scope = nameParts.length > 1 ? String.join(",", Arrays.copyOfRange(nameParts, 1, nameParts.length)) : null;

        String mBeanName = scope == null
            ? String.format("kafka.server:type=%s,name=%s", type, name)
            : String.format("kafka.server:type=%s,name=%s,%s", type, name, scope);

        return new MetricName("kafka.server", type, name, scope, mBeanName);
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to leave the methods as is. I think combining them both into one variadic method is a bit harder to read.

Copy link
Contributor

@DL1231 DL1231 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch, left a comment.

Comment on lines 223 to 224
builder.append(word.substring(0, 1).toUpperCase(Locale.ROOT))
.append(word.substring(1).toLowerCase(Locale.ROOT));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use charAt(0) to directly get the first character? This avoids creating temporary strings by substring.

Suggested change
builder.append(word.substring(0, 1).toUpperCase(Locale.ROOT))
.append(word.substring(1).toLowerCase(Locale.ROOT));
builder.append(Character.toUpperCase(word.charAt(0)))
.append(word.substring(1).toLowerCase(Locale.ROOT));

Copy link
Member

@FrankYang0529 FrankYang0529 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Please run ./gradlew checkstyleMain checkstyleTest spotlessCheck to fix lint issue.


);

try (NodeMetrics ignored = new NodeMetrics(metrics, true)) {
Copy link
Member

@FrankYang0529 FrankYang0529 Jun 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we create a case to test new NodeMetrics(metrics, false) as well?

@@ -42,10 +47,13 @@ public final class MetadataLoaderMetrics implements AutoCloseable {
"MetadataLoader", "HandleLoadSnapshotCount");
public static final MetricName CURRENT_CONTROLLER_ID = getMetricName(
"MetadataLoader", "CurrentControllerId");
public static final String FINALIZED_LEVEL_METRIC_NAME = "FinalizedLevel";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable can be private. There is no usage outside this class.

Comment on lines 349 to 360
MetadataVersion metadataVersion = image.features().metadataVersionOrThrow();
metrics.setCurrentMetadataVersion(metadataVersion);
metrics.setFinalizedFeatureLevel(
MetadataVersion.FEATURE_NAME,
metadataVersion.featureLevel()
);
for (var finalizedFeatureEntry : image.features().finalizedVersions().entrySet()) {
metrics.setFinalizedFeatureLevel(
finalizedFeatureEntry.getKey(),
finalizedFeatureEntry.getValue()
);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add related test to MetadataLoaderTest as well? For example, there is assertion about currentMetadataVersion.

assertEquals(MINIMUM_VERSION,
loader.metrics().currentMetadataVersion());

Copy link
Contributor Author

@kevin-wu24 kevin-wu24 Jun 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, for checks of current metadata version metric value, I will add a check for the finalized feature level metric too.

Comment on lines 42 to 53
for (var featureName : Feature.PRODUCTION_FEATURE_NAMES) {
addSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName);
addSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName);
}
addSupportedLevelMetric(
MAXIMUM_SUPPORTED_LEVEL_NAME,
MetadataVersion.FEATURE_NAME
);
addSupportedLevelMetric(
MINIMUM_SUPPORTED_LEVEL_NAME,
MetadataVersion.FEATURE_NAME
);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use supportedFeatureRanges instead of setting metrics separately? I noticed that the close method uses supportedFeatureRanges.keySet() to remove metrics.

Suggested change
for (var featureName : Feature.PRODUCTION_FEATURE_NAMES) {
addSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName);
addSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName);
}
addSupportedLevelMetric(
MAXIMUM_SUPPORTED_LEVEL_NAME,
MetadataVersion.FEATURE_NAME
);
addSupportedLevelMetric(
MINIMUM_SUPPORTED_LEVEL_NAME,
MetadataVersion.FEATURE_NAME
);
supportedFeatureRanges.forEach((featureName, versionRange) -> {
addSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName);
addSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName);
});

Metrics metrics = new Metrics();
String expectedGroup = "node-metrics";

// Metric description is not use for metric name equality
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The typo is the same as in BrokerServerMetricsTest.java (looks like you followed it).

Suggested change
// Metric description is not use for metric name equality
// Metric description is not used for metric name equality

Copy link
Contributor

@DL1231 DL1231 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@kevin-wu24
Copy link
Contributor Author

Thanks all for the review. Pushed commits to address the comments.
@chia7712 can you take another look and let me know if there is anything else?

METRIC_GROUP_NAME,
featureName
),
(config, now) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will use the type Measurable and the value type is double rather than integer, right?

Copy link
Contributor Author

@kevin-wu24 kevin-wu24 Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember bringing this issue up when working on the KafkaRaftMetrics a while back: #18304 (comment). Yes, it will upcast to a double.

I guess the actual values we want are short right? Since feature levels themselves are shorts. I'll update the KIP with that so it's consistent.

});
}

private void addSupportedLevelMetric(String metricName, String featureName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

supportedFeatureRanges is immutable, so perhaps we can pass the value to this method?

        supportedFeatureRanges.forEach((featureName, versionRange) -> {
            addSupportedLevelMetric(MAXIMUM_SUPPORTED_LEVEL_NAME, featureName, versionRange.max());
            addSupportedLevelMetric(MINIMUM_SUPPORTED_LEVEL_NAME, featureName, versionRange.min());
        });
    private void addSupportedLevelMetric(String metricName, String featureName, short value) {
        metrics.addMetric(
            getFeatureNameTagMetricName(
                metricName,
                METRIC_GROUP_NAME,
                featureName
            ),
            (Gauge<Short>) (config, now) -> value
        );
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevin-wu24 any feedback for above comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'm good with this change. Will update the PR.

@jsancio
Copy link
Member

jsancio commented Jun 26, 2025

Hi @kevin-wu24, @chia7712 and all,

I discovered a subtle issue with the existing metadata version metrics that I think we should fix in your new metrics. I left a comment in discussion thread if you want to move the discussion there.

https://lists.apache.org/thread/tjrzqb2hmmymshln8r816m9l3d79f605

@chia7712
Copy link
Member

I discovered a subtle issue with the existing metadata version metrics that I think we should fix in your new metrics. I left a comment in discussion thread if you want to move the discussion there.

that makes sense to me. @kevin-wu24 WDYT?

@kevin-wu24
Copy link
Contributor Author

I discovered a subtle issue with the existing metadata version metrics that I think we should fix in your new metrics. I left a comment in discussion thread if you want to move the discussion there.

that makes sense to me. @kevin-wu24 WDYT?

Yeah, I agree with the approach as well. I'll push a commit to get the semantics we want.

* @param featureLevel The finalized level for the feature
*/
public void recordFinalizedFeatureLevel(String featureName, short featureLevel) {
if (finalizedFeatureLevels.putIfAbsent(featureName, featureLevel) == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excuse me, why to use putIfAbsent? I assume the code could be simplified.

        var metricCreated = finalizedFeatureLevels.put(featureName, featureLevel) != null;
        if (!metricCreated) addFinalizedFeatureLevelMetric(featureName);

Copy link
Contributor Author

@kevin-wu24 kevin-wu24 Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the double ! is a bit confusing, but yeah let me fix this to use put instead of putIfAbsent.

How about:

final var metricNotRegistered = finalizedFeatureLevels.put(featureName, featureLevel) == null;
if (metricNotRegistered) addFinalizedFeatureLevelMetric(featureName);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great! 😃

Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the implementation @kevin-wu24 . Partial review.

* @param featureLevel The finalized level for the feature
*/
public void recordFinalizedFeatureLevel(String featureName, short featureLevel) {
final var metricNotRegistered = finalizedFeatureLevels.putIfAbsent(featureName, featureLevel) == null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code should always override the value with the latest value, no? What you only want to do once is register the metric since that should only be done the first time the value gets set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code should always override the value with the latest value, no?

Yes, the value in the map (and thus the value of the metric) is always updated with the latest value. I was using putIfAbsent before to check if it is the first time the value is getting set to know if we should register the metric, but changed it to put as per @chia7712's comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get a bit confused. That should use put instead of putIfAbsent, shouldn't it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevin-wu24 please take a look at the latest code. The latest code uses putIfAbsent and not put to update the concurrent map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it does say that. Let me fix it.

);

// Set all production feature levels from the image, defaulting to their minimum production values
for (var feature : Feature.PRODUCTION_FEATURES) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Is the argument here that broker and controller registration would fail if there is a finalized feature version that this node doesn't know about?

Meaning it is not possible for the cluster metadata partition to have a finalized feature version that this node doesn't know about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the argument here that broker and controller registration would fail if there is a finalized feature version that this node doesn't know about?

Yes, the registration fails if there is a finalized feature version the registering node does not support. ClusterControlManager#processRegistrationFeature has this code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should ensure the exposed metrics are always based on current image. The benefit is the exposed finalized versions will be consistent to current image, and we won't need to use minimumProduction which puts us in a weird position

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. It would be nice to not make this assumption. The less assumptions you make the more resilient is the code to future changes. How about iterating through all of the features in the delta and only updating those values?

Copy link
Contributor Author

@kevin-wu24 kevin-wu24 Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused. Doesn't this mean we won't have metrics for features that do not have a finalized level? From the discussion thread, it said:

Any missing finalized feature version can be configured to its minimum
value.

which is why I made the change to iterate over all features instead of the ones in the image.

EDIT: If the idea is to add a separate maybeRegisterMissingFeatures method, I don't see how that ends up any different than this code, since we'll have to loop over the PRODUCTION_FEATURES in that method instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should ensure the exposed metrics are always based on current image. The benefit is the exposed finalized versions will be consistent to current image, and we won't need to use minimumProduction which puts us in a weird position

IIUC, we should only expose the finalizedLevel metrics for features that actually have a finalized level? If the feature does not have a finalized level, it does not have a finalizedLevel metric? I'm okay with this approach, but I think we'll just need to update the KIP since the specific language is:

The FinalizedLevel  metric will report the finalized feature level for each production feature. If the feature level is not set, the metric will return a value of 0, since that means the feature is not enabled. 

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the feature does not have a finalized level, it does not have a finalizedLevel metric?

Yes, that is what I meant

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, we should only expose the finalizedLevel metrics for features that actually have a finalized level? If the feature does not have a finalized level, it does not have a finalizedLevel metric?

Yes. Let's implement this definition. Sorry for the confusion and back and forth. Please feel free to update the KIP. This also means that if the finalized feature version is "removed" (set to 0), the code needs to remove the associated metric.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also means that if the finalized feature version is "removed" (set to 0), the code needs to remove the associated metric.

I guess this applies to all features besides metadata.version (whose minimum level is 7), and kraft.version (whose minimum is 0, but 0 does not mean that KRaft is "disabled" like other features). Since these two features are never part of the features image, I think their associated metrics should never be removed. Other features are removed from the image when their level is set to 0, so I think it makes sense to remove their metrics too. I'm going to update the KIP to document the exceptions for metadata + kraft version.

for (var feature : Feature.PRODUCTION_FEATURES) {
// Set all production feature levels from the image
for (var featureEntry : image.features().finalizedVersions().entrySet()) {
metrics.maybeRemoveFinalizedFeatureLevelMetrics(image.features().finalizedVersions());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line should be outside the loop, right?

@kevin-wu24
Copy link
Contributor Author

A quick note about kraft.version:
Since d04efca omits the feature level record for kraft.version from the features image, we need to update that feature's metric in a special way (i.e. in handleCommit + handleLoadSnapshot).

I will also add unit tests to cover these cases.

@kevin-wu24
Copy link
Contributor Author

@chia7712 Are you able to take another look? If there is anything else let me know. Thanks for the reviews.

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevin-wu24 thanks for this patch. overall LGTM

* @param newFinalizedLevels The new finalized feature levels from the features image
*/
public void maybeRemoveFinalizedFeatureLevelMetrics(Map<String, Short> newFinalizedLevels) {
finalizedFeatureLevels.keySet().stream().filter(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could leverage iterator to avoid iterating through all items twice.

        var iter = finalizedFeatureLevels.keySet().iterator();
        while (iter.hasNext()) {
            var featureName = iter.next();
            if (newFinalizedLevels.containsKey(featureName) ||
                featureName.equals(MetadataVersion.FEATURE_NAME) ||
                featureName.equals(KRaftVersion.FEATURE_NAME)) {
                continue;
            }
            removeFinalizedFeatureLevelMetric(featureName);
            iter.remove();
        }

Copy link
Collaborator

@Yunyung Yunyung left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Overall, LGTM. Two comments left

*/
@Test
public void testKRaftVersionFinalizedLevelMetric() throws Exception {
MockFaultHandler faultHandler = new MockFaultHandler("testLoadEmptyBatch");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
MockFaultHandler faultHandler = new MockFaultHandler("testLoadEmptyBatch");
MockFaultHandler faultHandler = new MockFaultHandler("testKRaftVersionFinalizedLevelMetric");

* @throws Exception
*/
@Test
public void testKRaftVersionFinalizedLevelMetric() throws Exception {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to also cover handleLoadSnapshot (before calling handleCommit) and test whether KRaftVersion is set.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants