Skip to content

Commit 8953a7a

Browse files
author
Lavkesh Lahngir
committed
Check the connection before connecting as lettuce does the retry automatically
1 parent 6a7fb0c commit 8953a7a

File tree

2 files changed

+21
-18
lines changed

2 files changed

+21
-18
lines changed

ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,11 @@ public static class WriteDoFn extends DoFn<RedisMutation, FailedElement> {
189189
private final List<RedisMutation> mutations = new ArrayList<>();
190190
private int batchSize = DEFAULT_BATCH_SIZE;
191191
private int timeout = DEFAULT_TIMEOUT;
192-
private RedisIngestionClient ingestionClient;
192+
private RedisIngestionClient redisIngestionClient;
193193

194194
WriteDoFn(StoreProto.Store store) {
195195
if (store.getType() == StoreProto.Store.StoreType.REDIS)
196-
this.ingestionClient = new RedisStandaloneIngestionClient(store.getRedisConfig());
196+
this.redisIngestionClient = new RedisStandaloneIngestionClient(store.getRedisConfig());
197197
}
198198

199199
public WriteDoFn withBatchSize(int batchSize) {
@@ -212,38 +212,38 @@ public WriteDoFn withTimeout(int timeout) {
212212

213213
@Setup
214214
public void setup() {
215-
this.ingestionClient.setup();
215+
this.redisIngestionClient.setup();
216216
}
217217

218218
@StartBundle
219219
public void startBundle() {
220220
try {
221-
ingestionClient.connect();
221+
redisIngestionClient.connect();
222222
} catch (RedisConnectionException e) {
223223
log.error("Connection to redis cannot be established ", e);
224224
}
225225
mutations.clear();
226226
}
227227

228228
private void executeBatch() throws Exception {
229-
this.ingestionClient
229+
this.redisIngestionClient
230230
.getBackOffExecutor()
231231
.execute(
232232
new Retriable() {
233233
@Override
234234
public void execute() throws ExecutionException, InterruptedException {
235-
if (!ingestionClient.isConnected()) {
236-
ingestionClient.connect();
235+
if (!redisIngestionClient.isConnected()) {
236+
redisIngestionClient.connect();
237237
}
238238
mutations.forEach(
239239
mutation -> {
240240
writeRecord(mutation);
241241
if (mutation.getExpiryMillis() != null
242242
&& mutation.getExpiryMillis() > 0) {
243-
ingestionClient.pexpire(mutation.getKey(), mutation.getExpiryMillis());
243+
redisIngestionClient.pexpire(mutation.getKey(), mutation.getExpiryMillis());
244244
}
245245
});
246-
ingestionClient.sync();
246+
redisIngestionClient.sync();
247247
mutations.clear();
248248
}
249249

@@ -290,22 +290,22 @@ public void processElement(ProcessContext context) {
290290
private void writeRecord(RedisMutation mutation) {
291291
switch (mutation.getMethod()) {
292292
case APPEND:
293-
ingestionClient.append(mutation.getKey(), mutation.getValue());
293+
redisIngestionClient.append(mutation.getKey(), mutation.getValue());
294294
return;
295295
case SET:
296-
ingestionClient.set(mutation.getKey(), mutation.getValue());
296+
redisIngestionClient.set(mutation.getKey(), mutation.getValue());
297297
return;
298298
case LPUSH:
299-
ingestionClient.lpush(mutation.getKey(), mutation.getValue());
299+
redisIngestionClient.lpush(mutation.getKey(), mutation.getValue());
300300
return;
301301
case RPUSH:
302-
ingestionClient.rpush(mutation.getKey(), mutation.getValue());
302+
redisIngestionClient.rpush(mutation.getKey(), mutation.getValue());
303303
return;
304304
case SADD:
305-
ingestionClient.sadd(mutation.getKey(), mutation.getValue());
305+
redisIngestionClient.sadd(mutation.getKey(), mutation.getValue());
306306
return;
307307
case ZADD:
308-
ingestionClient.zadd(mutation.getKey(), mutation.getScore(), mutation.getValue());
308+
redisIngestionClient.zadd(mutation.getKey(), mutation.getScore(), mutation.getValue());
309309
return;
310310
default:
311311
throw new UnsupportedOperationException(
@@ -333,7 +333,7 @@ public void finishBundle(FinishBundleContext context)
333333

334334
@Teardown
335335
public void teardown() {
336-
ingestionClient.shutdown();
336+
redisIngestionClient.shutdown();
337337
}
338338
}
339339
}

ingestion/src/main/java/feast/store/serving/redis/RedisStandaloneIngestionClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,10 @@ public void shutdown() {
6363

6464
@Override
6565
public void connect() {
66-
this.connection = this.redisclient.connect(new ByteArrayCodec());
67-
this.commands = connection.async();
66+
if (!isConnected()) {
67+
this.connection = this.redisclient.connect(new ByteArrayCodec());
68+
this.commands = connection.async();
69+
}
6870
}
6971

7072
@Override
@@ -75,6 +77,7 @@ public boolean isConnected() {
7577
@Override
7678
public void sync() {
7779
// Wait for some time for futures to complete
80+
// TODO: should this be configurable?
7881
LettuceFutures.awaitAll(60, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));
7982
futures.clear();
8083
}

0 commit comments

Comments
 (0)