Skip to content

Commit 96aebde

Browse files
author
Lavkesh Lahngir
committed
Replacing Jedis With Lettuce in ingestion and serving
1 parent 6363540 commit 96aebde

File tree

18 files changed

+416
-157
lines changed

18 files changed

+416
-157
lines changed

ingestion/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,8 @@
216216
</dependency>
217217

218218
<dependency>
219-
<groupId>redis.clients</groupId>
220-
<artifactId>jedis</artifactId>
219+
<groupId>io.lettuce</groupId>
220+
<artifactId>lettuce-core</artifactId>
221221
</dependency>
222222

223223
<dependency>

ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,15 @@
4343
import feast.core.StoreProto.Store.RedisConfig;
4444
import feast.core.StoreProto.Store.StoreType;
4545
import feast.types.ValueProto.ValueType.Enum;
46+
import io.lettuce.core.RedisClient;
47+
import io.lettuce.core.RedisConnectionException;
48+
import io.lettuce.core.RedisURI;
4649
import java.util.ArrayList;
4750
import java.util.HashMap;
4851
import java.util.List;
4952
import java.util.Map;
5053
import org.apache.commons.lang3.tuple.Pair;
5154
import org.slf4j.Logger;
52-
import redis.clients.jedis.JedisPool;
53-
import redis.clients.jedis.exceptions.JedisConnectionException;
5455

5556
// TODO: Create partitioned table by default
5657

@@ -239,15 +240,16 @@ public static void setupBigQuery(
239240
* @param redisConfig Plase refer to feast.core.Store proto
240241
*/
241242
public static void checkRedisConnection(RedisConfig redisConfig) {
242-
JedisPool jedisPool = new JedisPool(redisConfig.getHost(), redisConfig.getPort());
243+
RedisClient redisClient =
244+
RedisClient.create(RedisURI.create(redisConfig.getHost(), redisConfig.getPort()));
243245
try {
244-
jedisPool.getResource();
245-
} catch (JedisConnectionException e) {
246+
redisClient.connect();
247+
} catch (RedisConnectionException e) {
246248
throw new RuntimeException(
247249
String.format(
248250
"Failed to connect to Redis at host: '%s' port: '%d'. Please check that your Redis is running and accessible from Feast.",
249251
redisConfig.getHost(), redisConfig.getPort()));
250252
}
251-
jedisPool.close();
253+
redisClient.shutdown();
252254
}
253255
}

ingestion/src/main/java/feast/retry/Retriable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package feast.retry;
1818

1919
public interface Retriable {
20-
void execute();
20+
void execute() throws Exception;
2121

2222
Boolean isExceptionRetriable(Exception e);
2323

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2020 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.store.serving.redis;
18+
19+
import io.lettuce.core.RedisClient;
20+
import io.lettuce.core.RedisFuture;
21+
import io.lettuce.core.TransactionResult;
22+
import io.lettuce.core.api.StatefulRedisConnection;
23+
import io.lettuce.core.api.async.RedisAsyncCommands;
24+
import io.lettuce.core.codec.ByteArrayCodec;
25+
import java.util.concurrent.ExecutionException;
26+
27+
public class LettuceTransactionPipeline {
28+
29+
private StatefulRedisConnection<byte[], byte[]> connection;
30+
private RedisAsyncCommands<byte[], byte[]> commands;
31+
32+
protected LettuceTransactionPipeline(RedisClient redisClient) {
33+
connection = redisClient.connect(new ByteArrayCodec());
34+
this.commands = connection.async();
35+
}
36+
37+
public void clear() {
38+
this.commands.discard();
39+
}
40+
41+
RedisFuture pexpire(byte[] k, long duration) {
42+
return commands.pexpire(k, duration);
43+
}
44+
45+
void exec() throws ExecutionException, InterruptedException {
46+
RedisFuture<TransactionResult> exec = commands.exec();
47+
exec.get();
48+
}
49+
50+
RedisFuture multi() {
51+
return this.commands.multi();
52+
}
53+
54+
RedisFuture append(byte[] k, byte[] v) {
55+
return this.commands.append(k, v);
56+
}
57+
58+
RedisFuture set(byte[] k, byte[] v) {
59+
return this.commands.set(k, v);
60+
}
61+
62+
RedisFuture lpush(byte[] k, byte[] v) {
63+
return this.commands.lpush(k, v);
64+
}
65+
66+
RedisFuture rpush(byte[] k, byte[] v) {
67+
return this.commands.rpush(k, v);
68+
}
69+
70+
RedisFuture sadd(byte[] k, byte[] v) {
71+
return this.commands.sadd(k, v);
72+
}
73+
74+
RedisFuture zadd(byte[] k, long s, byte[] v) {
75+
return this.commands.zadd(k, s, v);
76+
}
77+
78+
void close() {
79+
this.clear();
80+
this.connection.close();
81+
}
82+
}

ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@
2020
import feast.ingestion.values.FailedElement;
2121
import feast.retry.BackOffExecutor;
2222
import feast.retry.Retriable;
23+
import io.lettuce.core.RedisClient;
24+
import io.lettuce.core.RedisConnectionException;
25+
import io.lettuce.core.RedisFuture;
26+
import io.lettuce.core.RedisURI;
2327
import java.io.IOException;
2428
import java.util.ArrayList;
2529
import java.util.List;
30+
import java.util.concurrent.ExecutionException;
2631
import org.apache.avro.reflect.Nullable;
2732
import org.apache.beam.sdk.coders.AvroCoder;
2833
import org.apache.beam.sdk.coders.DefaultCoder;
@@ -36,10 +41,6 @@
3641
import org.joda.time.Instant;
3742
import org.slf4j.Logger;
3843
import org.slf4j.LoggerFactory;
39-
import redis.clients.jedis.Jedis;
40-
import redis.clients.jedis.Pipeline;
41-
import redis.clients.jedis.Response;
42-
import redis.clients.jedis.exceptions.JedisConnectionException;
4344

4445
public class RedisCustomIO {
4546

@@ -194,10 +195,10 @@ public static class WriteDoFn extends DoFn<RedisMutation, FailedElement> {
194195
private final BackOffExecutor backOffExecutor;
195196
private final List<RedisMutation> mutations = new ArrayList<>();
196197

197-
private Jedis jedis;
198-
private Pipeline pipeline;
198+
private LettuceTransactionPipeline pipeline;
199199
private int batchSize = DEFAULT_BATCH_SIZE;
200200
private int timeout = DEFAULT_TIMEOUT;
201+
private RedisClient redisclient;
201202

202203
WriteDoFn(StoreProto.Store.RedisConfig redisConfig) {
203204
this.host = redisConfig.getHost();
@@ -224,20 +225,29 @@ public WriteDoFn withTimeout(int timeout) {
224225

225226
@Setup
226227
public void setup() {
227-
jedis = new Jedis(host, port, timeout);
228+
this.redisclient =
229+
RedisClient.create(new RedisURI(host, port, java.time.Duration.ofMillis(timeout)));
228230
}
229231

230232
@StartBundle
231233
public void startBundle() {
234+
try {
235+
pipeline = new LettuceTransactionPipeline(redisclient);
236+
} catch (RedisConnectionException e) {
237+
log.error("Connection to redis cannot be established ", e);
238+
}
232239
mutations.clear();
233-
pipeline = jedis.pipelined();
234240
}
235241

236242
private void executeBatch() throws Exception {
237243
backOffExecutor.execute(
238244
new Retriable() {
239245
@Override
240-
public void execute() {
246+
public void execute() throws ExecutionException, InterruptedException {
247+
if (pipeline == null) {
248+
pipeline = new LettuceTransactionPipeline(redisclient);
249+
}
250+
pipeline.clear();
241251
pipeline.multi();
242252
mutations.forEach(
243253
mutation -> {
@@ -247,24 +257,22 @@ public void execute() {
247257
}
248258
});
249259
pipeline.exec();
250-
pipeline.sync();
260+
pipeline.clear();
251261
mutations.clear();
252262
}
253263

254264
@Override
255265
public Boolean isExceptionRetriable(Exception e) {
256-
return e instanceof JedisConnectionException;
266+
return e instanceof RedisConnectionException
267+
|| e instanceof ExecutionException
268+
|| e instanceof InterruptedException;
257269
}
258270

259271
@Override
260272
public void cleanUpAfterFailure() {
261-
try {
262-
pipeline.close();
263-
} catch (IOException e) {
264-
log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
273+
if (pipeline != null) {
274+
pipeline.clear();
265275
}
266-
jedis = new Jedis(host, port, timeout);
267-
pipeline = jedis.pipelined();
268276
}
269277
});
270278
}
@@ -299,7 +307,7 @@ public void processElement(ProcessContext context) {
299307
}
300308
}
301309

302-
private Response<?> writeRecord(RedisMutation mutation) {
310+
private RedisFuture<?> writeRecord(RedisMutation mutation) {
303311
switch (mutation.getMethod()) {
304312
case APPEND:
305313
return pipeline.append(mutation.getKey(), mutation.getValue());
@@ -339,7 +347,7 @@ public void finishBundle(FinishBundleContext context)
339347

340348
@Teardown
341349
public void teardown() {
342-
jedis.close();
350+
redisclient.shutdown();
343351
}
344352
}
345353
}

ingestion/src/test/java/feast/ingestion/ImportJobTest.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838
import feast.test.TestUtil.LocalRedis;
3939
import feast.types.FeatureRowProto.FeatureRow;
4040
import feast.types.ValueProto.ValueType.Enum;
41+
import io.lettuce.core.RedisClient;
42+
import io.lettuce.core.RedisURI;
43+
import io.lettuce.core.api.StatefulRedisConnection;
44+
import io.lettuce.core.api.sync.RedisCommands;
45+
import io.lettuce.core.codec.ByteArrayCodec;
4146
import java.io.IOException;
4247
import java.nio.charset.StandardCharsets;
4348
import java.util.ArrayList;
@@ -57,7 +62,6 @@
5762
import org.junit.Test;
5863
import org.slf4j.Logger;
5964
import org.slf4j.LoggerFactory;
60-
import redis.clients.jedis.Jedis;
6165

6266
public class ImportJobTest {
6367

@@ -206,21 +210,24 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
206210
Duration.standardSeconds(IMPORT_JOB_CHECK_INTERVAL_DURATION_SEC));
207211

208212
LOGGER.info("Validating the actual values written to Redis ...");
209-
Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
213+
RedisClient redisClient =
214+
RedisClient.create(new RedisURI(REDIS_HOST, REDIS_PORT, java.time.Duration.ofMillis(2000)));
215+
StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(new ByteArrayCodec());
216+
RedisCommands<byte[], byte[]> sync = connection.sync();
210217
expected.forEach(
211218
(key, expectedValue) -> {
212219

213220
// Ensure ingested key exists.
214-
byte[] actualByteValue = jedis.get(key.toByteArray());
221+
byte[] actualByteValue = sync.get(key.toByteArray());
215222
if (actualByteValue == null) {
216223
LOGGER.error("Key not found in Redis: " + key);
217224
LOGGER.info("Redis INFO:");
218-
LOGGER.info(jedis.info());
219-
String randomKey = jedis.randomKey();
225+
LOGGER.info(sync.info());
226+
byte[] randomKey = sync.randomkey();
220227
if (randomKey != null) {
221228
LOGGER.info("Sample random key, value (for debugging purpose):");
222229
LOGGER.info("Key: " + randomKey);
223-
LOGGER.info("Value: " + jedis.get(randomKey));
230+
LOGGER.info("Value: " + sync.get(randomKey));
224231
}
225232
Assert.fail("Missing key in Redis.");
226233
}
@@ -239,5 +246,6 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
239246
// Ensure the retrieved FeatureRow is equal to the ingested FeatureRow.
240247
Assert.assertEquals(expectedValue, actualValue);
241248
});
249+
redisClient.shutdown();
242250
}
243251
}

ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
import feast.store.serving.redis.RedisCustomIO.RedisMutation;
2727
import feast.types.FeatureRowProto.FeatureRow;
2828
import feast.types.ValueProto.ValueType.Enum;
29+
import io.lettuce.core.RedisClient;
30+
import io.lettuce.core.RedisURI;
31+
import io.lettuce.core.api.StatefulRedisConnection;
32+
import io.lettuce.core.api.sync.RedisStringCommands;
33+
import io.lettuce.core.codec.ByteArrayCodec;
2934
import java.io.IOException;
3035
import java.util.HashMap;
3136
import java.util.LinkedHashMap;
@@ -43,7 +48,6 @@
4348
import org.junit.Before;
4449
import org.junit.Rule;
4550
import org.junit.Test;
46-
import redis.clients.jedis.Jedis;
4751
import redis.embedded.Redis;
4852
import redis.embedded.RedisServer;
4953

@@ -53,17 +57,22 @@ public class RedisCustomIOTest {
5357
private static String REDIS_HOST = "localhost";
5458
private static int REDIS_PORT = 51234;
5559
private Redis redis;
56-
private Jedis jedis;
60+
private RedisClient redisClient;
61+
private RedisStringCommands<byte[], byte[]> sync;
5762

5863
@Before
5964
public void setUp() throws IOException {
6065
redis = new RedisServer(REDIS_PORT);
6166
redis.start();
62-
jedis = new Jedis(REDIS_HOST, REDIS_PORT);
67+
redisClient =
68+
RedisClient.create(new RedisURI(REDIS_HOST, REDIS_PORT, java.time.Duration.ofMillis(2000)));
69+
StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(new ByteArrayCodec());
70+
sync = connection.sync();
6371
}
6472

6573
@After
6674
public void teardown() {
75+
redisClient.shutdown();
6776
redis.stop();
6877
}
6978

@@ -110,7 +119,7 @@ public void shouldWriteToRedis() {
110119

111120
kvs.forEach(
112121
(key, value) -> {
113-
byte[] actual = jedis.get(key.toByteArray());
122+
byte[] actual = sync.get(key.toByteArray());
114123
assertThat(actual, equalTo(value.toByteArray()));
115124
});
116125
}
@@ -169,7 +178,7 @@ public void shouldRetryFailConnection() throws InterruptedException {
169178

170179
kvs.forEach(
171180
(key, value) -> {
172-
byte[] actual = jedis.get(key.toByteArray());
181+
byte[] actual = sync.get(key.toByteArray());
173182
assertThat(actual, equalTo(value.toByteArray()));
174183
});
175184
}

serving/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,11 @@
138138
<version>3.1.0</version>
139139
</dependency>
140140

141-
<!--compile 'redis.clients:jedis:2.9.0'-->
142141
<dependency>
143-
<groupId>redis.clients</groupId>
144-
<artifactId>jedis</artifactId>
142+
<groupId>io.lettuce</groupId>
143+
<artifactId>lettuce-core</artifactId>
145144
</dependency>
145+
146146
<!--compile 'com.google.guava:guava:26.0-jre'-->
147147
<dependency>
148148
<groupId>com.google.guava</groupId>

0 commit comments

Comments
 (0)