Skip to content

Commit 52c17a3

Browse files
committed
JobCoordinator can be configured to use only subset of FeatureSets or Stores (feast-dev#947)
1 parent afccfbf commit 52c17a3

File tree

6 files changed

+162
-12
lines changed

6 files changed

+162
-12
lines changed

core/src/main/java/feast/core/config/FeastProperties.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import feast.common.logging.config.LoggingProperties;
2323
import feast.common.validators.OneOfStrings;
2424
import feast.core.config.FeastProperties.StreamProperties.FeatureStreamOptions;
25+
import feast.proto.core.StoreProto;
2526
import java.net.InetAddress;
2627
import java.net.UnknownHostException;
2728
import java.util.ArrayList;
@@ -114,6 +115,30 @@ public static class CoordinatorProperties {
114115

115116
/* Labels to identify jobs managed by this job coordinator */
116117
private Map<String, String> jobSelector = new HashMap<>();
118+
119+
/* Selectors to define featureSets that are responsibility of current JobManager */
120+
private List<FeatureSetSelector> featureSetSelector = new ArrayList<>();
121+
122+
/* Specify names of stores that must be used by current JobManager */
123+
private List<String> whitelistedStores = new ArrayList<>();
124+
125+
/**
126+
* Similarly to Store's subscription this selector defines set of FeatureSets. All FeatureSets
127+
* that match both project and name will be selected. Project and name may use *
128+
*/
129+
@Getter
130+
@Setter
131+
public static class FeatureSetSelector {
132+
private String project;
133+
private String name;
134+
135+
public StoreProto.Store.Subscription toSubscription() {
136+
return StoreProto.Store.Subscription.newBuilder()
137+
.setName(this.name)
138+
.setProject(this.project)
139+
.build();
140+
}
141+
}
117142
}
118143

119144
/** List of configured job runners. */

core/src/main/java/feast/core/service/JobCoordinatorService.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public class JobCoordinatorService {
6666
private final JobProperties jobProperties;
6767
private final JobGroupingStrategy groupingStrategy;
6868
private final KafkaTemplate<String, FeatureSetSpec> specPublisher;
69+
private final List<Store.Subscription> featureSetSubscriptions;
70+
private final List<String> whitelistedStores;
6971

7072
@Autowired
7173
public JobCoordinatorService(
@@ -81,6 +83,11 @@ public JobCoordinatorService(
8183
this.jobProperties = feastProperties.getJobs();
8284
this.specPublisher = specPublisher;
8385
this.groupingStrategy = groupingStrategy;
86+
this.featureSetSubscriptions =
87+
feastProperties.getJobs().getCoordinator().getFeatureSetSelector().stream()
88+
.map(JobProperties.CoordinatorProperties.FeatureSetSelector::toSubscription)
89+
.collect(Collectors.toList());
90+
this.whitelistedStores = feastProperties.getJobs().getCoordinator().getWhitelistedStores();
8491
}
8592

8693
/**
@@ -290,7 +297,9 @@ private Collection<Job> getExtraJobs(List<Job> keepJobs) {
290297

291298
private List<Store> getAllStores() {
292299
ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build());
293-
return listStoresResponse.getStoreList();
300+
return listStoresResponse.getStoreList().stream()
301+
.filter(s -> this.whitelistedStores.contains(s.getName()))
302+
.collect(Collectors.toList());
294303
}
295304

296305
/**
@@ -319,7 +328,7 @@ Iterable<Pair<Source, Set<Store>>> getSourceToStoreMappings() {
319328
* @param store to get subscribed FeatureSets for
320329
* @return list of FeatureSets that the store subscribes to.
321330
*/
322-
private List<FeatureSet> getFeatureSetsForStore(Store store) {
331+
List<FeatureSet> getFeatureSetsForStore(Store store) {
323332
return store.getSubscriptionsList().stream()
324333
.flatMap(
325334
subscription -> {
@@ -330,7 +339,14 @@ private List<FeatureSet> getFeatureSetsForStore(Store store) {
330339
.setProject(subscription.getProject())
331340
.setFeatureSetName(subscription.getName())
332341
.build())
333-
.getFeatureSetsList().stream();
342+
.getFeatureSetsList().stream()
343+
.filter(
344+
f ->
345+
this.featureSetSubscriptions.isEmpty()
346+
|| isSubscribedToFeatureSet(
347+
this.featureSetSubscriptions,
348+
f.getSpec().getProject(),
349+
f.getSpec().getName()));
334350
} catch (InvalidProtocolBufferException e) {
335351
throw new RuntimeException(
336352
String.format(

core/src/main/resources/application.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@ feast:
7676
jobSelector:
7777
application: feast
7878

79+
# Specify feature sets that should be handled by current instance of JobManager
80+
featureSetSelector:
81+
- project: "*"
82+
name: "*"
83+
# Stores names that are enabled on current instance of JobManager
84+
whitelisted-stores:
85+
- online
86+
- online_cluster
87+
- historical
88+
7989
stream:
8090
# Feature stream type. Only kafka is supported.
8191
type: kafka

core/src/test/java/feast/core/service/JobCoordinatorIT.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,11 @@
5555
"feast.jobs.enabled=true",
5656
"feast.jobs.polling_interval_milliseconds=1000",
5757
"feast.stream.specsOptions.notifyIntervalMilliseconds=100",
58-
"feast.jobs.coordinator.consolidate-jobs-per-source=true"
58+
"feast.jobs.coordinator.consolidate-jobs-per-source=true",
59+
"feast.jobs.coordinator.feature-set-selector[0].name=test",
60+
"feast.jobs.coordinator.feature-set-selector[0].project=default",
61+
"feast.jobs.coordinator.whitelisted-stores[0]=test-store",
62+
"feast.jobs.coordinator.whitelisted-stores[1]=new-store",
5963
})
6064
public class JobCoordinatorIT extends BaseIT {
6165
@Autowired private FakeJobManager jobManager;
@@ -127,7 +131,7 @@ public void shouldCreateJobForNewSource() {
127131
@Test
128132
public void shouldUpgradeJobWhenStoreChanged() {
129133
apiClient.simpleApplyFeatureSet(
130-
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "project", "test"));
134+
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "test"));
131135

132136
await().until(jobManager::getAllJobs, hasSize(1));
133137

@@ -151,7 +155,7 @@ public void shouldUpgradeJobWhenStoreChanged() {
151155
@Test
152156
public void shouldRestoreJobThatStopped() {
153157
apiClient.simpleApplyFeatureSet(
154-
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "project", "test"));
158+
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "test"));
155159

156160
await().until(jobManager::getAllJobs, hasSize(1));
157161
Job job = jobRepository.findByStatus(JobStatus.RUNNING).get(0);
@@ -173,11 +177,28 @@ public void shouldRestoreJobThatStopped() {
173177
hasProperty("id", not(ingestionJobs.get(0).getId())))));
174178
}
175179

180+
@Test
181+
@SneakyThrows
182+
public void shouldNotCreateJobForUnwantedFeatureSet() {
183+
apiClient.simpleApplyFeatureSet(
184+
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "other"));
185+
186+
Thread.sleep(2000);
187+
188+
assertThat(jobManager.getAllJobs(), hasSize(0));
189+
}
190+
176191
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
177192
@Nested
178193
class SpecNotificationFlow extends SequentialFlow {
179194
Job job;
180195

196+
@AfterAll
197+
public void tearDown() {
198+
jobManager.cleanAll();
199+
jobRepository.deleteAll();
200+
}
201+
181202
@Test
182203
@Order(1)
183204
public void shouldSendNewSpec() {

core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package feast.core.service;
1818

1919
import static org.hamcrest.MatcherAssert.assertThat;
20+
import static org.hamcrest.Matchers.containsInAnyOrder;
2021
import static org.hamcrest.Matchers.hasSize;
2122
import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
2223
import static org.hamcrest.core.Is.isA;
@@ -73,7 +74,21 @@ public void setUp() {
7374
feastProperties = new FeastProperties();
7475
JobProperties jobProperties = new JobProperties();
7576
jobProperties.setJobUpdateTimeoutSeconds(5);
77+
78+
JobProperties.CoordinatorProperties.FeatureSetSelector selector =
79+
new JobProperties.CoordinatorProperties.FeatureSetSelector();
80+
selector.setName("fs*");
81+
selector.setProject("*");
82+
83+
JobProperties.CoordinatorProperties coordinatorProperties =
84+
new JobProperties.CoordinatorProperties();
85+
coordinatorProperties.setFeatureSetSelector(ImmutableList.of(selector));
86+
coordinatorProperties.setWhitelistedStores(
87+
ImmutableList.of("test-store", "test", "test-1", "test-2", "normal-store"));
88+
89+
jobProperties.setCoordinator(coordinatorProperties);
7690
feastProperties.setJobs(jobProperties);
91+
7792
TestUtil.setupAuditLogger();
7893

7994
JobManager jobManager = mock(JobManager.class);
@@ -136,8 +151,8 @@ public void shouldGroupJobsBySource() {
136151
Source source1 = DataGenerator.createSource("servers:9092", "topic");
137152
Source source2 = DataGenerator.createSource("others.servers:9092", "topic");
138153

139-
FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "project1", "features1");
140-
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "project1", "features2");
154+
FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "project1", "fs1");
155+
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "project1", "fs2");
141156

142157
when(specService.listFeatureSets(
143158
Filter.newBuilder().setFeatureSetName("*").setProject("project1").build()))
@@ -170,8 +185,8 @@ public void shouldUseStoreSubscriptionToMapStore() {
170185
Source source1 = DataGenerator.createSource("servers:9092", "topic");
171186
Source source2 = DataGenerator.createSource("other.servers:9092", "topic");
172187

173-
FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "default", "feature1");
174-
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "default", "feature2");
188+
FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "default", "fs1");
189+
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "default", "fs2");
175190

176191
when(specService.listFeatureSets(
177192
Filter.newBuilder().setFeatureSetName("features1").setProject("*").build()))
@@ -268,7 +283,7 @@ public void shouldCreateJobPerStore() throws InvalidProtocolBufferException {
268283

269284
Source source = DataGenerator.createSource("servers:9092", "topic");
270285

271-
FeatureSet featureSet = DataGenerator.createFeatureSet(source, "default", "features1");
286+
FeatureSet featureSet = DataGenerator.createFeatureSet(source, "default", "fs1");
272287

273288
when(specService.listFeatureSets(
274289
Filter.newBuilder().setFeatureSetName("*").setProject("*").build()))
@@ -310,7 +325,6 @@ public void shouldCloneRunningJobOnUpgrade() throws InvalidProtocolBufferExcepti
310325
Store store2 =
311326
DataGenerator.createStore(
312327
"test-2", Store.StoreType.REDIS, ImmutableList.of(Triple.of("*", "*", false)));
313-
;
314328

315329
Source source = DataGenerator.createSource("servers:9092", "topic");
316330

@@ -333,4 +347,62 @@ public void shouldCloneRunningJobOnUpgrade() throws InvalidProtocolBufferExcepti
333347
assertThat(jobTasks, hasSize(1));
334348
assertThat(jobTasks, hasItem(isA(CreateJobTask.class)));
335349
}
350+
351+
@Test
352+
@SneakyThrows
353+
public void shouldSelectOnlyFeatureSetsThatJobManagerSubscribedTo() {
354+
Store store = DataGenerator.getDefaultStore();
355+
Source source = DataGenerator.getDefaultSource();
356+
357+
FeatureSet featureSet1 = DataGenerator.createFeatureSet(source, "default", "fs1");
358+
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source, "project", "fs3");
359+
FeatureSet featureSet3 = DataGenerator.createFeatureSet(source, "default", "not-fs");
360+
361+
when(specService.listFeatureSets(
362+
Filter.newBuilder().setFeatureSetName("*").setProject("*").build()))
363+
.thenReturn(
364+
ListFeatureSetsResponse.newBuilder()
365+
.addAllFeatureSets(Lists.newArrayList(featureSet1, featureSet2, featureSet3))
366+
.build());
367+
368+
List<FeatureSet> featureSetsForStore = jcsWithConsolidation.getFeatureSetsForStore(store);
369+
assertThat(featureSetsForStore, containsInAnyOrder(featureSet1, featureSet2));
370+
}
371+
372+
@Test
373+
@SneakyThrows
374+
public void shouldSelectOnlyStoresThatNotBlacklisted() {
375+
Store store1 =
376+
DataGenerator.createStore(
377+
"normal-store",
378+
Store.StoreType.REDIS,
379+
ImmutableList.of(Triple.of("project1", "*", false)));
380+
Store store2 =
381+
DataGenerator.createStore(
382+
"blacklisted-store",
383+
Store.StoreType.REDIS,
384+
ImmutableList.of(Triple.of("project2", "*", false)));
385+
386+
Source source1 = DataGenerator.createSource("source-1", "topic");
387+
Source source2 = DataGenerator.createSource("source-2", "topic");
388+
389+
FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "default", "fs1");
390+
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "project", "fs3");
391+
392+
when(specService.listStores(any()))
393+
.thenReturn(ListStoresResponse.newBuilder().addStore(store1).addStore(store2).build());
394+
395+
when(specService.listFeatureSets(
396+
Filter.newBuilder().setProject("project1").setFeatureSetName("*").build()))
397+
.thenReturn(ListFeatureSetsResponse.newBuilder().addFeatureSets(featureSet1).build());
398+
399+
when(specService.listFeatureSets(
400+
Filter.newBuilder().setProject("project2").setFeatureSetName("*").build()))
401+
.thenReturn(ListFeatureSetsResponse.newBuilder().addFeatureSets(featureSet2).build());
402+
403+
ArrayList<Pair<Source, Set<Store>>> pairs =
404+
Lists.newArrayList(jcsWithConsolidation.getSourceToStoreMappings());
405+
406+
assertThat(pairs, containsInAnyOrder(Pair.of(source1, ImmutableSet.of(store1))));
407+
}
336408
}

infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ feast-core:
2525
jobSelector:
2626
application: feast
2727
tag: $IMAGE_TAG
28+
featureSetSelector:
29+
- project: "*"
30+
name: "*"
31+
whitelisted-stores:
32+
- online
33+
- historical
2834
runners:
2935
- name: dataflow
3036
type: DataflowRunner

0 commit comments

Comments
 (0)