Skip to content

Add jspecify based Nullability checks in spring-kafka-test module #3737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2024 the original author or authors.
* Copyright 2023-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
Expand All @@ -54,6 +55,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.jspecify.annotations.Nullable;

import org.springframework.core.log.LogAccessor;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -93,7 +95,7 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {
private static final boolean IS_KAFKA_39_OR_LATER = ClassUtils.isPresent(
"org.apache.kafka.server.config.AbstractKafkaConfig", EmbeddedKafkaKraftBroker.class.getClassLoader());

private static final Method SET_CONFIG_METHOD;
private static final @Nullable Method SET_CONFIG_METHOD;

static {
if (IS_KAFKA_39_OR_LATER) {
Expand All @@ -117,7 +119,7 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {

private final AtomicBoolean initialized = new AtomicBoolean();

private KafkaClusterTestKit cluster;
private @Nullable KafkaClusterTestKit cluster;

private int[] kafkaPorts;

Expand All @@ -131,7 +133,7 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {
* @param partitions partitions per topic.
* @param topics the topics to create.
*/
public EmbeddedKafkaKraftBroker(int count, int partitions, String... topics) {
public EmbeddedKafkaKraftBroker(int count, int partitions, String @Nullable ... topics) {
this.count = count;
this.kafkaPorts = new int[this.count]; // random ports by default.
if (topics != null) {
Expand Down Expand Up @@ -261,7 +263,9 @@ private void start() {
private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder, String key, Object value) {
if (IS_KAFKA_39_OR_LATER) {
// For Kafka 3.9.0+: use reflection
ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, key, value);
if (SET_CONFIG_METHOD != null) {
ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, key, value);
}
}
else {
// For Kafka 3.8.0: direct call
Expand Down Expand Up @@ -484,10 +488,12 @@ public int getPartitionsPerTopic() {

@Override
public String getBrokersAsString() {
return (String) this.cluster.clientProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
Assert.notNull(this.cluster, "cluster cannot be null");
String brokersString = (String) this.cluster.clientProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
return Objects.requireNonNull(brokersString);
}

public KafkaClusterTestKit getCluster() {
public @Nullable KafkaClusterTestKit getCluster() {
return this.cluster;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,6 +66,7 @@
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.jspecify.annotations.Nullable;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.test.core.BrokerAddress;
Expand Down Expand Up @@ -117,9 +118,9 @@ public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {

private final AtomicBoolean initialized = new AtomicBoolean();

private EmbeddedZookeeper zookeeper;
private @Nullable EmbeddedZookeeper zookeeper;

private String zkConnect;
private @Nullable String zkConnect;

private int zkPort;

Expand All @@ -133,7 +134,7 @@ public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {

private String brokerListProperty = "spring.kafka.bootstrap-servers";

private volatile ZooKeeperClient zooKeeperClient;
private volatile @Nullable ZooKeeperClient zooKeeperClient;

public EmbeddedKafkaZKBroker(int count) {
this(count, false);
Expand All @@ -145,7 +146,7 @@ public EmbeddedKafkaZKBroker(int count) {
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param topics the topics to create (2 partitions per).
*/
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... topics) {
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String @Nullable ... topics) {
this(count, controlledShutdown, 2, topics);
}

Expand All @@ -156,7 +157,7 @@ public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... to
* @param partitions partitions per topic.
* @param topics the topics to create.
*/
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics) {
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String @Nullable ... topics) {
this.count = count;
this.kafkaPorts = new int[this.count]; // random ports by default.
this.controlledShutdown = controlledShutdown;
Expand Down Expand Up @@ -557,8 +558,10 @@ public void destroy() {
}
}
try {
this.zookeeper.shutdown();
this.zkConnect = null;
if (this.zookeeper != null) {
this.zookeeper.shutdown();
this.zkConnect = null;
}
}
catch (Exception e) {
// do nothing
Expand All @@ -582,7 +585,7 @@ public KafkaServer getKafkaServer(int id) {
return this.kafkaServers.get(id);
}

public EmbeddedZookeeper getZookeeper() {
public @Nullable EmbeddedZookeeper getZookeeper() {
return this.zookeeper;
}

Expand All @@ -599,7 +602,7 @@ public synchronized ZooKeeperClient getZooKeeperClient() {
return this.zooKeeperClient;
}

public String getZookeeperConnectionString() {
public @Nullable String getZookeeperConnectionString() {
return this.zkConnect;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Provides a class for assertj conditions.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.test.assertj;
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/**
* Provides classes for JUnit5 conditions.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.test.condition;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2024 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,17 +16,13 @@

package org.springframework.kafka.test.context;

import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaBrokerFactory;
import org.springframework.test.context.ContextCustomizer;
import org.springframework.test.context.MergedContextConfiguration;
import org.springframework.util.Assert;

/**
* The {@link ContextCustomizer} implementation for the {@link EmbeddedKafkaBroker} bean registration.
Expand All @@ -51,16 +47,15 @@ class EmbeddedKafkaContextCustomizer implements ContextCustomizer {

@Override
public void customizeContext(ConfigurableApplicationContext context, MergedContextConfiguration mergedConfig) {
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
Assert.isInstanceOf(DefaultSingletonBeanRegistry.class, beanFactory);

ConfigurableEnvironment environment = context.getEnvironment();

EmbeddedKafkaBroker embeddedKafkaBroker =
EmbeddedKafkaBrokerFactory.create(this.embeddedKafka, environment::resolvePlaceholders);

((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(EmbeddedKafkaBroker.BEAN_NAME,
new RootBeanDefinition(EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker));
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) context;

genericApplicationContext.registerBean(EmbeddedKafkaBroker.BEAN_NAME,
EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@

import java.util.List;

import org.jspecify.annotations.Nullable;

import org.springframework.test.context.ContextConfigurationAttributes;
import org.springframework.test.context.ContextCustomizer;
import org.springframework.test.context.ContextCustomizerFactory;
Expand All @@ -35,6 +37,7 @@
class EmbeddedKafkaContextCustomizerFactory implements ContextCustomizerFactory {

@Override
@Nullable
public ContextCustomizer createContextCustomizer(Class<?> testClass,
List<ContextConfigurationAttributes> configAttributes) {
EmbeddedKafka embeddedKafka =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Provides classes for EmbeddedKafka context customization.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.test.context;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* core package for spring-kafka-test module.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.test.core;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Provides hamcrest matchers.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.test.hamcrest;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,8 +90,10 @@ public class GlobalEmbeddedKafkaTestExecutionListener implements TestExecutionLi
public static final String BROKER_PROPERTIES_LOCATION_PROPERTY_NAME =
"spring.kafka.embedded.broker.properties.location";

@SuppressWarnings("NullAway.Init")
private EmbeddedKafkaBroker embeddedKafkaBroker;

@SuppressWarnings("NullAway.Init")
private Log logger;

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Provides JUnit specific extensions in spring-kafka-test.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.test.junit;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Provides top-level API for EmbeddedKafka.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.test;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Provides JUnit rules.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.test.rule;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

/**
Expand Down Expand Up @@ -52,6 +53,7 @@ public static void waitForAssignment(Object container, int partitions) { // NOSO
return;
}
List<?> containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class);
Assert.notNull(containers, "Containers must not be null");
int n = 0;
int count = 0;
Method getAssignedPartitions = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2024 the original author or authors.
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,11 +45,11 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.jspecify.annotations.Nullable;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
Expand All @@ -66,6 +66,7 @@ public final class KafkaTestUtils {

private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(KafkaTestUtils.class)); // NOSONAR

@SuppressWarnings("NullAway.Init")
private static Properties defaults;

private KafkaTestUtils() {
Expand Down Expand Up @@ -261,7 +262,7 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
* @throws Exception if an exception occurs.
* @since 2.3
*/
public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition)
public static @Nullable OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition)
throws Exception { // NOSONAR

try (AdminClient client = AdminClient
Expand All @@ -281,7 +282,7 @@ public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String
* @throws Exception if an exception occurs.
* @since 3.0
*/
public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition)
public static @Nullable OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition)
throws Exception { // NOSONAR

return adminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get() // NOSONAR false positive
Expand Down Expand Up @@ -395,7 +396,7 @@ public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, D
* @param propertyPath The path.
* @return The field.
*/
public static Object getPropertyValue(Object root, String propertyPath) {
public static @Nullable Object getPropertyValue(Object root, String propertyPath) {
Object value = null;
DirectFieldAccessor accessor = new DirectFieldAccessor(root);
String[] tokens = propertyPath.split("\\.");
Expand Down Expand Up @@ -424,7 +425,7 @@ else if (i == tokens.length - 1) {
* @see #getPropertyValue(Object, String)
*/
@SuppressWarnings("unchecked")
public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type) {
public static <T> @Nullable T getPropertyValue(Object root, String propertyPath, Class<T> type) {
Object value = getPropertyValue(root, propertyPath);
if (value != null) {
Assert.isAssignable(type, value.getClass());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Utils package.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.test.utils;
Loading