Skip to content

Commit 286941d

Browse files
authored
Make nodes priority (for redis cluster) configurable in Serving (feast-dev#1260)
Signed-off-by: Oleksii Moskalenko <[email protected]>
1 parent 1c4ba12 commit 286941d

File tree

5 files changed

+38
-15
lines changed

5 files changed

+38
-15
lines changed

protos/feast/core/Store.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ message Store {
7878
// Optional. This would be the fallback prefix to use if enable_fallback is true.
7979
string fallback_prefix = 7;
8080

81+
// Optional. Priority of nodes when reading from cluster
82+
enum ReadFrom {
83+
MASTER = 0;
84+
MASTER_PREFERRED = 1;
85+
REPLICA = 2;
86+
REPLICA_PREFERRED = 3;
87+
}
88+
ReadFrom read_from = 8;
8189
}
8290

8391
message Subscription {

serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import feast.storage.api.retriever.OnlineRetrieverV2;
2626
import feast.storage.connectors.redis.retriever.*;
2727
import io.opentracing.Tracer;
28-
import java.util.Map;
2928
import org.slf4j.Logger;
3029
import org.springframework.context.annotation.Bean;
3130
import org.springframework.context.annotation.Configuration;
@@ -41,16 +40,16 @@ public ServingServiceV2 servingServiceV2(
4140
ServingServiceV2 servingService = null;
4241
FeastProperties.Store store = feastProperties.getActiveStore();
4342
StoreProto.Store.StoreType storeType = store.toProto().getType();
44-
Map<String, String> config = store.getConfig();
4543

4644
switch (storeType) {
4745
case REDIS_CLUSTER:
48-
RedisClientAdapter redisClusterClient = RedisClusterClient.create(config);
46+
RedisClientAdapter redisClusterClient =
47+
RedisClusterClient.create(store.toProto().getRedisClusterConfig());
4948
OnlineRetrieverV2 redisClusterRetriever = new OnlineRetriever(redisClusterClient);
5049
servingService = new OnlineServingServiceV2(redisClusterRetriever, specService, tracer);
5150
break;
5251
case REDIS:
53-
RedisClientAdapter redisClient = RedisClient.create(config);
52+
RedisClientAdapter redisClient = RedisClient.create(store.toProto().getRedisConfig());
5453
OnlineRetrieverV2 redisRetriever = new OnlineRetriever(redisClient);
5554
servingService = new OnlineServingServiceV2(redisRetriever, specService, tracer);
5655
break;

serving/src/main/resources/application.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ feast:
5454
type: REDIS_CLUSTER
5555
config: # Store specific configuration.
5656
# Connection string specifies the host:port of Redis instances in the redis cluster.
57-
connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
57+
connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
58+
read_from: MASTER
5859
subscriptions:
5960
- name: "*"
6061
project: "*"

storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
*/
1717
package feast.storage.connectors.redis.retriever;
1818

19+
import feast.proto.core.StoreProto;
1920
import io.lettuce.core.KeyValue;
2021
import io.lettuce.core.RedisFuture;
2122
import io.lettuce.core.RedisURI;
2223
import io.lettuce.core.api.StatefulRedisConnection;
2324
import io.lettuce.core.api.async.RedisAsyncCommands;
2425
import io.lettuce.core.codec.ByteArrayCodec;
2526
import java.util.List;
26-
import java.util.Map;
2727

2828
public class RedisClient implements RedisClientAdapter {
2929

@@ -46,11 +46,11 @@ private RedisClient(StatefulRedisConnection<byte[], byte[]> connection) {
4646
this.asyncCommands.setAutoFlushCommands(false);
4747
}
4848

49-
public static RedisClientAdapter create(Map<String, String> config) {
49+
public static RedisClientAdapter create(StoreProto.Store.RedisConfig config) {
5050

51-
RedisURI uri = RedisURI.create(config.get("host"), Integer.parseInt(config.get("port")));
51+
RedisURI uri = RedisURI.create(config.getHost(), config.getPort());
5252

53-
if (Boolean.parseBoolean(config.get("ssl"))) {
53+
if (config.getSsl()) {
5454
uri.setSsl(true);
5555
}
5656
StatefulRedisConnection<byte[], byte[]> connection =

storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616
*/
1717
package feast.storage.connectors.redis.retriever;
1818

19+
import com.google.common.collect.ImmutableMap;
20+
import feast.proto.core.StoreProto;
21+
import feast.proto.core.StoreProto.Store.RedisClusterConfig;
1922
import feast.storage.connectors.redis.serializer.RedisKeyPrefixSerializerV2;
2023
import feast.storage.connectors.redis.serializer.RedisKeySerializerV2;
2124
import io.lettuce.core.KeyValue;
25+
import io.lettuce.core.ReadFrom;
2226
import io.lettuce.core.RedisFuture;
2327
import io.lettuce.core.RedisURI;
2428
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
@@ -36,6 +40,13 @@ public class RedisClusterClient implements RedisClientAdapter {
3640
private final RedisKeySerializerV2 serializer;
3741
@Nullable private final RedisKeySerializerV2 fallbackSerializer;
3842

43+
private static final Map<RedisClusterConfig.ReadFrom, ReadFrom> PROTO_TO_LETTUCE_TYPES =
44+
ImmutableMap.of(
45+
RedisClusterConfig.ReadFrom.MASTER, ReadFrom.MASTER,
46+
RedisClusterConfig.ReadFrom.MASTER_PREFERRED, ReadFrom.MASTER_PREFERRED,
47+
RedisClusterConfig.ReadFrom.REPLICA, ReadFrom.REPLICA,
48+
RedisClusterConfig.ReadFrom.REPLICA_PREFERRED, ReadFrom.REPLICA_PREFERRED);
49+
3950
@Override
4051
public RedisFuture<List<KeyValue<byte[], byte[]>>> hmget(byte[] key, byte[]... fields) {
4152
return asyncCommands.hmget(key, fields);
@@ -73,13 +84,16 @@ private RedisClusterClient(Builder builder) {
7384
this.serializer = builder.serializer;
7485
this.fallbackSerializer = builder.fallbackSerializer;
7586

87+
// allows reading from replicas
88+
this.asyncCommands.readOnly();
89+
7690
// Disable auto-flushing
7791
this.asyncCommands.setAutoFlushCommands(false);
7892
}
7993

80-
public static RedisClientAdapter create(Map<String, String> config) {
94+
public static RedisClientAdapter create(StoreProto.Store.RedisClusterConfig config) {
8195
List<RedisURI> redisURIList =
82-
Arrays.stream(config.get("connection_string").split(","))
96+
Arrays.stream(config.getConnectionString().split(","))
8397
.map(
8498
hostPort -> {
8599
String[] hostPortSplit = hostPort.trim().split(":");
@@ -90,14 +104,15 @@ public static RedisClientAdapter create(Map<String, String> config) {
90104
io.lettuce.core.cluster.RedisClusterClient.create(redisURIList)
91105
.connect(new ByteArrayCodec());
92106

93-
RedisKeySerializerV2 serializer =
94-
new RedisKeyPrefixSerializerV2(config.getOrDefault("key_prefix", ""));
107+
connection.setReadFrom(PROTO_TO_LETTUCE_TYPES.get(config.getReadFrom()));
108+
109+
RedisKeySerializerV2 serializer = new RedisKeyPrefixSerializerV2(config.getKeyPrefix());
95110

96111
Builder builder = new Builder(connection, serializer);
97112

98-
if (Boolean.parseBoolean(config.getOrDefault("enable_fallback", "false"))) {
113+
if (config.getEnableFallback()) {
99114
RedisKeySerializerV2 fallbackSerializer =
100-
new RedisKeyPrefixSerializerV2(config.getOrDefault("fallback_prefix", ""));
115+
new RedisKeyPrefixSerializerV2(config.getKeyPrefix());
101116
builder = builder.withFallbackSerializer(fallbackSerializer);
102117
}
103118

0 commit comments

Comments
 (0)