 import feast.core.StoreProto;
 import feast.ingestion.values.FailedElement;
-import feast.retry.BackOffExecutor;
 import feast.retry.Retriable;
-import io.lettuce.core.RedisClient;
 import io.lettuce.core.RedisConnectionException;
-import io.lettuce.core.RedisFuture;
-import io.lettuce.core.RedisURI;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import org.apache.avro.reflect.Nullable;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,8 +47,8 @@ public class RedisCustomIO {
 
   private RedisCustomIO() {}
 
-  public static Write write(StoreProto.Store.RedisConfig redisConfig) {
-    return new Write(redisConfig);
+  public static Write write(StoreProto.Store store) {
+    return new Write(store);
   }
 
   public enum Method {
@@ -169,8 +165,8 @@ public static class Write
 
     private WriteDoFn dofn;
 
-    private Write(StoreProto.Store.RedisConfig redisConfig) {
-      this.dofn = new WriteDoFn(redisConfig);
+    private Write(StoreProto.Store store) {
+      this.dofn = new WriteDoFn(store);
     }
 
     public Write withBatchSize(int batchSize) {
@@ -190,23 +186,14 @@ public PCollection<FailedElement> expand(PCollection<RedisMutation> input) {
 
   public static class WriteDoFn extends DoFn<RedisMutation, FailedElement> {
 
-    private final String host;
-    private final int port;
-    private final BackOffExecutor backOffExecutor;
     private final List<RedisMutation> mutations = new ArrayList<>();
-
-    private LettuceTransactionPipeline pipeline;
     private int batchSize = DEFAULT_BATCH_SIZE;
     private int timeout = DEFAULT_TIMEOUT;
-    private RedisClient redisclient;
-
-    WriteDoFn(StoreProto.Store.RedisConfig redisConfig) {
-      this.host = redisConfig.getHost();
-      this.port = redisConfig.getPort();
-      long backoffMs =
-          redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
-      this.backOffExecutor =
-          new BackOffExecutor(redisConfig.getMaxRetries(), Duration.millis(backoffMs));
+    private RedisIngestionClient ingestionClient;
+
+    WriteDoFn(StoreProto.Store store) {
+      if (store.getType() == StoreProto.Store.StoreType.REDIS)
+        this.ingestionClient = new RedisStandaloneIngestionClient(store.getRedisConfig());
     }
 
     public WriteDoFn withBatchSize(int batchSize) {
@@ -225,64 +212,57 @@ public WriteDoFn withTimeout(int timeout) {
 
     @Setup
     public void setup() {
-      this.redisclient =
-          RedisClient.create(new RedisURI(host, port, java.time.Duration.ofMillis(timeout)));
+      this.ingestionClient.setup();
     }
 
     @StartBundle
     public void startBundle() {
       try {
-        pipeline = new LettuceTransactionPipeline(redisclient);
+        ingestionClient.connect();
       } catch (RedisConnectionException e) {
         log.error("Connection to redis cannot be established ", e);
       }
       mutations.clear();
     }
 
     private void executeBatch() throws Exception {
-      backOffExecutor.execute(
-          new Retriable() {
-            @Override
-            public void execute() throws ExecutionException, InterruptedException {
-              if (pipeline == null) {
-                pipeline = new LettuceTransactionPipeline(redisclient);
-              }
-              pipeline.clear();
-              pipeline.multi();
-              mutations.forEach(
-                  mutation -> {
-                    writeRecord(mutation);
-                    if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
-                      pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
-                    }
-                  });
-              pipeline.exec();
-              pipeline.clear();
-              mutations.clear();
-            }
-
-            @Override
-            public Boolean isExceptionRetriable(Exception e) {
-              return e instanceof RedisConnectionException
-                  || e instanceof ExecutionException
-                  || e instanceof InterruptedException;
-            }
-
-            @Override
-            public void cleanUpAfterFailure() {
-              if (pipeline != null) {
-                pipeline.clear();
-              }
-            }
-          });
+      this.ingestionClient
+          .getBackOffExecutor()
+          .execute(
+              new Retriable() {
+                @Override
+                public void execute() throws ExecutionException, InterruptedException {
+                  if (!ingestionClient.isConnected()) {
+                    ingestionClient.connect();
+                  }
+                  mutations.forEach(
+                      mutation -> {
+                        writeRecord(mutation);
+                        if (mutation.getExpiryMillis() != null
+                            && mutation.getExpiryMillis() > 0) {
+                          ingestionClient.pexpire(mutation.getKey(), mutation.getExpiryMillis());
+                        }
+                      });
+                  ingestionClient.sync();
+                  mutations.clear();
+                }
+
+                @Override
+                public Boolean isExceptionRetriable(Exception e) {
+                  return e instanceof RedisConnectionException;
+                }
+
+                @Override
+                public void cleanUpAfterFailure() {}
+              });
     }
 
     private FailedElement toFailedElement(
         RedisMutation mutation, Exception exception, String jobName) {
       return FailedElement.newBuilder()
           .setJobName(jobName)
           .setTransformName("RedisCustomIO")
-          .setPayload(mutation.getValue().toString())
+          .setPayload(Arrays.toString(mutation.getValue()))
           .setErrorMessage(exception.getMessage())
           .setStackTrace(ExceptionUtils.getStackTrace(exception))
           .build();
@@ -307,20 +287,26 @@ public void processElement(ProcessContext context) {
       }
     }
 
-    private RedisFuture<?> writeRecord(RedisMutation mutation) {
+    private void writeRecord(RedisMutation mutation) {
       switch (mutation.getMethod()) {
         case APPEND:
-          return pipeline.append(mutation.getKey(), mutation.getValue());
+          ingestionClient.append(mutation.getKey(), mutation.getValue());
+          return;
         case SET:
-          return pipeline.set(mutation.getKey(), mutation.getValue());
+          ingestionClient.set(mutation.getKey(), mutation.getValue());
+          return;
         case LPUSH:
-          return pipeline.lpush(mutation.getKey(), mutation.getValue());
+          ingestionClient.lpush(mutation.getKey(), mutation.getValue());
+          return;
         case RPUSH:
-          return pipeline.rpush(mutation.getKey(), mutation.getValue());
+          ingestionClient.rpush(mutation.getKey(), mutation.getValue());
+          return;
         case SADD:
-          return pipeline.sadd(mutation.getKey(), mutation.getValue());
+          ingestionClient.sadd(mutation.getKey(), mutation.getValue());
+          return;
         case ZADD:
-          return pipeline.zadd(mutation.getKey(), mutation.getScore(), mutation.getValue());
+          ingestionClient.zadd(mutation.getKey(), mutation.getScore(), mutation.getValue());
+          return;
         default:
           throw new UnsupportedOperationException(
               String.format("Not implemented writing records for %s", mutation.getMethod()));
@@ -347,7 +333,7 @@ public void finishBundle(FinishBundleContext context)
 
     @Teardown
     public void teardown() {
-      redisclient.shutdown();
+      ingestionClient.shutdown();
     }
   }
 }
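
The refactored WriteDoFn no longer talks to Lettuce directly: every Redis call goes through a RedisIngestionClient, with RedisStandaloneIngestionClient created when the store type is REDIS. Neither type appears in these hunks, so the sketch below only reconstructs the surface the diff actually exercises; the method names follow the calls above, while the parameter types (byte[] keys and values, Long score) are assumptions, not the confirmed Feast signatures.

```java
import feast.retry.BackOffExecutor;

// Sketch only: this interface is not part of the diff. It lists the operations
// WriteDoFn invokes, with assumed parameter types.
public interface RedisIngestionClient {

  // Called once from @Setup to build the underlying client.
  void setup();

  // Opens a connection; called from @StartBundle and retried inside executeBatch().
  // May throw io.lettuce.core.RedisConnectionException.
  void connect();

  boolean isConnected();

  // Retry policy presumably built from the store's RedisConfig (max retries, initial backoff).
  BackOffExecutor getBackOffExecutor();

  // Flushes the buffered commands and waits for them to complete.
  void sync();

  void pexpire(byte[] key, Long expiryMillis);

  void append(byte[] key, byte[] value);

  void set(byte[] key, byte[] value);

  void lpush(byte[] key, byte[] value);

  void rpush(byte[] key, byte[] value);

  void sadd(byte[] key, byte[] value);

  void zadd(byte[] key, Long score, byte[] value);

  // Called from @Teardown to release the client.
  void shutdown();
}
```

Routing all writes through this abstraction is what lets write() accept a full StoreProto.Store instead of a RedisConfig: the DoFn can choose a client implementation per StoreType (only REDIS is handled here) while the batching, expiry, and retry logic stays unchanged.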