Skip to content

Commit 63d40cc

Browse files
authored
add connection options (feast-dev#1356)
Signed-off-by: Oleksii Moskalenko <[email protected]> add native netty transport as dependency Signed-off-by: Oleksii Moskalenko <[email protected]> format Signed-off-by: Oleksii Moskalenko <[email protected]> configurable timeout Signed-off-by: Oleksii Moskalenko <[email protected]> cleanup Signed-off-by: Oleksii Moskalenko <[email protected]>
1 parent 37554ae commit 63d40cc

File tree

5 files changed

+46
-7
lines changed

5 files changed

+46
-7
lines changed

protos/feast/core/Store.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ option java_package = "feast.proto.core";
2121
option java_outer_classname = "StoreProto";
2222
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
2323

24+
import "google/protobuf/duration.proto";
25+
2426
// Store provides a location where Feast reads and writes feature values.
2527
// Feature values will be written to the Store in the form of FeatureRow elements.
2628
// The way FeatureRow is encoded and decoded when it is written to and read from
@@ -86,6 +88,8 @@ message Store {
8688
REPLICA_PREFERRED = 3;
8789
}
8890
ReadFrom read_from = 8;
91+
// Optional. Timeout on waiting response from redis node
92+
google.protobuf.Duration timeout = 9;
8993
}
9094

9195
message Subscription {

serving/pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,13 @@
8989
<artifactId>feast-common</artifactId>
9090
<version>${project.version}</version>
9191
</dependency>
92-
92+
93+
<dependency>
94+
<groupId>io.lettuce</groupId>
95+
<artifactId>lettuce-core</artifactId>
96+
<version>6.0.2.RELEASE</version>
97+
</dependency>
98+
9399
<!-- TODO: SLF4J is being used via Lombok, but also jog4j - pick one -->
94100
<dependency>
95101
<groupId>org.slf4j</groupId>
@@ -197,6 +203,7 @@
197203
<artifactId>simpleclient_servlet</artifactId>
198204
<version>0.8.0</version>
199205
</dependency>
206+
200207
<dependency>
201208
<groupId>io.prometheus</groupId>
202209
<artifactId>simpleclient_spring_boot</artifactId>

serving/src/main/resources/application.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ feast:
5656
# Connection string specifies the host:port of Redis instances in the redis cluster.
5757
connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
5858
read_from: MASTER
59+
timeout: 0.5s
5960
subscriptions:
6061
- name: "*"
6162
project: "*"

storage/connectors/redis/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@
1616
<dependency>
1717
<groupId>io.lettuce</groupId>
1818
<artifactId>lettuce-core</artifactId>
19+
<version>6.0.2.RELEASE</version>
20+
</dependency>
21+
22+
<dependency>
23+
<groupId>io.netty</groupId>
24+
<artifactId>netty-transport-native-epoll</artifactId>
25+
<version>4.1.52.Final</version>
26+
<classifier>linux-x86_64</classifier>
1927
</dependency>
2028

2129
<dependency>

storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import feast.proto.core.StoreProto.Store.RedisClusterConfig;
2222
import feast.storage.connectors.redis.serializer.RedisKeyPrefixSerializerV2;
2323
import feast.storage.connectors.redis.serializer.RedisKeySerializerV2;
24-
import io.lettuce.core.KeyValue;
25-
import io.lettuce.core.ReadFrom;
26-
import io.lettuce.core.RedisFuture;
27-
import io.lettuce.core.RedisURI;
24+
import io.lettuce.core.*;
25+
import io.lettuce.core.cluster.ClusterClientOptions;
26+
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
2827
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
2928
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
3029
import io.lettuce.core.codec.ByteArrayCodec;
30+
import java.time.Duration;
3131
import java.util.Arrays;
3232
import java.util.List;
3333
import java.util.Map;
@@ -100,9 +100,28 @@ public static RedisClientAdapter create(StoreProto.Store.RedisClusterConfig conf
100100
return RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1]));
101101
})
102102
.collect(Collectors.toList());
103+
io.lettuce.core.cluster.RedisClusterClient client =
104+
io.lettuce.core.cluster.RedisClusterClient.create(redisURIList);
105+
106+
Duration timeout;
107+
if (config.hasTimeout()) {
108+
timeout =
109+
Duration.ofSeconds(config.getTimeout().getSeconds(), config.getTimeout().getNanos());
110+
} else {
111+
timeout = Duration.ofSeconds(10);
112+
}
113+
114+
client.setOptions(
115+
ClusterClientOptions.builder()
116+
.socketOptions(SocketOptions.builder().keepAlive(true).tcpNoDelay(true).build())
117+
.timeoutOptions(TimeoutOptions.enabled(timeout))
118+
.pingBeforeActivateConnection(true)
119+
.topologyRefreshOptions(
120+
ClusterTopologyRefreshOptions.builder().enableAllAdaptiveRefreshTriggers().build())
121+
.build());
122+
103123
StatefulRedisClusterConnection<byte[], byte[]> connection =
104-
io.lettuce.core.cluster.RedisClusterClient.create(redisURIList)
105-
.connect(new ByteArrayCodec());
124+
client.connect(new ByteArrayCodec());
106125

107126
connection.setReadFrom(PROTO_TO_LETTUCE_TYPES.get(config.getReadFrom()));
108127

0 commit comments

Comments
 (0)