Skip to content

Commit bbfa765

Browse files
pradithyaPradithya Aria
authored andcommitted
Preload spec in serving cache (feast-dev#152)
* Preload spec in serving cache * Disable async refresh and only periodically refresh cache
1 parent ca2ee77 commit bbfa765

File tree

5 files changed

+160
-66
lines changed

5 files changed

+160
-66
lines changed

serving/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,13 @@
232232
<version>2.23.0</version>
233233
<scope>test</scope>
234234
</dependency>
235+
236+
<dependency>
237+
<groupId>com.google.guava</groupId>
238+
<artifactId>guava-testlib</artifactId>
239+
<version>26.0-jre</version>
240+
<scope>test</scope>
241+
</dependency>
235242
</dependencies>
236243

237244

serving/src/main/java/feast/serving/config/ServingApiConfiguration.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.util.Map;
3131
import java.util.concurrent.ExecutorService;
3232
import java.util.concurrent.Executors;
33+
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.TimeUnit;
3335
import lombok.extern.slf4j.Slf4j;
3436
import org.springframework.beans.factory.annotation.Autowired;
3537
import org.springframework.beans.factory.annotation.Value;
@@ -39,15 +41,12 @@
3941
import org.springframework.http.converter.protobuf.ProtobufJsonFormatHttpMessageConverter;
4042
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
4143

42-
/**
43-
* Global bean configuration.
44-
*/
44+
/** Global bean configuration. */
4545
@Slf4j
4646
@Configuration
4747
public class ServingApiConfiguration implements WebMvcConfigurer {
4848

49-
@Autowired
50-
private ProtobufJsonFormatHttpMessageConverter protobufConverter;
49+
@Autowired private ProtobufJsonFormatHttpMessageConverter protobufConverter;
5150

5251
@Bean
5352
public AppConfig getAppConfig(
@@ -66,9 +65,24 @@ public AppConfig getAppConfig(
6665
@Bean
6766
public SpecStorage getCoreServiceSpecStorage(
6867
@Value("${feast.core.host}") String coreServiceHost,
69-
@Value("${feast.core.grpc.port}") String coreServicePort) {
70-
return new CachedSpecStorage(
71-
new CoreService(coreServiceHost, Integer.parseInt(coreServicePort)));
68+
@Value("${feast.core.grpc.port}") String coreServicePort,
69+
@Value("${feast.cacheDurationMinute}") int cacheDurationMinute) {
70+
ScheduledExecutorService scheduledExecutorService =
71+
Executors.newSingleThreadScheduledExecutor();
72+
final CachedSpecStorage cachedSpecStorage =
73+
new CachedSpecStorage(new CoreService(coreServiceHost, Integer.parseInt(coreServicePort)));
74+
75+
// reload all specs including new ones periodically
76+
scheduledExecutorService.schedule(
77+
() -> cachedSpecStorage.populateCache(), cacheDurationMinute, TimeUnit.MINUTES);
78+
79+
// load all specs during start up
80+
try {
81+
cachedSpecStorage.populateCache();
82+
} catch (Exception e) {
83+
log.error("Unable to preload feast's spec");
84+
}
85+
return cachedSpecStorage;
7286
}
7387

7488
@Bean

serving/src/main/java/feast/serving/service/CachedSpecStorage.java

Lines changed: 24 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@
2020
import com.google.common.cache.CacheBuilder;
2121
import com.google.common.cache.CacheLoader;
2222
import com.google.common.cache.LoadingCache;
23-
import lombok.extern.slf4j.Slf4j;
2423
import feast.serving.exception.SpecRetrievalException;
2524
import feast.specs.EntitySpecProto.EntitySpec;
2625
import feast.specs.FeatureSpecProto.FeatureSpec;
2726
import feast.specs.StorageSpecProto.StorageSpec;
28-
29-
import java.time.Duration;
3027
import java.util.Collections;
3128
import java.util.Map;
29+
import lombok.extern.slf4j.Slf4j;
3230

3331
/** SpecStorage implementation with built-in in-memory cache. */
3432
@Slf4j
3533
public class CachedSpecStorage implements SpecStorage {
34+
private static final int MAX_SPEC_COUNT = 10000;
35+
3636
private final CoreService coreService;
3737
private final LoadingCache<String, EntitySpec> entitySpecCache;
3838
private final CacheLoader<String, EntitySpec> entitySpecLoader;
@@ -41,70 +41,24 @@ public class CachedSpecStorage implements SpecStorage {
4141
private final LoadingCache<String, StorageSpec> storageSpecCache;
4242
private final CacheLoader<String, StorageSpec> storageSpecLoader;
4343

44-
private static final Duration CACHE_DURATION;
45-
private static final int MAX_SPEC_COUNT = 1000;
46-
47-
static {
48-
CACHE_DURATION = Duration.ofMinutes(30);
49-
}
50-
5144
public CachedSpecStorage(CoreService coreService) {
5245
this.coreService = coreService;
5346
entitySpecLoader =
54-
new CacheLoader<String, EntitySpec>() {
55-
@Override
56-
public EntitySpec load(String key) throws Exception {
57-
return coreService.getEntitySpecs(Collections.singletonList(key)).get(key);
58-
}
59-
60-
@Override
61-
public Map<String, EntitySpec> loadAll(Iterable<? extends String> keys) throws Exception {
62-
return coreService.getEntitySpecs((Iterable<String>) keys);
63-
}
64-
};
65-
entitySpecCache =
66-
CacheBuilder.newBuilder()
67-
.maximumSize(MAX_SPEC_COUNT)
68-
.expireAfterAccess(CACHE_DURATION)
69-
.build(entitySpecLoader);
47+
CacheLoader.from(
48+
(String key) -> coreService.getEntitySpecs(Collections.singletonList(key)).get(key));
49+
entitySpecCache = CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(entitySpecLoader);
7050

7151
featureSpecLoader =
72-
new CacheLoader<String, FeatureSpec>() {
73-
@Override
74-
public FeatureSpec load(String key) throws Exception {
75-
return coreService.getFeatureSpecs(Collections.singletonList(key)).get(key);
76-
}
77-
78-
@Override
79-
public Map<String, FeatureSpec> loadAll(Iterable<? extends String> keys)
80-
throws Exception {
81-
return coreService.getFeatureSpecs((Iterable<String>) keys);
82-
}
83-
};
52+
CacheLoader.from(
53+
(String key) -> coreService.getFeatureSpecs(Collections.singletonList(key)).get(key));
8454
featureSpecCache =
85-
CacheBuilder.newBuilder()
86-
.maximumSize(MAX_SPEC_COUNT)
87-
.expireAfterAccess(CACHE_DURATION)
88-
.build(featureSpecLoader);
55+
CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(featureSpecLoader);
8956

9057
storageSpecLoader =
91-
new CacheLoader<String, StorageSpec>() {
92-
@Override
93-
public Map<String, StorageSpec> loadAll(Iterable<? extends String> keys)
94-
throws Exception {
95-
return coreService.getStorageSpecs((Iterable<String>) keys);
96-
}
97-
98-
@Override
99-
public StorageSpec load(String key) throws Exception {
100-
return coreService.getStorageSpecs(Collections.singleton(key)).get(key);
101-
}
102-
};
58+
CacheLoader.from(
59+
(String key) -> coreService.getStorageSpecs(Collections.singletonList(key)).get(key));
10360
storageSpecCache =
104-
CacheBuilder.newBuilder()
105-
.maximumSize(MAX_SPEC_COUNT)
106-
.expireAfterAccess(CACHE_DURATION)
107-
.build(storageSpecLoader);
61+
CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(storageSpecLoader);
10862
}
10963

11064
@Override
@@ -177,4 +131,16 @@ public Map<String, StorageSpec> getAllStorageSpecs() {
177131
public boolean isConnected() {
178132
return coreService.isConnected();
179133
}
134+
135+
/** Preload all spec into cache. */
136+
public void populateCache() {
137+
Map<String, FeatureSpec> featureSpecMap = coreService.getAllFeatureSpecs();
138+
featureSpecCache.putAll(featureSpecMap);
139+
140+
Map<String, EntitySpec> entitySpecMap = coreService.getAllEntitySpecs();
141+
entitySpecCache.putAll(entitySpecMap);
142+
143+
Map<String, StorageSpec> storageSpecMap = coreService.getAllStorageSpecs();
144+
storageSpecCache.putAll(storageSpecMap);
145+
}
180146
}

serving/src/main/resources/application.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ feast.maxentity=${FEAST_MAX_ENTITY_PER_BATCH:2000}
2424
feast.timeout=${FEAST_RETRIEVAL_TIMEOUT:5}
2525
feast.redispool.maxsize=${FEAST_REDIS_POOL_MAX_SIZE:128}
2626
feast.redispool.maxidle=${FEAST_REDIS_POOL_MAX_IDLE:16}
27+
feast.cacheDurationMinute=${FEAST_SPEC_CACHE_DURATION_MINUTE:5}
2728

2829
statsd.host= ${STATSD_HOST:localhost}
2930
statsd.port= ${STATSD_PORT:8125}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package feast.serving.service;
2+
3+
import static org.hamcrest.Matchers.equalTo;
4+
import static org.junit.Assert.assertNotNull;
5+
import static org.junit.Assert.assertThat;
6+
import static org.mockito.ArgumentMatchers.any;
7+
import static org.mockito.Mockito.mock;
8+
import static org.mockito.Mockito.times;
9+
import static org.mockito.Mockito.verify;
10+
import static org.mockito.Mockito.when;
11+
12+
import com.google.common.testing.FakeTicker;
13+
import feast.specs.EntitySpecProto.EntitySpec;
14+
import feast.specs.FeatureSpecProto.FeatureSpec;
15+
import feast.specs.StorageSpecProto.StorageSpec;
16+
import java.util.Collections;
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
import java.util.concurrent.TimeUnit;
20+
import org.junit.Before;
21+
import org.junit.Test;
22+
23+
public class CachedSpecStorageTest {
24+
25+
private CoreService coreService;
26+
private CachedSpecStorage cachedSpecStorage;
27+
28+
@Before
29+
public void setUp() throws Exception {
30+
coreService = mock(CoreService.class);
31+
cachedSpecStorage = new CachedSpecStorage(coreService);
32+
}
33+
34+
@Test
35+
public void testPopulateCache() {
36+
Map<String, FeatureSpec> featureSpecMap = new HashMap<>();
37+
featureSpecMap.put("feature_1", mock(FeatureSpec.class));
38+
39+
Map<String, StorageSpec> storageSpecMap = new HashMap<>();
40+
storageSpecMap.put("storage_1", mock(StorageSpec.class));
41+
42+
Map<String, EntitySpec> entitySpecMap = new HashMap<>();
43+
entitySpecMap.put("entity_1", mock(EntitySpec.class));
44+
45+
when(coreService.getAllFeatureSpecs()).thenReturn(featureSpecMap);
46+
when(coreService.getAllEntitySpecs()).thenReturn(entitySpecMap);
47+
when(coreService.getAllStorageSpecs()).thenReturn(storageSpecMap);
48+
49+
cachedSpecStorage.populateCache();
50+
Map<String, FeatureSpec> result =
51+
cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
52+
Map<String, StorageSpec> result1 =
53+
cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
54+
Map<String, EntitySpec> result2 =
55+
cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
56+
57+
assertThat(result.size(), equalTo(1));
58+
assertThat(result1.size(), equalTo(1));
59+
assertThat(result2.size(), equalTo(1));
60+
61+
verify(coreService, times(0)).getFeatureSpecs(any(Iterable.class));
62+
verify(coreService, times(0)).getStorageSpecs(any(Iterable.class));
63+
verify(coreService, times(0)).getEntitySpecs(any(Iterable.class));
64+
}
65+
66+
@Test
67+
public void reloadFailureShouldReturnOldValue() {
68+
Map<String, FeatureSpec> featureSpecMap = new HashMap<>();
69+
featureSpecMap.put("feature_1", mock(FeatureSpec.class));
70+
71+
Map<String, StorageSpec> storageSpecMap = new HashMap<>();
72+
storageSpecMap.put("storage_1", mock(StorageSpec.class));
73+
74+
Map<String, EntitySpec> entitySpecMap = new HashMap<>();
75+
entitySpecMap.put("entity_1", mock(EntitySpec.class));
76+
77+
when(coreService.getAllFeatureSpecs()).thenReturn(featureSpecMap);
78+
when(coreService.getFeatureSpecs(any(Iterable.class))).thenThrow(new RuntimeException("error"));
79+
when(coreService.getAllEntitySpecs()).thenReturn(entitySpecMap);
80+
when(coreService.getEntitySpecs(any(Iterable.class))).thenThrow(new RuntimeException("error"));
81+
when(coreService.getAllStorageSpecs()).thenReturn(storageSpecMap);
82+
when(coreService.getStorageSpecs(any(Iterable.class))).thenThrow(new RuntimeException("error"));
83+
84+
cachedSpecStorage.populateCache();
85+
Map<String, FeatureSpec> result =
86+
cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
87+
Map<String, StorageSpec> result1 =
88+
cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
89+
Map<String, EntitySpec> result2 =
90+
cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
91+
92+
assertThat(result.size(), equalTo(1));
93+
assertThat(result1.size(), equalTo(1));
94+
assertThat(result2.size(), equalTo(1));
95+
verify(coreService, times(0)).getFeatureSpecs(any(Iterable.class));
96+
verify(coreService, times(0)).getStorageSpecs(any(Iterable.class));
97+
verify(coreService, times(0)).getEntitySpecs(any(Iterable.class));
98+
99+
result = cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
100+
result1 = cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
101+
result2 = cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
102+
assertThat(result.size(), equalTo(1));
103+
assertThat(result1.size(), equalTo(1));
104+
assertThat(result2.size(), equalTo(1));
105+
}
106+
}

0 commit comments

Comments
 (0)