Skip to content

Commit b02191f

Browse files
authored
Restart Ingestion Job on code version update (feast-dev#949)
* blacklist -> whitelist
* add whitelisted property
* coordinator properties in e2e tests
* move labels to job & jobcoordinator
* it test for version update
* version label constant
* fix version label
1 parent 52c17a3 commit b02191f

File tree

8 files changed

+91
-20
lines changed

8 files changed

+91
-20
lines changed

core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public ConsolidatedJobStrategy(JobRepository jobRepository) {
4343
}
4444

4545
@Override
46-
public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores) {
46+
public Job getOrCreateJob(
47+
SourceProto.Source source, Set<StoreProto.Store> stores, Map<String, String> labels) {
4748
return jobRepository
4849
.findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
4950
source, null, JobStatus.getTerminalStates())
@@ -55,6 +56,7 @@ public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> store
5556
.setStores(
5657
stores.stream()
5758
.collect(Collectors.toMap(StoreProto.Store::getName, s -> s)))
59+
.setLabels(labels)
5860
.build());
5961
}
6062

core/src/main/java/feast/core/job/JobGroupingStrategy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import feast.core.model.Job;
2020
import feast.proto.core.SourceProto;
2121
import feast.proto.core.StoreProto;
22+
import java.util.Map;
2223
import java.util.Set;
2324
import java.util.stream.Stream;
2425
import org.apache.commons.lang3.tuple.Pair;
@@ -29,7 +30,8 @@
2930
*/
3031
public interface JobGroupingStrategy {
3132
/** Get the non terminated ingestion job ingesting for given source and stores. */
32-
Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores);
33+
Job getOrCreateJob(
34+
SourceProto.Source source, Set<StoreProto.Store> stores, Map<String, String> labels);
3335
/** Create unique JobId that would be used as key in communications with JobRunner */
3436
String createJobId(Job job);
3537
/** Distribute given sources and stores across jobs. One yielded Pair - one created Job */

core/src/main/java/feast/core/job/JobPerStoreStrategy.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import feast.proto.core.StoreProto;
2424
import java.time.Instant;
2525
import java.util.ArrayList;
26+
import java.util.Map;
2627
import java.util.Objects;
2728
import java.util.Set;
2829
import java.util.stream.Collectors;
@@ -42,7 +43,8 @@ public JobPerStoreStrategy(JobRepository jobRepository) {
4243
}
4344

4445
@Override
45-
public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores) {
46+
public Job getOrCreateJob(
47+
SourceProto.Source source, Set<StoreProto.Store> stores, Map<String, String> labels) {
4648
ArrayList<StoreProto.Store> storesList = Lists.newArrayList(stores);
4749
if (storesList.size() != 1) {
4850
throw new RuntimeException("Only one store is acceptable in JobPerStore Strategy");
@@ -60,6 +62,7 @@ public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> store
6062
.setStores(
6163
stores.stream()
6264
.collect(Collectors.toMap(StoreProto.Store::getName, s -> s)))
65+
.setLabels(labels)
6366
.build());
6467
}
6568

core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,6 @@ public static DataflowJobManager of(
9494
Map<String, String> jobSelector,
9595
Dataflow dataflow) {
9696

97-
// Retrieve labels to extend them with jobSelector
98-
Map<String, String> jobLabels = new HashMap<>(runnerConfigOptions.getLabelsMap());
99-
// Merge Job Selector Labels into runner options
100-
jobSelector.forEach(jobLabels::put);
101-
runnerConfigOptions = runnerConfigOptions.toBuilder().putAllLabels(jobLabels).build();
102-
10397
defaultOptions = new DataflowRunnerConfig(runnerConfigOptions);
10498
this.dataflow = dataflow;
10599
this.metrics = metricsProperties;
@@ -130,7 +124,11 @@ public Job startJob(Job job) {
130124
try {
131125
String extId =
132126
submitDataflowJob(
133-
job.getId(), job.getSource(), new HashSet<>(job.getStores().values()), false);
127+
job.getId(),
128+
job.getSource(),
129+
new HashSet<>(job.getStores().values()),
130+
job.getLabels(),
131+
false);
134132
job.setExtId(extId);
135133
return job;
136134

@@ -315,9 +313,13 @@ public List<Job> listRunningJobs() {
315313
}
316314

317315
private String submitDataflowJob(
318-
String jobName, SourceProto.Source source, Set<StoreProto.Store> sinks, boolean update) {
316+
String jobName,
317+
SourceProto.Source source,
318+
Set<StoreProto.Store> sinks,
319+
Map<String, String> labels,
320+
boolean update) {
319321
try {
320-
ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, update);
322+
ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, labels, update);
321323
DataflowPipelineJob pipelineResult = runPipeline(pipelineOptions);
322324
String jobId = waitForJobToRun(pipelineResult);
323325
return jobId;
@@ -328,7 +330,11 @@ private String submitDataflowJob(
328330
}
329331

330332
private ImportOptions getPipelineOptions(
331-
String jobName, SourceProto.Source source, Set<StoreProto.Store> sinks, boolean update)
333+
String jobName,
334+
SourceProto.Source source,
335+
Set<StoreProto.Store> sinks,
336+
Map<String, String> labels,
337+
boolean update)
332338
throws IOException, IllegalAccessException {
333339
ImportOptions pipelineOptions =
334340
PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class);
@@ -347,6 +353,12 @@ private ImportOptions getPipelineOptions(
347353
pipelineOptions.setJobName(jobName);
348354
pipelineOptions.setFilesToStage(
349355
detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader()));
356+
357+
// Merge common labels with job's labels
358+
Map<String, String> mergedLabels = new HashMap<>(defaultOptions.getLabels());
359+
labels.forEach(mergedLabels::put);
360+
pipelineOptions.setLabels(mergedLabels);
361+
350362
if (metrics.isEnabled()) {
351363
pipelineOptions.setMetricsExporterType(metrics.getType());
352364
if (metrics.getType().equals("statsd")) {

core/src/main/java/feast/core/model/Job.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,14 @@ public JobStatus getStatus() {
5353
public abstract Map<FeatureSetReference, FeatureSetDeliveryStatus>
5454
getFeatureSetDeliveryStatuses();
5555

56+
// Job's labels
57+
public abstract Map<String, String> getLabels();
58+
5659
public static Builder builder() {
5760
return new AutoValue_Job.Builder()
5861
.setFeatureSetDeliveryStatuses(new HashMap<>())
59-
.setStores(new HashMap<>());
62+
.setStores(new HashMap<>())
63+
.setLabels(new HashMap<>());
6064
}
6165

6266
@AutoValue.Builder
@@ -70,6 +74,8 @@ public interface Builder {
7074
Builder setFeatureSetDeliveryStatuses(
7175
Map<FeatureSetReference, FeatureSetDeliveryStatus> statuses);
7276

77+
Builder setLabels(Map<String, String> labels);
78+
7379
Job build();
7480
}
7581

@@ -164,12 +170,13 @@ public IngestionJobProto.IngestionJob toProto() {
164170
return ingestJob;
165171
}
166172

167-
public Job cloneWithId(String newJobId) {
173+
public Job cloneWithIdAndLabels(String newJobId, Map<String, String> labels) {
168174
return Job.builder()
169175
.setSource(this.getSource())
170176
.setFeatureSetDeliveryStatuses(new HashMap<>(this.getFeatureSetDeliveryStatuses()))
171177
.setStores(new HashMap<>(this.getStores()))
172178
.setId(newJobId)
179+
.setLabels(labels)
173180
.build();
174181
}
175182

core/src/main/java/feast/core/service/JobCoordinatorService.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
public class JobCoordinatorService {
6060

6161
private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5;
62+
public static final String VERSION_LABEL = "feast_version";
6263

6364
private final JobRepository jobRepository;
6465
private final SpecService specService;
@@ -68,6 +69,8 @@ public class JobCoordinatorService {
6869
private final KafkaTemplate<String, FeatureSetSpec> specPublisher;
6970
private final List<Store.Subscription> featureSetSubscriptions;
7071
private final List<String> whitelistedStores;
72+
private final Map<String, String> jobLabels;
73+
private final String currentVersion;
7174

7275
@Autowired
7376
public JobCoordinatorService(
@@ -88,6 +91,9 @@ public JobCoordinatorService(
8891
.map(JobProperties.CoordinatorProperties.FeatureSetSelector::toSubscription)
8992
.collect(Collectors.toList());
9093
this.whitelistedStores = feastProperties.getJobs().getCoordinator().getWhitelistedStores();
94+
this.currentVersion = feastProperties.getVersion();
95+
this.jobLabels = new HashMap<>(feastProperties.getJobs().getCoordinator().getJobSelector());
96+
this.jobLabels.put(VERSION_LABEL, this.currentVersion);
9197
}
9298

9399
/**
@@ -161,7 +167,7 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
161167
Source source = mapping.getKey();
162168
Set<Store> stores = mapping.getValue();
163169

164-
Job job = groupingStrategy.getOrCreateJob(source, stores);
170+
Job job = groupingStrategy.getOrCreateJob(source, stores, this.jobLabels);
165171

166172
if (job.isDeployed()) {
167173
if (!job.isRunning()) {
@@ -177,7 +183,7 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
177183
// it would make sense to spawn clone of current job
178184
// and terminate old version on the next Poll.
179185
// Both jobs should be in the same consumer group and not conflict with each other
180-
job = job.cloneWithId(groupingStrategy.createJobId(job));
186+
job = job.cloneWithIdAndLabels(groupingStrategy.createJobId(job), this.jobLabels);
181187
job.addAllStores(stores);
182188

183189
isSafeToStopJobs = false;
@@ -214,8 +220,9 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
214220
/**
215221
* Decides whether we need to upgrade (restart) given job. Since we send updated FeatureSets to
216222
* IngestionJob via Kafka, and there's only one source per job (if it changes - new job would be
217-
* created) the only things that can cause upgrade here are stores: new stores can be added, or
218-
* existing stores will change subscriptions.
223+
* created) the main trigger that can cause an upgrade here is a change in stores: new stores can
224+
* be added, or existing stores can change subscriptions. Another trigger is the release of a new
225+
* version: the current version is compared with the job's version stored in its labels.
219226
*
220227
* @param job {@link Job} to check
221228
* @param stores Set of {@link Store} new version of stores (vs current version job.getStores())
@@ -227,6 +234,10 @@ private boolean jobRequiresUpgrade(Job job, Set<Store> stores) {
227234
return true;
228235
}
229236

237+
if (!this.currentVersion.equals(job.getLabels().get(VERSION_LABEL))) {
238+
return true;
239+
}
240+
230241
return false;
231242
}
232243

@@ -257,7 +268,9 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) {
257268

258269
// Add featureSet to allocated job if not allocated before
259270
for (Pair<Source, Set<Store>> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) {
260-
Job job = groupingStrategy.getOrCreateJob(jobArgs.getLeft(), jobArgs.getRight());
271+
Job job =
272+
groupingStrategy.getOrCreateJob(
273+
jobArgs.getLeft(), jobArgs.getRight(), Collections.emptyMap());
261274
if (!job.isRunning()) {
262275
continue;
263276
}

core/src/test/java/feast/core/service/JobCoordinatorIT.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
2323
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
2424
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
25+
import static org.hamcrest.collection.IsMapContaining.hasEntry;
2526
import static org.hamcrest.collection.IsMapWithSize.aMapWithSize;
2627
import static org.hamcrest.core.AllOf.allOf;
2728

@@ -60,6 +61,7 @@
6061
"feast.jobs.coordinator.feature-set-selector[0].project=default",
6162
"feast.jobs.coordinator.whitelisted-stores[0]=test-store",
6263
"feast.jobs.coordinator.whitelisted-stores[1]=new-store",
64+
"feast.version=1.0.0"
6365
})
6466
public class JobCoordinatorIT extends BaseIT {
6567
@Autowired private FakeJobManager jobManager;
@@ -188,6 +190,33 @@ public void shouldNotCreateJobForUnwantedFeatureSet() {
188190
assertThat(jobManager.getAllJobs(), hasSize(0));
189191
}
190192

193+
@Test
194+
@SneakyThrows
195+
public void shouldRestartJobWithOldVersion() {
196+
apiClient.simpleApplyFeatureSet(
197+
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "test"));
198+
199+
Job job =
200+
Job.builder()
201+
.setSource(DataGenerator.getDefaultSource())
202+
.setStores(
203+
ImmutableMap.of(
204+
DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore()))
205+
.setId("some-running-id")
206+
.setLabels(ImmutableMap.of(JobCoordinatorService.VERSION_LABEL, "0.9.9"))
207+
.build();
208+
209+
jobManager.startJob(job);
210+
jobRepository.add(job);
211+
212+
await().until(() -> jobManager.getJobStatus(job), equalTo(JobStatus.ABORTED));
213+
214+
Job replacement = jobRepository.findByStatus(JobStatus.RUNNING).get(0);
215+
assertThat(replacement.getSource(), equalTo(job.getSource()));
216+
assertThat(replacement.getStores(), equalTo(job.getStores()));
217+
assertThat(replacement.getLabels(), hasEntry(JobCoordinatorService.VERSION_LABEL, "1.0.0"));
218+
}
219+
191220
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
192221
@Nested
193222
class SpecNotificationFlow extends SequentialFlow {
@@ -212,6 +241,7 @@ public void shouldSendNewSpec() {
212241
ImmutableMap.of(
213242
DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore()))
214243
.setId("some-running-id")
244+
.setLabels(ImmutableMap.of(JobCoordinatorService.VERSION_LABEL, "1.0.0"))
215245
.build();
216246

217247
jobManager.startJob(job);

core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,11 @@ public void setUp() {
8585
coordinatorProperties.setFeatureSetSelector(ImmutableList.of(selector));
8686
coordinatorProperties.setWhitelistedStores(
8787
ImmutableList.of("test-store", "test", "test-1", "test-2", "normal-store"));
88+
coordinatorProperties.setJobSelector(ImmutableMap.of("application", "feast"));
8889

8990
jobProperties.setCoordinator(coordinatorProperties);
9091
feastProperties.setJobs(jobProperties);
92+
feastProperties.setVersion("1.0.0");
9193

9294
TestUtil.setupAuditLogger();
9395

0 commit comments

Comments
 (0)