Skip to content

Commit d25dda4

Browse files
authored
Merge pull request taosdata#22544 from taosdata/feat/TS-3701
feat(driver): add committed assignment API for jdbc
2 parents 04a13d1 + ed70cd1 commit d25dda4

File tree

2 files changed

+122
-1
lines changed

2 files changed

+122
-1
lines changed

source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSubscriptionIm
9292
*/
9393
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNIEnv *, jobject, jlong, jlong);
9494

95+
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *, jobject, jlong);
96+
97+
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *, jobject, jlong, jstring,
98+
jint, jlong);
9599
/*
96100
* Class: com_taosdata_jdbc_tmq_TMQConnector
97101
* Method: tmqCommitAsync
@@ -102,6 +106,12 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
102106
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong,
103107
jobject);
104108

109+
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *, jobject, jlong,
110+
jobject);
111+
112+
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *, jobject, jlong,
113+
jstring, jint, jlong, jobject);
114+
105115
/*
106116
* Class: com_taosdata_jdbc_tmq_TMQConnector
107117
* Method: tmqUnsubscribeImp
@@ -179,6 +189,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv
179189
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
180190
jstring, jobject);
181191

192+
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *, jobject, jlong, jstring,
193+
jint);
194+
195+
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *, jobject, jlong, jstring, jint);
196+
182197
#ifdef __cplusplus
183198
}
184199
#endif

source/client/src/clientTmqConnector.c

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI
291291
TAOS_RES *res = (TAOS_RES *)jres;
292292
return tmq_commit_sync(tmq, res);
293293
}
294+
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *env, jobject jobj, jlong jtmq) {
295+
tmq_t *tmq = (tmq_t *)jtmq;
296+
if (tmq == NULL) {
297+
jniError("jobj:%p, tmq is closed", jobj);
298+
return TMQ_CONSUMER_NULL;
299+
}
300+
301+
return tmq_commit_sync(tmq, NULL);
302+
}
303+
304+
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *env, jobject jobj,
305+
jlong jtmq, jstring jtopic,
306+
jint vgId, jlong offset) {
307+
tmq_t *tmq = (tmq_t *)jtmq;
308+
if (tmq == NULL) {
309+
jniDebug("jobj:%p, tmq is closed", jobj);
310+
return TMQ_CONSUMER_NULL;
311+
}
312+
313+
if (jtopic == NULL) {
314+
jniDebug("jobj:%p, topic is null", jobj);
315+
return TMQ_TOPIC_NULL;
316+
}
317+
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
318+
319+
int code = tmq_commit_offset_sync(tmq, topicName, vgId, offset);
320+
if (code != TSDB_CODE_SUCCESS) {
321+
jniError("jobj:%p, tmq commit offset error, code:%d, msg:%s", jobj, code, tmq_err2str(code));
322+
}
323+
324+
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
325+
return code;
326+
}
294327

295328
// deprecated
296329
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
@@ -319,6 +352,27 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsy
319352
tmq_commit_async(tmq, res, consumer_callback, offset);
320353
}
321354

355+
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *env, jobject jobj,
356+
jlong jtmq, jobject offset) {
357+
tmqGlobalMethod(env);
358+
tmq_t *tmq = (tmq_t *)jtmq;
359+
360+
offset = (*env)->NewGlobalRef(env, offset);
361+
tmq_commit_async(tmq, NULL, consumer_callback, offset);
362+
}
363+
364+
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *env, jobject jobj,
365+
jlong jtmq, jstring jtopic,
366+
jint vgId, jlong offset,
367+
jobject callback) {
368+
tmqGlobalMethod(env);
369+
tmq_t *tmq = (tmq_t *)jtmq;
370+
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
371+
372+
callback = (*env)->NewGlobalRef(env, callback);
373+
tmq_commit_offset_async(tmq, topicName, vgId, offset, consumer_callback, callback);
374+
}
375+
322376
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj,
323377
jlong jtmq) {
324378
tmq_t *tmq = (tmq_t *)jtmq;
@@ -497,9 +551,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
497551
int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);
498552

499553
if (res != TSDB_CODE_SUCCESS) {
500-
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
501554
jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
502555
tmq_err2str(res));
556+
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
503557
tmq_free_assignment(pAssign);
504558
return (jint)res;
505559
}
@@ -518,3 +572,55 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
518572
tmq_free_assignment(pAssign);
519573
return JNI_SUCCESS;
520574
}
575+
576+
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *env, jobject jobj, jlong jtmq,
577+
jstring jtopic, jint vgId) {
578+
tmq_t *tmq = (tmq_t *)jtmq;
579+
if (tmq == NULL) {
580+
jniDebug("jobj:%p, tmq is closed", jobj);
581+
return TMQ_CONSUMER_NULL;
582+
}
583+
584+
if (jtopic == NULL) {
585+
jniDebug("jobj:%p, topic is null", jobj);
586+
return TMQ_TOPIC_NULL;
587+
}
588+
589+
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
590+
591+
int64_t offset = tmq_committed(tmq, topicName, vgId);
592+
593+
if (offset < JNI_SUCCESS && offset != -2147467247) {
594+
jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName,
595+
vgId, offset, tmq_err2str(offset));
596+
}
597+
598+
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
599+
return (jlong)offset;
600+
}
601+
602+
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *env, jobject jobj, jlong jtmq,
603+
jstring jtopic, jint vgId) {
604+
tmq_t *tmq = (tmq_t *)jtmq;
605+
if (tmq == NULL) {
606+
jniDebug("jobj:%p, tmq is closed", jobj);
607+
return TMQ_CONSUMER_NULL;
608+
}
609+
610+
if (jtopic == NULL) {
611+
jniDebug("jobj:%p, topic is null", jobj);
612+
return TMQ_TOPIC_NULL;
613+
}
614+
615+
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
616+
617+
int64_t offset = tmq_position(tmq, topicName, vgId);
618+
619+
if (offset < JNI_SUCCESS) {
620+
jniError("jobj:%p, tmq get position error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId,
621+
offset, tmq_err2str(offset));
622+
}
623+
624+
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
625+
return (jlong)offset;
626+
}

0 commit comments

Comments
 (0)