Skip to content

Commit 08a075c

Browse files
authored
Merge pull request influxdata#259 from andyflury/chuncking
InfluxDB-Java Chunking
2 parents b72fb59 + e705baf commit 08a075c

File tree

6 files changed

+196
-12
lines changed

6 files changed

+196
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- Support writing by UDP protocol.
66
- Support gzip compress for http request body.
77
- Support setting thread factory for batch processor.
8+
- Support chunking.
89

910
#### Fixes
1011

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,16 @@ public void write(final int udpPort, final Point point);
9696
```
9797
Note: make sure the total size of the written content does not exceed the UDP protocol's limit (64K); otherwise use HTTP instead of UDP.
9898

99+
100+
#### chunking support (version 2.5+ required):
101+
102+
The influxdb-java client now supports InfluxDB chunking. The following example uses a chunk size of 20 and invokes the specified Consumer (e.g. System.out.println) for each received QueryResult.
103+
```
104+
Query query = new Query("SELECT idle FROM cpu", dbName);
105+
influxDB.query(query, 20, queryResult -> System.out.println(queryResult));
106+
```
107+
108+
99109
### Other Usages:
100110
For additional usage examples have a look at [InfluxDBTest.java](https://github.com/influxdb/influxdb-java/blob/master/src/test/java/org/influxdb/InfluxDBTest.java "InfluxDBTest.java")
101111

src/main/java/org/influxdb/InfluxDB.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.List;
44
import java.util.concurrent.ThreadFactory;
55
import java.util.concurrent.TimeUnit;
6+
import java.util.function.Consumer;
67

78
import org.influxdb.dto.BatchPoints;
89
import org.influxdb.dto.Point;
@@ -217,6 +218,18 @@ public void write(final String database, final String retentionPolicy,
217218
*/
218219
public QueryResult query(final Query query);
219220

221+
/**
222+
* Execute a streaming query against a database.
223+
*
224+
* @param query
225+
* the query to execute.
226+
* @param chunkSize
227+
* the number of QueryResults to process in one chunk.
228+
* @param consumer
229+
* the consumer to invoke for each received QueryResult
230+
*/
231+
public void query(Query query, int chunkSize, Consumer<QueryResult> consumer);
232+
220233
/**
221234
* Execute a query against a database.
222235
*

src/main/java/org/influxdb/impl/InfluxDBImpl.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import com.google.common.base.Stopwatch;
88
import com.google.common.base.Strings;
99
import com.google.common.collect.Lists;
10+
import com.squareup.moshi.JsonAdapter;
11+
import com.squareup.moshi.Moshi;
12+
1013
import org.influxdb.InfluxDB;
1114
import org.influxdb.dto.BatchPoints;
1215
import org.influxdb.dto.Point;
@@ -24,11 +27,14 @@
2427
import okhttp3.ResponseBody;
2528
import okhttp3.logging.HttpLoggingInterceptor;
2629
import okhttp3.logging.HttpLoggingInterceptor.Level;
30+
import okio.BufferedSource;
2731
import retrofit2.Call;
32+
import retrofit2.Callback;
2833
import retrofit2.Response;
2934
import retrofit2.Retrofit;
3035
import retrofit2.converter.moshi.MoshiConverterFactory;
3136

37+
import java.io.EOFException;
3238
import java.io.IOException;
3339
import java.net.DatagramPacket;
3440
import java.net.DatagramSocket;
@@ -41,6 +47,7 @@
4147
import java.util.concurrent.TimeUnit;
4248
import java.util.concurrent.atomic.AtomicBoolean;
4349
import java.util.concurrent.atomic.AtomicLong;
50+
import java.util.function.Consumer;
4451

4552
/**
4653
* Implementation of a InluxDB API.
@@ -66,10 +73,12 @@ public class InfluxDBImpl implements InfluxDB {
6673
private final HttpLoggingInterceptor loggingInterceptor;
6774
private final GzipRequestInterceptor gzipRequestInterceptor;
6875
private LogLevel logLevel = LogLevel.NONE;
76+
private JsonAdapter<QueryResult> adapter;
6977

7078
public InfluxDBImpl(final String url, final String username, final String password,
7179
final OkHttpClient.Builder client) {
7280
super();
81+
Moshi moshi = new Moshi.Builder().build();
7382
this.hostAddress = parseHostAddress(url);
7483
this.username = username;
7584
this.password = password;
@@ -82,6 +91,7 @@ public InfluxDBImpl(final String url, final String username, final String passwo
8291
.addConverterFactory(MoshiConverterFactory.create())
8392
.build();
8493
this.influxDBService = this.retrofit.create(InfluxDBService.class);
94+
this.adapter = moshi.adapter(QueryResult.class);
8595
}
8696

8797
private InetAddress parseHostAddress(final String url) {
@@ -327,6 +337,49 @@ public QueryResult query(final Query query) {
327337
return execute(call);
328338
}
329339

340+
/**
341+
* {@inheritDoc}
342+
*/
343+
@Override
344+
public void query(final Query query, final int chunkSize, final Consumer<QueryResult> consumer) {
345+
346+
if (version().startsWith("0.") || version().startsWith("1.0")) {
347+
throw new RuntimeException("chunking not supported");
348+
}
349+
350+
Call<ResponseBody> call = this.influxDBService.query(this.username, this.password,
351+
query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize);
352+
353+
call.enqueue(new Callback<ResponseBody>() {
354+
@Override
355+
public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {
356+
try {
357+
if (response.isSuccessful()) {
358+
BufferedSource source = response.body().source();
359+
while (true) {
360+
QueryResult result = InfluxDBImpl.this.adapter.fromJson(source);
361+
if (result != null) {
362+
consumer.accept(result);
363+
}
364+
}
365+
}
366+
try (ResponseBody errorBody = response.errorBody()) {
367+
throw new RuntimeException(errorBody.string());
368+
}
369+
} catch (EOFException e) {
370+
// do nothing
371+
} catch (IOException e) {
372+
throw new RuntimeException(e);
373+
}
374+
}
375+
376+
@Override
377+
public void onFailure(final Call<ResponseBody> call, final Throwable t) {
378+
throw new RuntimeException(t);
379+
}
380+
});
381+
}
382+
330383
/**
331384
* {@inheritDoc}
332385
*/

src/main/java/org/influxdb/impl/InfluxDBService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import retrofit2.http.GET;
1010
import retrofit2.http.POST;
1111
import retrofit2.http.Query;
12+
import retrofit2.http.Streaming;
1213

1314
interface InfluxDBService {
1415

@@ -20,6 +21,7 @@ interface InfluxDBService {
2021
public static final String PRECISION = "precision";
2122
public static final String CONSISTENCY = "consistency";
2223
public static final String EPOCH = "epoch";
24+
public static final String CHUNK_SIZE = "chunk_size";
2325

2426
@GET("/ping")
2527
public Call<ResponseBody> ping();
@@ -61,4 +63,9 @@ public Call<QueryResult> query(@Query(U) String username, @Query(P) String passw
6163
public Call<QueryResult> postQuery(@Query(U) String username,
6264
@Query(P) String password, @Query(value = Q, encoded = true) String query);
6365

66+
@Streaming
67+
@GET("/query?chunked=true")
68+
public Call<ResponseBody> query(@Query(U) String username,
69+
@Query(P) String password, @Query(DB) String db, @Query(value = Q, encoded = true) String query,
70+
@Query(CHUNK_SIZE) int chunkSize);
6471
}

src/test/java/org/influxdb/InfluxDBTest.java

Lines changed: 112 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,29 @@
11
package org.influxdb;
22

3-
import org.influxdb.InfluxDB.LogLevel;
4-
import org.influxdb.dto.*;
5-
import org.influxdb.impl.InfluxDBImpl;
6-
import org.junit.Assert;
7-
import org.junit.Before;
8-
import org.junit.Test;
9-
103
import java.io.IOException;
114
import java.util.ArrayList;
125
import java.util.Arrays;
136
import java.util.List;
147
import java.util.Set;
8+
import java.util.concurrent.BlockingQueue;
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.LinkedBlockingQueue;
1511
import java.util.concurrent.ThreadFactory;
1612
import java.util.concurrent.TimeUnit;
13+
import java.util.function.Consumer;
14+
15+
import org.influxdb.InfluxDB.LogLevel;
16+
import org.influxdb.dto.BatchPoints;
17+
import org.influxdb.dto.Point;
18+
import org.influxdb.dto.Pong;
19+
import org.influxdb.dto.Query;
20+
import org.influxdb.dto.QueryResult;
21+
import org.influxdb.impl.InfluxDBImpl;
22+
import org.junit.Assert;
23+
import org.junit.Before;
24+
import org.junit.Rule;
25+
import org.junit.Test;
26+
import org.junit.rules.ExpectedException;
1727

1828
import com.google.common.util.concurrent.Uninterruptibles;
1929

@@ -29,6 +39,7 @@ public class InfluxDBTest {
2939
private final static int UDP_PORT = 8089;
3040
private final static String UDP_DATABASE = "udp";
3141

42+
@Rule public final ExpectedException exception = ExpectedException.none();
3243
/**
3344
* Create a influxDB connection before all tests start.
3445
*
@@ -57,7 +68,7 @@ public void setUp() throws InterruptedException, IOException {
5768
this.influxDB.createDatabase(UDP_DATABASE);
5869
// String logs = CharStreams.toString(new InputStreamReader(containerLogsStream,
5970
// Charsets.UTF_8));
60-
System.out.println("##################################################################################");
71+
System.out.println("################################################################################## ");
6172
// System.out.println("Container Logs: \n" + logs);
6273
System.out.println("# Connected to InfluxDB Version: " + this.influxDB.version() + " #");
6374
System.out.println("##################################################################################");
@@ -219,8 +230,8 @@ public void testWriteStringDataThroughUDP() {
219230
@Test
220231
public void testWriteMultipleStringDataThroughUDP() {
221232
String measurement = TestUtils.getRandomMeasurement();
222-
this.influxDB.write(UDP_PORT, measurement + ",atag=test1 idle=100,usertime=10,system=1\n" +
223-
measurement + ",atag=test2 idle=200,usertime=20,system=2\n" +
233+
this.influxDB.write(UDP_PORT, measurement + ",atag=test1 idle=100,usertime=10,system=1\n" +
234+
measurement + ",atag=test2 idle=200,usertime=20,system=2\n" +
224235
measurement + ",atag=test3 idle=300,usertime=30,system=3");
225236
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
226237
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", UDP_DATABASE);
@@ -250,12 +261,12 @@ public void testWriteMultipleStringDataLinesThroughUDP() {
250261
Assert.assertEquals(3, result.getResults().get(0).getSeries().size());
251262
Assert.assertEquals(result.getResults().get(0).getSeries().get(0).getTags().get("atag"), "test1");
252263
Assert.assertEquals(result.getResults().get(0).getSeries().get(1).getTags().get("atag"), "test2");
253-
Assert.assertEquals(result.getResults().get(0).getSeries().get(2).getTags().get("atag"), "test3");
264+
Assert.assertEquals(result.getResults().get(0).getSeries().get(2).getTags().get("atag"), "test3");
254265
}
255266

256267
/**
257268
* When batch of points' size is over UDP limit, the expected exception
258-
* is java.lang.RuntimeException: java.net.SocketException:
269+
* is java.lang.RuntimeException: java.net.SocketException:
259270
* The message is larger than the maximum supported by the underlying transport: Datagram send failed
260271
* @throws Exception
261272
*/
@@ -487,4 +498,93 @@ public void testWriteEnableGzipAndDisableGzip() {
487498
}
488499
}
489500

501+
/**
502+
* Test chunking.
503+
* @throws InterruptedException
504+
*/
505+
@Test
506+
public void testChunking() throws InterruptedException {
507+
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
508+
// do not test version 0.13 and 1.0
509+
return;
510+
}
511+
String dbName = "write_unittest_" + System.currentTimeMillis();
512+
this.influxDB.createDatabase(dbName);
513+
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
514+
BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build();
515+
Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build();
516+
Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build();
517+
Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build();
518+
batchPoints.point(point1);
519+
batchPoints.point(point2);
520+
batchPoints.point(point3);
521+
this.influxDB.write(batchPoints);
522+
523+
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
524+
final BlockingQueue<QueryResult> queue = new LinkedBlockingQueue<>();
525+
Query query = new Query("SELECT * FROM disk", dbName);
526+
this.influxDB.query(query, 2, new Consumer<QueryResult>() {
527+
@Override
528+
public void accept(QueryResult result) {
529+
queue.add(result);
530+
}});
531+
532+
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
533+
this.influxDB.deleteDatabase(dbName);
534+
535+
QueryResult result = queue.poll(20, TimeUnit.SECONDS);
536+
Assert.assertNotNull(result);
537+
System.out.println(result);
538+
Assert.assertEquals(2, result.getResults().get(0).getSeries().get(0).getValues().size());
539+
540+
result = queue.poll(20, TimeUnit.SECONDS);
541+
Assert.assertNotNull(result);
542+
System.out.println(result);
543+
Assert.assertEquals(1, result.getResults().get(0).getSeries().get(0).getValues().size());
544+
}
545+
546+
/**
547+
* Test chunking edge case.
548+
* @throws InterruptedException
549+
*/
550+
@Test
551+
public void testChunkingFail() throws InterruptedException {
552+
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
553+
// do not test version 0.13 and 1.0
554+
return;
555+
}
556+
String dbName = "write_unittest_" + System.currentTimeMillis();
557+
this.influxDB.createDatabase(dbName);
558+
final CountDownLatch countDownLatch = new CountDownLatch(1);
559+
Query query = new Query("XXX", dbName);
560+
this.influxDB.query(query, 10, new Consumer<QueryResult>() {
561+
@Override
562+
public void accept(QueryResult result) {
563+
countDownLatch.countDown();
564+
}
565+
});
566+
this.influxDB.deleteDatabase(dbName);
567+
Assert.assertFalse(countDownLatch.await(10, TimeUnit.SECONDS));
568+
}
569+
570+
/**
571+
* Test chunking on 0.13 and 1.0.
572+
* @throws InterruptedException
573+
*/
574+
@Test()
575+
public void testChunkingOldVersion() throws InterruptedException {
576+
577+
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
578+
579+
this.exception.expect(RuntimeException.class);
580+
String dbName = "write_unittest_" + System.currentTimeMillis();
581+
Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName);
582+
this.influxDB.query(query, 10, new Consumer<QueryResult>() {
583+
@Override
584+
public void accept(QueryResult result) {
585+
}
586+
});
587+
}
588+
}
589+
490590
}

0 commit comments

Comments
 (0)