Skip to content

Commit bfde0de

Browse files
[feat][io] Support configuration secret interpolation (apache#20901)
PIP: apache#20903 Relates to: apache#20862 ### Motivation The primary motivation is to make it possible to configure Pulsar Connectors in a secure, non-plaintext way. See the PIP for background and relevant details. The new interpolation feature only applies when deploying with functions to Kubernetes. ### Modifications * Add `SecretsProvider#interpolateSecretForValue` method with a default that maintains the current behavior. * Override `interpolateSecretForValue` in the `EnvironmentBasedSecretsProvider` so that configuration values formatted as `${my-env-var}` will be replaced with the result of `System.getEnv("my-env-var")` if the result is not `null`. * Implement a recursive string interpolation method that will replace any configuration value that the `interpolateSecretForValue` implementation determines ought to be replaced. ### Verifying this change Tests are added/modified. ### Documentation - [x] `doc-required` ### Matching PR in forked repository PR in forked repository: michaeljmarshall#55
1 parent d014713 commit bfde0de

File tree

5 files changed

+143
-30
lines changed

5 files changed

+143
-30
lines changed

pulsar-functions/instance/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,15 @@
269269
</execution>
270270
</executions>
271271
</plugin>
272+
<plugin>
273+
<groupId>org.apache.maven.plugins</groupId>
274+
<artifactId>maven-surefire-plugin</artifactId>
275+
<configuration>
276+
<environmentVariables>
277+
<TEST_JAVA_INSTANCE_PARSE_ENV_VAR>some-configuration</TEST_JAVA_INSTANCE_PARSE_ENV_VAR>
278+
</environmentVariables>
279+
</configuration>
280+
</plugin>
272281
</plugins>
273282
</build>
274283

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -862,11 +862,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
862862
Thread.currentThread().setContextClassLoader(this.componentClassLoader);
863863
}
864864
try {
865-
if (sourceSpec.getConfigs().isEmpty()) {
866-
this.source.open(new HashMap<>(), contextImpl);
867-
} else {
868-
this.source.open(parseComponentConfig(sourceSpec.getConfigs()), contextImpl);
869-
}
865+
this.source.open(augmentAndFilterConnectorConfig(sourceSpec.getConfigs()), contextImpl);
870866
if (this.source instanceof PulsarSource) {
871867
contextImpl.setInputConsumers(((PulsarSource) this.source).getInputConsumers());
872868
}
@@ -877,31 +873,60 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
877873
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
878874
}
879875
}
880-
private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException {
881-
return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType);
876+
877+
/**
878+
* Recursively interpolate configured secrets into the config map by calling
879+
* {@link SecretsProvider#interpolateSecretForValue(String)}.
880+
* @param secretsProvider - the secrets provider that will convert secret's values into config values.
881+
* @param configs - the connector configuration map, which will be mutated.
882+
*/
883+
private static void interpolateSecretsIntoConfigs(SecretsProvider secretsProvider,
884+
Map<String, Object> configs) {
885+
for (Map.Entry<String, Object> entry : configs.entrySet()) {
886+
Object value = entry.getValue();
887+
if (value instanceof String) {
888+
String updatedValue = secretsProvider.interpolateSecretForValue((String) value);
889+
if (updatedValue != null) {
890+
entry.setValue(updatedValue);
891+
}
892+
} else if (value instanceof Map) {
893+
interpolateSecretsIntoConfigs(secretsProvider, (Map<String, Object>) value);
894+
}
895+
}
896+
}
897+
898+
private Map<String, Object> augmentAndFilterConnectorConfig(String connectorConfigs) throws IOException {
899+
return augmentAndFilterConnectorConfig(connectorConfigs, instanceConfig, secretsProvider,
900+
componentClassLoader, componentType);
882901
}
883902

884-
static Map<String, Object> parseComponentConfig(String connectorConfigs,
885-
InstanceConfig instanceConfig,
886-
ClassLoader componentClassLoader,
887-
org.apache.pulsar.functions.proto.Function
903+
static Map<String, Object> augmentAndFilterConnectorConfig(String connectorConfigs,
904+
InstanceConfig instanceConfig,
905+
SecretsProvider secretsProvider,
906+
ClassLoader componentClassLoader,
907+
org.apache.pulsar.functions.proto.Function
888908
.FunctionDetails.ComponentType componentType)
889909
throws IOException {
890-
final Map<String, Object> config = ObjectMapperFactory
910+
final Map<String, Object> config = connectorConfigs.isEmpty() ? new HashMap<>() : ObjectMapperFactory
891911
.getMapper()
892912
.reader()
893913
.forType(new TypeReference<Map<String, Object>>() {})
894914
.readValue(connectorConfigs);
915+
if (componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK
916+
&& componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
917+
return config;
918+
}
919+
920+
interpolateSecretsIntoConfigs(secretsProvider, config);
921+
895922
if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
896923
final String configClassName;
897924
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
898925
configClassName = ConnectorUtils
899926
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
900-
} else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
927+
} else {
901928
configClassName = ConnectorUtils
902929
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
903-
} else {
904-
return config;
905930
}
906931
if (configClassName != null) {
907932

@@ -1014,19 +1039,11 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
10141039
Thread.currentThread().setContextClassLoader(this.componentClassLoader);
10151040
}
10161041
try {
1017-
if (sinkSpec.getConfigs().isEmpty()) {
1018-
if (log.isDebugEnabled()) {
1019-
log.debug("Opening Sink with empty hashmap with contextImpl: {} ", contextImpl.toString());
1020-
}
1021-
this.sink.open(new HashMap<>(), contextImpl);
1022-
} else {
1023-
if (log.isDebugEnabled()) {
1024-
log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec,
1025-
contextImpl.toString());
1026-
}
1027-
final Map<String, Object> config = parseComponentConfig(sinkSpec.getConfigs());
1028-
this.sink.open(config, contextImpl);
1042+
if (log.isDebugEnabled()) {
1043+
log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec.getConfigs(),
1044+
contextImpl.toString());
10291045
}
1046+
this.sink.open(augmentAndFilterConnectorConfig(sinkSpec.getConfigs()), contextImpl);
10301047
} catch (Exception e) {
10311048
log.error("Sink open produced uncaught exception: ", e);
10321049
throw e;

pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
4848
import org.apache.pulsar.functions.proto.Function.SinkSpec;
4949
import org.apache.pulsar.functions.proto.InstanceCommunication;
50+
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
5051
import org.jetbrains.annotations.NotNull;
5152
import org.testng.Assert;
5253
import org.testng.annotations.DataProvider;
@@ -191,9 +192,10 @@ public void testStatsManagerNull() throws Exception {
191192

192193
@Test
193194
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
194-
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
195+
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
195196
"{\"ttl\": 9223372036854775807}",
196197
new InstanceConfig(),
198+
new EnvironmentBasedSecretsProvider(),
197199
null,
198200
FunctionDetails.ComponentType.SINK
199201
);
@@ -203,16 +205,69 @@ public void testSinkConfigParsingPreservesOriginalType() throws Exception {
203205

204206
@Test
205207
public void testSourceConfigParsingPreservesOriginalType() throws Exception {
206-
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
208+
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
207209
"{\"ttl\": 9223372036854775807}",
208210
new InstanceConfig(),
211+
new EnvironmentBasedSecretsProvider(),
209212
null,
210213
FunctionDetails.ComponentType.SOURCE
211214
);
212215
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
213216
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
214217
}
215218

219+
@DataProvider(name = "component")
220+
public Object[][] component() {
221+
return new Object[][]{
222+
// Schema: component type, whether to map in secrets
223+
{ FunctionDetails.ComponentType.SINK },
224+
{ FunctionDetails.ComponentType.SOURCE },
225+
{ FunctionDetails.ComponentType.FUNCTION },
226+
{ FunctionDetails.ComponentType.UNKNOWN },
227+
};
228+
}
229+
230+
@Test(dataProvider = "component")
231+
public void testEmptyStringInput(FunctionDetails.ComponentType componentType) throws Exception {
232+
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
233+
"",
234+
new InstanceConfig(),
235+
new EnvironmentBasedSecretsProvider(),
236+
null,
237+
componentType
238+
);
239+
Assert.assertEquals(parsedConfig.size(), 0);
240+
}
241+
242+
// Environment variables are set in the pom.xml file
243+
@Test(dataProvider = "component")
244+
public void testInterpolatingEnvironmentVariables(FunctionDetails.ComponentType componentType) throws Exception {
245+
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
246+
"""
247+
{
248+
"key": {
249+
"key1": "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}",
250+
"key2": "${unset-env-var}"
251+
},
252+
"key3": "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}"
253+
}
254+
""",
255+
new InstanceConfig(),
256+
new EnvironmentBasedSecretsProvider(),
257+
null,
258+
componentType
259+
);
260+
if ((componentType == FunctionDetails.ComponentType.SOURCE
261+
|| componentType == FunctionDetails.ComponentType.SINK)) {
262+
Assert.assertEquals(((Map) parsedConfig.get("key")).get("key1"), "some-configuration");
263+
Assert.assertEquals(((Map) parsedConfig.get("key")).get("key2"), "${unset-env-var}");
264+
Assert.assertEquals(parsedConfig.get("key3"), "some-configuration");
265+
} else {
266+
Assert.assertEquals(((Map) parsedConfig.get("key")).get("key1"), "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}");
267+
Assert.assertEquals(((Map) parsedConfig.get("key")).get("key2"), "${unset-env-var}");
268+
Assert.assertEquals(parsedConfig.get("key3"), "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}");
269+
}
270+
}
216271

217272
public static class ConnectorTestConfig1 {
218273
public String field1;
@@ -243,9 +298,10 @@ public void testSinkConfigIgnoreUnknownFields(boolean ignoreUnknownConfigFields,
243298
final InstanceConfig instanceConfig = new InstanceConfig();
244299
instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);
245300

246-
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
301+
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
247302
"{\"field1\": \"value\", \"field2\": \"value2\"}",
248303
instanceConfig,
304+
new EnvironmentBasedSecretsProvider(),
249305
narClassLoader,
250306
type
251307
);

pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,20 @@
1818
*/
1919
package org.apache.pulsar.functions.secretsprovider;
2020

21+
import java.util.regex.Matcher;
22+
import java.util.regex.Pattern;
23+
2124
/**
2225
* This defines a very simple Secrets Provider that looks up environment variable
2326
* thats named the same as secretName and fetches it.
2427
*/
2528
public class EnvironmentBasedSecretsProvider implements SecretsProvider {
2629

30+
/**
31+
* Pattern to match ${secretName} in the value.
32+
*/
33+
private static final Pattern interpolationPattern = Pattern.compile("\\$\\{(.+?)}");
34+
2735
/**
2836
* Fetches a secret.
2937
*
@@ -33,4 +41,15 @@ public class EnvironmentBasedSecretsProvider implements SecretsProvider {
3341
public String provideSecret(String secretName, Object pathToSecret) {
3442
return System.getenv(secretName);
3543
}
44+
45+
@Override
46+
public String interpolateSecretForValue(String value) {
47+
Matcher m = interpolationPattern.matcher(value);
48+
if (m.matches()) {
49+
String secretName = m.group(1);
50+
// If the secret doesn't exist, we return null and don't override the current value.
51+
return provideSecret(secretName, null);
52+
}
53+
return null;
54+
}
3655
}

pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,16 @@ default void init(Map<String, String> config) {}
3939
* @return The actual secret
4040
*/
4141
String provideSecret(String secretName, Object pathToSecret);
42+
43+
/**
44+
* If the passed value is formatted as a reference to a secret, as defined by the implementation, return the
45+
* referenced secret. If the value is not formatted as a secret reference or the referenced secret does not exist,
46+
* return null.
47+
*
48+
* @param value a config value that may be formatted as a reference to a secret
49+
* @return the materialized secret. Otherwise, null.
50+
*/
51+
default String interpolateSecretForValue(String value) {
52+
return null;
53+
}
4254
}

0 commit comments

Comments
 (0)