Skip to content

Commit b8543ad

Browse files
authored
[improve][broker] Support cgroup v2 by using jdk.internal.platform.Metrics in Pulsar Loadbalancer (apache#16832)
1 parent 8b7aa63 commit b8543ad

File tree

7 files changed

+122
-13
lines changed

7 files changed

+122
-13
lines changed

bin/pulsar

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,8 @@ if [[ -z "$IS_JAVA_8" ]]; then
307307
OPTS="$OPTS --add-opens java.management/sun.management=ALL-UNNAMED"
308308
# MBeanStatsGenerator
309309
OPTS="$OPTS --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED"
310+
# LinuxInfoUtils
311+
OPTS="$OPTS --add-opens java.base/jdk.internal.platform=ALL-UNNAMED"
310312
fi
311313

312314
OPTS="-cp $PULSAR_CLASSPATH $OPTS"

buildtools/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
<test.additional.args>
5959
--add-opens java.base/jdk.internal.loader=ALL-UNNAMED
6060
--add-opens java.base/java.lang=ALL-UNNAMED <!--Mockito-->
61+
--add-opens java.base/jdk.internal.platform=ALL-UNNAMED <!--LinuxInfoUtils-->
6162
</test.additional.args>
6263
</properties>
6364

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ flexible messaging model and an intuitive client API.</description>
109109
--add-opens java.base/sun.net=ALL-UNNAMED <!--netty.DnsResolverUtil-->
110110
--add-opens java.management/sun.management=ALL-UNNAMED <!--JvmDefaultGCMetricsLogger & MBeanStatsGenerator-->
111111
--add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED <!--MBeanStatsGenerator-->
112+
--add-opens java.base/jdk.internal.platform=ALL-UNNAMED <!--LinuxInfoUtils-->
112113
</test.additional.args>
113114
<testReuseFork>true</testReuseFork>
114115
<testForkCount>4</testForkCount>

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import java.io.IOException;
23+
import java.lang.reflect.Method;
2224
import java.nio.charset.StandardCharsets;
2325
import java.nio.file.Files;
2426
import java.nio.file.Path;
@@ -45,13 +47,38 @@ public class LinuxInfoUtils {
4547
private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage";
4648
private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
4749
private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";
50+
4851
// proc states
4952
private static final String PROC_STAT_PATH = "/proc/stat";
5053
private static final String NIC_PATH = "/sys/class/net/";
5154
// NIC type
5255
private static final int ARPHRD_ETHER = 1;
5356
private static final String NIC_SPEED_TEMPLATE = "/sys/class/net/%s/speed";
5457

58+
private static Object /*jdk.internal.platform.Metrics*/ metrics;
59+
private static Method getMetricsProviderMethod;
60+
private static Method getCpuQuotaMethod;
61+
private static Method getCpuPeriodMethod;
62+
private static Method getCpuUsageMethod;
63+
64+
static {
65+
try {
66+
metrics = Class.forName("jdk.internal.platform.Container").getMethod("metrics")
67+
.invoke(null);
68+
if (metrics != null) {
69+
getMetricsProviderMethod = metrics.getClass().getMethod("getProvider");
70+
getMetricsProviderMethod.setAccessible(true);
71+
getCpuQuotaMethod = metrics.getClass().getMethod("getCpuQuota");
72+
getCpuQuotaMethod.setAccessible(true);
73+
getCpuPeriodMethod = metrics.getClass().getMethod("getCpuPeriod");
74+
getCpuPeriodMethod.setAccessible(true);
75+
getCpuUsageMethod = metrics.getClass().getMethod("getCpuUsage");
76+
getCpuUsageMethod.setAccessible(true);
77+
}
78+
} catch (Throwable e) {
79+
log.warn("Failed to get runtime metrics", e);
80+
}
81+
}
5582

5683
/**
5784
* Determine whether the OS is the linux kernel.
@@ -66,9 +93,14 @@ public static boolean isLinux() {
6693
*/
6794
public static boolean isCGroupEnabled() {
6895
try {
69-
return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
96+
if (metrics == null) {
97+
return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
98+
}
99+
String provider = (String) getMetricsProviderMethod.invoke(metrics);
100+
log.info("[LinuxInfo] The system metrics provider is: {}", provider);
101+
return provider.contains("cgroup");
70102
} catch (Exception e) {
71-
log.warn("[LinuxInfo] Failed to check cgroup CPU usage file: {}", e.getMessage());
103+
log.warn("[LinuxInfo] Failed to check cgroup CPU: {}", e.getMessage());
72104
return false;
73105
}
74106
}
@@ -81,13 +113,21 @@ public static boolean isCGroupEnabled() {
81113
public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
82114
if (isCGroupsEnabled) {
83115
try {
84-
long quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
85-
long period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
116+
long quota;
117+
long period;
118+
if (metrics != null && getCpuQuotaMethod != null && getCpuPeriodMethod != null) {
119+
quota = (long) getCpuQuotaMethod.invoke(metrics);
120+
period = (long) getCpuPeriodMethod.invoke(metrics);
121+
} else {
122+
quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
123+
period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
124+
}
125+
86126
if (quota > 0) {
87127
return 100.0 * quota / period;
88128
}
89-
} catch (IOException e) {
90-
log.warn("[LinuxInfo] Failed to read CPU quotas from cgroups", e);
129+
} catch (Exception e) {
130+
log.warn("[LinuxInfo] Failed to read CPU quotas from cgroup", e);
91131
// Fallback to availableProcessors
92132
}
93133
}
@@ -99,11 +139,14 @@ public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
99139
* Get CGroup cpu usage.
100140
* @return Cpu usage
101141
*/
102-
public static double getCpuUsageForCGroup() {
142+
public static long getCpuUsageForCGroup() {
103143
try {
144+
if (metrics != null && getCpuUsageMethod != null) {
145+
return (long) getCpuUsageMethod.invoke(metrics);
146+
}
104147
return readLongFromFile(Paths.get(CGROUPS_CPU_USAGE_PATH));
105-
} catch (IOException e) {
106-
log.error("[LinuxInfo] Failed to read CPU usage from {}", CGROUPS_CPU_USAGE_PATH, e);
148+
} catch (Exception e) {
149+
log.error("[LinuxInfo] Failed to read CPU usage from cgroup", e);
107150
return -1;
108151
}
109152
}
@@ -291,6 +334,11 @@ enum Operstate {
291334
UP
292335
}
293336

337+
@VisibleForTesting
338+
public static Object getMetrics() {
339+
return metrics;
340+
}
341+
294342
@AllArgsConstructor
295343
public enum NICUsageType {
296344
// transport

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private double getTotalCpuUsage(double elapsedTimeSeconds) {
140140
}
141141

142142
private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
143-
double usage = getCpuUsageForCGroup();
143+
double usage = (double) getCpuUsageForCGroup();
144144
double currentUsage = usage - lastCpuUsage;
145145
lastCpuUsage = usage;
146146
return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1);

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import static org.testng.Assert.assertTrue;
2222
import static org.testng.Assert.fail;
23+
import java.nio.file.Files;
24+
import java.nio.file.Paths;
2325
import java.util.Optional;
2426
import lombok.Cleanup;
2527
import lombok.extern.slf4j.Slf4j;
@@ -28,6 +30,7 @@
2830
import org.apache.pulsar.broker.ServiceConfiguration;
2931
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
3032
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
33+
import org.testng.Assert;
3134
import org.testng.annotations.Test;
3235

3336
@Slf4j
@@ -96,4 +99,27 @@ public void testNoNICSpeed() throws Exception {
9699
}
97100

98101

102+
@Test
103+
public void testCGroupMetrics() {
104+
if (!LinuxInfoUtils.isLinux()) {
105+
return;
106+
}
107+
108+
boolean existsCGroup = Files.exists(Paths.get("/sys/fs/cgroup"));
109+
boolean cGroupEnabled = LinuxInfoUtils.isCGroupEnabled();
110+
Assert.assertEquals(cGroupEnabled, existsCGroup);
111+
112+
double totalCpuLimit = LinuxInfoUtils.getTotalCpuLimit(cGroupEnabled);
113+
log.info("totalCpuLimit: {}", totalCpuLimit);
114+
Assert.assertTrue(totalCpuLimit > 0.0);
115+
116+
if (cGroupEnabled) {
117+
Assert.assertNotNull(LinuxInfoUtils.getMetrics());
118+
119+
long cpuUsageForCGroup = LinuxInfoUtils.getCpuUsageForCGroup();
120+
log.info("cpuUsageForCGroup: {}", cpuUsageForCGroup);
121+
Assert.assertTrue(cpuUsageForCGroup > 0);
122+
}
123+
}
124+
99125
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.impl;
2020

21-
import lombok.Cleanup;
22-
import org.testng.Assert;
23-
import org.testng.annotations.Test;
2421
import java.util.ArrayList;
2522
import java.util.List;
2623
import java.util.Optional;
2724
import java.util.concurrent.Executors;
2825
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.TimeUnit;
27+
import lombok.Cleanup;
28+
import lombok.extern.slf4j.Slf4j;
29+
import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
30+
import org.testng.Assert;
31+
import org.testng.annotations.Test;
2932

33+
@Slf4j
3034
public class LinuxBrokerHostUsageImplTest {
3135

3236
@Test
@@ -42,4 +46,31 @@ public void checkOverrideBrokerNicSpeedGbps() {
4246
double totalLimit = linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
4347
Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
4448
}
49+
50+
@Test
51+
public void testCpuUsage() throws InterruptedException {
52+
if (!LinuxInfoUtils.isLinux()) {
53+
return;
54+
}
55+
56+
@Cleanup("shutdown")
57+
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
58+
LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
59+
new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, Optional.empty(), executorService);
60+
61+
linuxBrokerHostUsage.calculateBrokerHostUsage();
62+
TimeUnit.SECONDS.sleep(1);
63+
linuxBrokerHostUsage.calculateBrokerHostUsage();
64+
65+
double usage = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().usage;
66+
double limit = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().limit;
67+
float percentUsage = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().percentUsage();
68+
69+
Assert.assertTrue(usage > 0);
70+
Assert.assertTrue(limit > 0);
71+
Assert.assertTrue(limit >= usage);
72+
Assert.assertTrue(percentUsage > 0);
73+
74+
log.info("usage: {}, limit: {}, percentUsage: {}", usage, limit, percentUsage);
75+
}
4576
}

0 commit comments

Comments
 (0)