Skip to content

Commit 6559f33

Browse files
committed
Allow override of orchestrator/source resources via feature flag (#8768)
1 parent 5571015 commit 6559f33

File tree

3 files changed

+202
-22
lines changed

3 files changed

+202
-22
lines changed

airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ object HideActorDefinitionFromList : Permanent<Boolean>(key = "connectors.hideAc
103103

104104
object PauseSyncsWithUnsupportedActors : Temporary<Boolean>(key = "connectors.pauseSyncsWithUnsupportedActors", default = true)
105105

106+
object DestResourceOverrides : Temporary<String>(key = "dest-resource-overrides", default = "")
107+
108+
object OrchestratorResourceOverrides : Temporary<String>(key = "orchestrator-resource-overrides", default = "")
109+
110+
object SourceResourceOverrides : Temporary<String>(key = "source-resource-overrides", default = "")
111+
106112
// NOTE: this is deprecated in favor of FieldSelectionEnabled and will be removed once that flag is fully deployed.
107113
object FieldSelectionWorkspaces : EnvVar(envVar = "FIELD_SELECTION_WORKSPACES") {
108114
override fun enabled(ctx: Context): Boolean {
@@ -128,6 +134,4 @@ object FieldSelectionWorkspaces : EnvVar(envVar = "FIELD_SELECTION_WORKSPACES")
128134
object ConnectorOAuthConsentDisabled : Permanent<Boolean>(key = "connectors.oauth.disableOAuthConsent", default = false)
129135

130136
object AddSchedulingJitter : Temporary<Boolean>(key = "platform.add-scheduling-jitter", default = false)
131-
132-
object DestResourceOverrides : Temporary<String>(key = "dest-resource-overrides", default = "")
133137
}

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import io.airbyte.config.provider.ResourceRequirementsProvider;
2828
import io.airbyte.featureflag.Connection;
2929
import io.airbyte.featureflag.Context;
30+
import io.airbyte.featureflag.DestResourceOverrides;
3031
import io.airbyte.featureflag.Destination;
3132
import io.airbyte.featureflag.DestinationDefinition;
3233
import io.airbyte.featureflag.FeatureFlagClient;
33-
import io.airbyte.featureflag.FieldSelectionWorkspaces.DestResourceOverrides;
3434
import io.airbyte.featureflag.Multi;
35+
import io.airbyte.featureflag.OrchestratorResourceOverrides;
3536
import io.airbyte.featureflag.Source;
3637
import io.airbyte.featureflag.SourceDefinition;
38+
import io.airbyte.featureflag.SourceResourceOverrides;
3739
import io.airbyte.featureflag.UseResourceRequirementsVariant;
3840
import io.airbyte.featureflag.Workspace;
3941
import io.airbyte.protocol.models.CatalogHelpers;
@@ -183,7 +185,7 @@ private SyncResourceRequirements getSyncResourceRequirements(final UUID workspac
183185
// destination based on the source to avoid oversizing orchestrator and destination when the source
184186
// is slow.
185187
final Optional<String> sourceType = getSourceType(sourceDefinition);
186-
final ResourceRequirements mergedOrchestratorResourceReq = getOrchestratorResourceRequirements(standardSync, sourceType, variant);
188+
final ResourceRequirements mergedOrchestratorResourceReq = getOrchestratorResourceRequirements(standardSync, sourceType, variant, ffContext);
187189
final ResourceRequirements mergedDstResourceReq =
188190
getDestinationResourceRequirements(standardSync, destinationDefinition, sourceType, variant, ffContext);
189191

@@ -197,7 +199,7 @@ private SyncResourceRequirements getSyncResourceRequirements(final UUID workspac
197199
.withOrchestrator(mergedOrchestratorResourceReq);
198200

199201
if (!isReset) {
200-
final ResourceRequirements mergedSrcResourceReq = getSourceResourceRequirements(standardSync, sourceDefinition, variant);
202+
final ResourceRequirements mergedSrcResourceReq = getSourceResourceRequirements(standardSync, sourceDefinition, variant, ffContext);
201203
syncResourceRequirements
202204
.withSource(mergedSrcResourceReq)
203205
.withSourceStdErr(resourceRequirementsProvider.getResourceRequirements(ResourceRequirementsType.SOURCE_STDERR, sourceType, variant))
@@ -230,35 +232,36 @@ private static void addIfNotNull(final List<Context> contextList, final UUID uui
230232

231233
private ResourceRequirements getOrchestratorResourceRequirements(final StandardSync standardSync,
232234
final Optional<String> sourceType,
233-
final String variant) {
235+
final String variant,
236+
final Context ffContext) {
234237
final ResourceRequirements defaultOrchestratorRssReqs =
235238
resourceRequirementsProvider.getResourceRequirements(ResourceRequirementsType.ORCHESTRATOR, sourceType, variant);
236-
return ResourceRequirementsUtils.getResourceRequirements(
239+
240+
final var mergedRrsReqs = ResourceRequirementsUtils.getResourceRequirements(
237241
standardSync.getResourceRequirements(),
238242
defaultOrchestratorRssReqs);
243+
244+
final var overrides = getOrchestratorResourceOverrides(ffContext);
245+
246+
return ResourceRequirementsUtils.getResourceRequirements(overrides, mergedRrsReqs);
239247
}
240248

241249
private ResourceRequirements getSourceResourceRequirements(final StandardSync standardSync,
242250
final StandardSourceDefinition sourceDefinition,
243-
final String variant) {
251+
final String variant,
252+
final Context ffContext) {
244253
final ResourceRequirements defaultSrcRssReqs =
245254
resourceRequirementsProvider.getResourceRequirements(ResourceRequirementsType.SOURCE, getSourceType(sourceDefinition), variant);
246-
return ResourceRequirementsUtils.getResourceRequirements(
255+
256+
final var mergedRssReqs = ResourceRequirementsUtils.getResourceRequirements(
247257
standardSync.getResourceRequirements(),
248258
sourceDefinition != null ? sourceDefinition.getResourceRequirements() : null,
249259
defaultSrcRssReqs,
250260
JobType.SYNC);
251-
}
252261

253-
private ResourceRequirements getDestinationResourceOverrides(final Context ffCtx) {
254-
final String destOverrides = featureFlagClient.stringVariation(DestResourceOverrides.INSTANCE, ffCtx);
255-
try {
256-
return ResourceRequirementsUtils.parse(destOverrides);
257-
} catch (final Exception e) {
258-
log.warn("Could not parse DESTINATION resource overrides from feature flag string: '{}'", destOverrides);
259-
log.warn("Error parsing DESTINATION resource overrides: {}", e.getMessage());
260-
return null;
261-
}
262+
final var overrides = getSourceResourceOverrides(ffContext);
263+
264+
return ResourceRequirementsUtils.getResourceRequirements(overrides, mergedRssReqs);
262265
}
263266

264267
private ResourceRequirements getDestinationResourceRequirements(final StandardSync standardSync,
@@ -280,6 +283,36 @@ private ResourceRequirements getDestinationResourceRequirements(final StandardSy
280283
return ResourceRequirementsUtils.getResourceRequirements(overrides, mergedRssReqs);
281284
}
282285

286+
private ResourceRequirements getDestinationResourceOverrides(final Context ffCtx) {
287+
final String destOverrides = featureFlagClient.stringVariation(DestResourceOverrides.INSTANCE, ffCtx);
288+
try {
289+
return ResourceRequirementsUtils.parse(destOverrides);
290+
} catch (final Exception e) {
291+
log.warn("Could not parse DESTINATION resource overrides '{}' from feature flag string: {}", destOverrides, e.getMessage());
292+
return null;
293+
}
294+
}
295+
296+
private ResourceRequirements getOrchestratorResourceOverrides(final Context ffCtx) {
297+
final String orchestratorOverrides = featureFlagClient.stringVariation(OrchestratorResourceOverrides.INSTANCE, ffCtx);
298+
try {
299+
return ResourceRequirementsUtils.parse(orchestratorOverrides);
300+
} catch (final Exception e) {
301+
log.warn("Could not parse ORCHESTRATOR resource overrides '{}' from feature flag string: {}", orchestratorOverrides, e.getMessage());
302+
return null;
303+
}
304+
}
305+
306+
private ResourceRequirements getSourceResourceOverrides(final Context ffCtx) {
307+
final String sourceOverrides = featureFlagClient.stringVariation(SourceResourceOverrides.INSTANCE, ffCtx);
308+
try {
309+
return ResourceRequirementsUtils.parse(sourceOverrides);
310+
} catch (final Exception e) {
311+
log.warn("Could not parse SOURCE resource overrides '{}' from feature flag string: {}", sourceOverrides, e.getMessage());
312+
return null;
313+
}
314+
}
315+
283316
private Optional<String> getSourceType(final StandardSourceDefinition sourceDefinition) {
284317
if (sourceDefinition == null) {
285318
return Optional.empty();

airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java

Lines changed: 146 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
import io.airbyte.config.SyncResourceRequirements;
4242
import io.airbyte.config.SyncResourceRequirementsKey;
4343
import io.airbyte.config.provider.ResourceRequirementsProvider;
44-
import io.airbyte.featureflag.FieldSelectionWorkspaces.DestResourceOverrides;
44+
import io.airbyte.featureflag.DestResourceOverrides;
45+
import io.airbyte.featureflag.OrchestratorResourceOverrides;
46+
import io.airbyte.featureflag.SourceResourceOverrides;
4547
import io.airbyte.featureflag.TestClient;
4648
import io.airbyte.protocol.models.CatalogHelpers;
4749
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
@@ -92,6 +94,7 @@ class DefaultJobCreatorTest {
9294
private static final StandardDestinationDefinition STANDARD_DESTINATION_DEFINITION;
9395
private static final ActorDefinitionVersion SOURCE_DEFINITION_VERSION;
9496
private static final ActorDefinitionVersion DESTINATION_DEFINITION_VERSION;
97+
private static final ConfiguredAirbyteCatalog CONFIGURED_AIRBYTE_CATALOG;
9598
private static final long JOB_ID = 12L;
9699
private static final UUID WORKSPACE_ID = UUID.randomUUID();
97100

@@ -151,7 +154,7 @@ class DefaultJobCreatorTest {
151154
.withStream(CatalogHelpers.createAirbyteStream(STREAM3_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
152155
.withSyncMode(SyncMode.FULL_REFRESH)
153156
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE);
154-
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream1, stream2, stream3));
157+
CONFIGURED_AIRBYTE_CATALOG = new ConfiguredAirbyteCatalog().withStreams(List.of(stream1, stream2, stream3));
155158

156159
STANDARD_SYNC = new StandardSync()
157160
.withConnectionId(connectionId)
@@ -160,7 +163,7 @@ class DefaultJobCreatorTest {
160163
.withNamespaceFormat(null)
161164
.withPrefix("presto_to_hudi")
162165
.withStatus(StandardSync.Status.ACTIVE)
163-
.withCatalog(catalog)
166+
.withCatalog(CONFIGURED_AIRBYTE_CATALOG)
164167
.withSourceId(sourceId)
165168
.withDestinationId(destinationId)
166169
.withOperationIds(List.of(operationId));
@@ -581,6 +584,146 @@ void testDestinationResourceReqsOverrides(final String cpuReqOverride,
581584
assertEquals(expectedMemLimit, destConfigValues.getMemoryLimit());
582585
}
583586

587+
@ParameterizedTest
588+
@MethodSource("resourceOverrideMatrix")
589+
void testOrchestratorResourceReqsOverrides(final String cpuReqOverride,
590+
final String cpuLimitOverride,
591+
final String memReqOverride,
592+
final String memLimitOverride)
593+
throws IOException {
594+
final var overrides = new HashMap<>();
595+
if (cpuReqOverride != null) {
596+
overrides.put("cpu_request", cpuReqOverride);
597+
}
598+
if (cpuLimitOverride != null) {
599+
overrides.put("cpu_limit", cpuLimitOverride);
600+
}
601+
if (memReqOverride != null) {
602+
overrides.put("memory_request", memReqOverride);
603+
}
604+
if (memLimitOverride != null) {
605+
overrides.put("memory_limit", memLimitOverride);
606+
}
607+
608+
final ResourceRequirements originalReqs = new ResourceRequirements()
609+
.withCpuLimit("0.8")
610+
.withCpuRequest("0.8")
611+
.withMemoryLimit("800Mi")
612+
.withMemoryRequest("800Mi");
613+
614+
final var jobCreator = new DefaultJobCreator(jobPersistence, resourceRequirementsProvider,
615+
new TestClient(Map.of(OrchestratorResourceOverrides.INSTANCE.getKey(), Jsons.serialize(overrides))));
616+
617+
final var standardSync = new StandardSync()
618+
.withConnectionId(UUID.randomUUID())
619+
.withName("presto to hudi")
620+
.withNamespaceDefinition(NamespaceDefinitionType.SOURCE)
621+
.withNamespaceFormat(null)
622+
.withPrefix("presto_to_hudi")
623+
.withStatus(StandardSync.Status.ACTIVE)
624+
.withCatalog(CONFIGURED_AIRBYTE_CATALOG)
625+
.withSourceId(UUID.randomUUID())
626+
.withDestinationId(UUID.randomUUID())
627+
.withOperationIds(List.of(UUID.randomUUID()))
628+
.withResourceRequirements(originalReqs);
629+
630+
jobCreator.createSyncJob(
631+
SOURCE_CONNECTION,
632+
DESTINATION_CONNECTION,
633+
standardSync,
634+
SOURCE_IMAGE_NAME,
635+
SOURCE_PROTOCOL_VERSION,
636+
DESTINATION_IMAGE_NAME,
637+
DESTINATION_PROTOCOL_VERSION,
638+
List.of(STANDARD_SYNC_OPERATION),
639+
null,
640+
new StandardSourceDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements)),
641+
new StandardDestinationDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(destResourceRequirements)),
642+
SOURCE_DEFINITION_VERSION,
643+
DESTINATION_DEFINITION_VERSION,
644+
WORKSPACE_ID);
645+
646+
final ArgumentCaptor<JobConfig> configCaptor = ArgumentCaptor.forClass(JobConfig.class);
647+
verify(jobPersistence, times(1)).enqueueJob(any(), configCaptor.capture());
648+
final var orchestratorConfigValues = configCaptor.getValue().getSync().getSyncResourceRequirements().getOrchestrator();
649+
650+
final var expectedCpuReq = StringUtils.isNotBlank(cpuReqOverride) ? cpuReqOverride : originalReqs.getCpuRequest();
651+
assertEquals(expectedCpuReq, orchestratorConfigValues.getCpuRequest());
652+
653+
final var expectedCpuLimit = StringUtils.isNotBlank(cpuLimitOverride) ? cpuLimitOverride : originalReqs.getCpuLimit();
654+
assertEquals(expectedCpuLimit, orchestratorConfigValues.getCpuLimit());
655+
656+
final var expectedMemReq = StringUtils.isNotBlank(memReqOverride) ? memReqOverride : originalReqs.getMemoryRequest();
657+
assertEquals(expectedMemReq, orchestratorConfigValues.getMemoryRequest());
658+
659+
final var expectedMemLimit = StringUtils.isNotBlank(memLimitOverride) ? memLimitOverride : originalReqs.getMemoryLimit();
660+
assertEquals(expectedMemLimit, orchestratorConfigValues.getMemoryLimit());
661+
}
662+
663+
@ParameterizedTest
664+
@MethodSource("resourceOverrideMatrix")
665+
void testSourceResourceReqsOverrides(final String cpuReqOverride,
666+
final String cpuLimitOverride,
667+
final String memReqOverride,
668+
final String memLimitOverride)
669+
throws IOException {
670+
final var overrides = new HashMap<>();
671+
if (cpuReqOverride != null) {
672+
overrides.put("cpu_request", cpuReqOverride);
673+
}
674+
if (cpuLimitOverride != null) {
675+
overrides.put("cpu_limit", cpuLimitOverride);
676+
}
677+
if (memReqOverride != null) {
678+
overrides.put("memory_request", memReqOverride);
679+
}
680+
if (memLimitOverride != null) {
681+
overrides.put("memory_limit", memLimitOverride);
682+
}
683+
684+
final ResourceRequirements originalReqs = new ResourceRequirements()
685+
.withCpuLimit("0.8")
686+
.withCpuRequest("0.8")
687+
.withMemoryLimit("800Mi")
688+
.withMemoryRequest("800Mi");
689+
690+
final var jobCreator = new DefaultJobCreator(jobPersistence, resourceRequirementsProvider,
691+
new TestClient(Map.of(SourceResourceOverrides.INSTANCE.getKey(), Jsons.serialize(overrides))));
692+
693+
jobCreator.createSyncJob(
694+
SOURCE_CONNECTION,
695+
DESTINATION_CONNECTION,
696+
STANDARD_SYNC,
697+
SOURCE_IMAGE_NAME,
698+
SOURCE_PROTOCOL_VERSION,
699+
DESTINATION_IMAGE_NAME,
700+
DESTINATION_PROTOCOL_VERSION,
701+
List.of(STANDARD_SYNC_OPERATION),
702+
null,
703+
new StandardSourceDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withJobSpecific(List.of(
704+
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(originalReqs)))),
705+
new StandardDestinationDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(destResourceRequirements)),
706+
SOURCE_DEFINITION_VERSION,
707+
DESTINATION_DEFINITION_VERSION,
708+
WORKSPACE_ID);
709+
710+
final ArgumentCaptor<JobConfig> configCaptor = ArgumentCaptor.forClass(JobConfig.class);
711+
verify(jobPersistence, times(1)).enqueueJob(any(), configCaptor.capture());
712+
final var sourceConfigValues = configCaptor.getValue().getSync().getSyncResourceRequirements().getSource();
713+
714+
final var expectedCpuReq = StringUtils.isNotBlank(cpuReqOverride) ? cpuReqOverride : originalReqs.getCpuRequest();
715+
assertEquals(expectedCpuReq, sourceConfigValues.getCpuRequest());
716+
717+
final var expectedCpuLimit = StringUtils.isNotBlank(cpuLimitOverride) ? cpuLimitOverride : originalReqs.getCpuLimit();
718+
assertEquals(expectedCpuLimit, sourceConfigValues.getCpuLimit());
719+
720+
final var expectedMemReq = StringUtils.isNotBlank(memReqOverride) ? memReqOverride : originalReqs.getMemoryRequest();
721+
assertEquals(expectedMemReq, sourceConfigValues.getMemoryRequest());
722+
723+
final var expectedMemLimit = StringUtils.isNotBlank(memLimitOverride) ? memLimitOverride : originalReqs.getMemoryLimit();
724+
assertEquals(expectedMemLimit, sourceConfigValues.getMemoryLimit());
725+
}
726+
584727
private static Stream<Arguments> resourceOverrideMatrix() {
585728
return Stream.of(
586729
Arguments.of("0.7", "0.4", "1000Mi", "2000Mi"),

0 commit comments

Comments
 (0)