@@ -484,7 +484,100 @@ protected void check(InvocationOnMock invocation) {
484484 spy .deleteDatabase (dbName );
485485 }
486486 }
487-
487+
488+
489+ @ Test
490+ public void testWriteWithRetryOnRecoverableError () throws InterruptedException {
491+ String dbName = "write_unittest_" + System .currentTimeMillis ();
492+ InfluxDB spy = spy (influxDB );
493+ doAnswer (new Answer () {
494+ boolean firstCall = true ;
495+
496+ @ Override
497+ public Object answer (InvocationOnMock invocation ) throws Throwable {
498+ if (firstCall ) {
499+ firstCall = false ;
500+ throw new InfluxDBException ("error" );
501+ } else {
502+ return invocation .callRealMethod ();
503+ }
504+ }
505+ }).when (spy ).write (any (BatchPoints .class ));
506+ try {
507+ BiConsumer <Iterable <Point >, Throwable > mockHandler = mock (BiConsumer .class );
508+ BatchOptions options = BatchOptions .DEFAULTS .exceptionHandler (mockHandler ).flushDuration (100 );
509+
510+ spy .createDatabase (dbName );
511+ spy .setDatabase (dbName );
512+ spy .enableBatch (options );
513+
514+ BatchPoints batchPoints = createBatchPoints (dbName , "m0" , 200 );
515+ spy .writeWithRetry (batchPoints );
516+ Thread .sleep (500 );
517+ verify (mockHandler , never ()).accept (any (), any ());
518+
519+ verify (spy , times (2 )).write (any (BatchPoints .class ));
520+
521+ QueryResult result = influxDB .query (new Query ("select * from m0" , dbName ));
522+ Assertions .assertNotNull (result .getResults ().get (0 ).getSeries ());
523+ Assertions .assertEquals (200 , result .getResults ().get (0 ).getSeries ().get (0 ).getValues ().size ());
524+
525+ } finally {
526+ spy .disableBatch ();
527+ spy .deleteDatabase (dbName );
528+ }
529+ }
530+
531+ @ Test
532+ public void testWriteWithRetryOnUnrecoverableError () throws InterruptedException {
533+
534+ String dbName = "write_unittest_" + System .currentTimeMillis ();
535+ InfluxDB spy = spy ((InfluxDB ) influxDB );
536+ doThrow (DatabaseNotFoundException .class ).when (spy ).write (any (BatchPoints .class ));
537+
538+ try {
539+ BiConsumer <Iterable <Point >, Throwable > mockHandler = mock (BiConsumer .class );
540+ BatchOptions options = BatchOptions .DEFAULTS .exceptionHandler (mockHandler ).flushDuration (100 );
541+
542+ spy .createDatabase (dbName );
543+ spy .setDatabase (dbName );
544+ spy .enableBatch (options );
545+
546+ BatchPoints batchPoints = createBatchPoints (dbName , "m0" , 200 );
547+ spy .writeWithRetry (batchPoints );
548+ Thread .sleep (500 );
549+
550+ verify (mockHandler , times (1 )).accept (any (), any ());
551+
552+ QueryResult result = influxDB .query (new Query ("select * from m0" , dbName ));
553+ Assertions .assertNull (result .getResults ().get (0 ).getSeries ());
554+ Assertions .assertNull (result .getResults ().get (0 ).getError ());
555+ } finally {
556+ spy .disableBatch ();
557+ spy .deleteDatabase (dbName );
558+ }
559+
560+ }
561+
562+ @ Test
563+ public void testWriteWithRetryOnBatchingNotEnabled () {
564+ String dbName = "write_unittest_" + System .currentTimeMillis ();
565+ try {
566+
567+ influxDB .createDatabase (dbName );
568+ influxDB .setDatabase (dbName );
569+
570+ BatchPoints batchPoints = createBatchPoints (dbName , "m0" , 200 );
571+ influxDB .writeWithRetry (batchPoints );
572+
573+ QueryResult result = influxDB .query (new Query ("select * from m0" , dbName ));
574+ Assertions .assertNotNull (result .getResults ().get (0 ).getSeries ());
575+ Assertions .assertEquals (200 , result .getResults ().get (0 ).getSeries ().get (0 ).getValues ().size ());
576+ } finally {
577+ influxDB .deleteDatabase (dbName );
578+ }
579+
580+ }
488581 void writeSomePoints (InfluxDB influxDB , String measurement , int firstIndex , int lastIndex ) {
489582 for (int i = firstIndex ; i <= lastIndex ; i ++) {
490583 Point point = Point .measurement (measurement )
@@ -514,7 +607,21 @@ void write20Points(InfluxDB influxDB) {
514607 void writeSomePoints (InfluxDB influxDB , int n ) {
515608 writeSomePoints (influxDB , 0 , n - 1 );
516609 }
517-
610+
611+ private BatchPoints createBatchPoints (String dbName , String measurement , int n ) {
612+ BatchPoints batchPoints = BatchPoints .database (dbName ).build ();
613+ for (int i = 1 ; i <= n ; i ++) {
614+ Point point = Point .measurement (measurement )
615+ .time (i ,TimeUnit .MILLISECONDS )
616+ .addField ("f1" , (double ) i )
617+ .addField ("f2" , (double ) (i ) * 1.1 )
618+ .addField ("f3" , "f_v3" ).build ();
619+ batchPoints .point (point );
620+ }
621+
622+ return batchPoints ;
623+ }
624+
518625 static String createErrorBody (String errorMessage ) {
519626 return MessageFormat .format ("'{' \" error\" : \" {0}\" '}'" , errorMessage );
520627 }
0 commit comments