Skip to content

Commit f19a31d

Browse files
committed
coord_req()s would only trigger one connection attempt and then go idle
This caused things like send_offsets_to_transaction() to stall and time out if the initial group coordinator connection was unsuccessful.
1 parent 231d5a1 commit f19a31d

File tree

5 files changed

+171
-9
lines changed

5 files changed

+171
-9
lines changed

CHANGELOG.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ librdkafka v1.9.0 is a feature release:
8787
* If a metadata request triggered by `rd_kafka_metadata()` or consumer group rebalancing
8888
encountered a non-retriable error it would not be propagated to the caller and thus
8989
cause a stall or timeout, this has now been fixed. (@aiquestion, #3625)
90+
* AdminAPI `DeleteGroups()` and `DeleteConsumerGroupOffsets()`:
91+
if the given coordinator connection was not up by the time these calls were
92+
initiated and the first connection attempt failed then no further connection
93+
attempts were performed, ulimately leading to the calls timing out.
94+
This is now fixed by keep retrying to connect to the group coordinator
95+
until the connection is successful or the call times out.
9096
* Mock cluster `rd_kafka_mock_broker_set_down()` would previously
9197
accept and then disconnect new connections, it now refuses new connections.
9298

@@ -128,7 +134,7 @@ librdkafka v1.9.0 is a feature release:
128134
(@kevinconaway)
129135

130136

131-
### Producer fixes
137+
### Transactional producer fixes
132138

133139
* Fix message loss in idempotent/transactional producer.
134140
A corner case has been identified that may cause idempotent/transactional
@@ -162,6 +168,17 @@ librdkafka v1.9.0 is a feature release:
162168
broker (added in Apache Kafka 2.8), which could cause the producer to
163169
seemingly hang.
164170
This error code is now correctly handled by raising a fatal error.
171+
* If the given group coordinator connection was not up by the time
172+
`send_offsets_to_transactions()` was called, and the first connection
173+
attempt failed then no further connection attempts were performed, ulimately
174+
leading to `send_offsets_to_transactions()` timing out, and possibly
175+
also the transaction timing out on the transaction coordinator.
176+
This is now fixed by keep retrying to connect to the group coordinator
177+
until the connection is successful or the call times out.
178+
179+
180+
### Producer fixes
181+
165182
* Improved producer queue wakeup scheduling. This should significantly
166183
decrease the number of wakeups and thus syscalls for high message rate
167184
producers. (#3538, #2912)

src/rdkafka_broker.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
253253
rd_ts_t rkb_ts_connect;
254254

255255
/**< Persistent connection demand is tracked by
256-
* an counter for each type of demand.
256+
* a counter for each type of demand.
257257
* The broker thread will maintain a persistent connection
258258
* if any of the counters are non-zero, and revert to
259259
* on-demand mode when they all reach zero.
@@ -276,7 +276,11 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
276276
* rdkafka main thread.
277277
*
278278
* Producer: Broker is the transaction coordinator.
279-
* Counter is maintained by rdkafka_idempotence.c. */
279+
* Counter is maintained by rdkafka_idempotence.c.
280+
*
281+
* All: A coord_req_t is waiting for this broker to come up.
282+
*/
283+
280284
rd_atomic32_t coord;
281285
} rkb_persistconn;
282286

src/rdkafka_coord.c

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,15 @@ static rd_bool_t rd_kafka_coord_req_destroy(rd_kafka_t *rk,
268268
return rd_false;
269269

270270
rd_dassert(creq->creq_done);
271+
272+
/* Clear out coordinator we were waiting for. */
273+
if (creq->creq_rkb) {
274+
rd_kafka_broker_persistent_connection_del(
275+
creq->creq_rkb, &creq->creq_rkb->rkb_persistconn.coord);
276+
rd_kafka_broker_destroy(creq->creq_rkb);
277+
creq->creq_rkb = NULL;
278+
}
279+
271280
rd_kafka_replyq_destroy(&creq->creq_replyq);
272281
rd_free(creq->creq_coordkey);
273282
rd_free(creq);
@@ -447,6 +456,15 @@ static void rd_kafka_coord_req_fsm(rd_kafka_t *rk, rd_kafka_coord_req_t *creq) {
447456
/* Cached coordinator is up, send request */
448457
rd_kafka_replyq_t replyq;
449458

459+
/* Clear out previous coordinator we waited for. */
460+
if (creq->creq_rkb) {
461+
rd_kafka_broker_persistent_connection_del(
462+
creq->creq_rkb,
463+
&creq->creq_rkb->rkb_persistconn.coord);
464+
rd_kafka_broker_destroy(creq->creq_rkb);
465+
creq->creq_rkb = NULL;
466+
}
467+
450468
rd_kafka_replyq_copy(&replyq, &creq->creq_replyq);
451469
err = creq->creq_send_req_cb(rkb, creq->creq_rko,
452470
replyq, creq->creq_resp_cb,
@@ -462,16 +480,37 @@ static void rd_kafka_coord_req_fsm(rd_kafka_t *rk, rd_kafka_coord_req_t *creq) {
462480
rd_true /*done*/);
463481
}
464482

465-
} else {
466-
/* No connection yet. We'll be re-triggered on
467-
* broker state broadcast. */
468-
rd_kafka_broker_schedule_connection(rkb);
483+
} else if (creq->creq_rkb != rkb) {
484+
/* No connection yet.
485+
* Let broker thread know we need a connection.
486+
* We'll be re-triggered on broker state broadcast. */
487+
488+
if (creq->creq_rkb) {
489+
/* Clear previous */
490+
rd_kafka_broker_persistent_connection_del(
491+
rkb, &rkb->rkb_persistconn.coord);
492+
rd_kafka_broker_destroy(creq->creq_rkb);
493+
}
494+
495+
rd_kafka_broker_keep(rkb);
496+
creq->creq_rkb = rkb;
497+
rd_kafka_broker_persistent_connection_add(
498+
rkb, &rkb->rkb_persistconn.coord);
469499
}
470500

471501
rd_kafka_broker_destroy(rkb);
472502
return;
503+
504+
} else if (creq->creq_rkb) {
505+
/* No coordinator information, clear out the previous
506+
* coordinator we waited for. */
507+
rd_kafka_broker_persistent_connection_del(
508+
creq->creq_rkb, &creq->creq_rkb->rkb_persistconn.coord);
509+
rd_kafka_broker_destroy(creq->creq_rkb);
510+
creq->creq_rkb = NULL;
473511
}
474512

513+
475514
/* Get any usable broker to look up the coordinator */
476515
rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, RD_DO_LOCK,
477516
RD_KAFKA_FEATURE_BROKER_GROUP_COORD,

src/rdkafka_coord.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ typedef struct rd_kafka_coord_req_s {
107107
* FindCoordinator requests. */
108108
rd_bool_t creq_done; /**< True if request was sent */
109109

110+
rd_kafka_broker_t *creq_rkb; /**< creq is waiting for this broker to
111+
* come up. */
110112
} rd_kafka_coord_req_t;
111113

112114

tests/0105-transactions_mock.c

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2666,8 +2666,8 @@ static void do_test_topic_disappears_for_awhile(void) {
26662666
SUB_TEST_QUICK();
26672667

26682668
rk = create_txn_producer(
2669-
&mcluster, txnid, 1, NULL, "batch.num.messages", "3", "linger.ms",
2670-
"100", "topic.metadata.refresh.interval.ms", "2000", NULL);
2669+
&mcluster, txnid, 1, "batch.num.messages", "3", "linger.ms", "100",
2670+
"topic.metadata.refresh.interval.ms", "2000", NULL);
26712671

26722672
rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);
26732673

@@ -2742,6 +2742,104 @@ static void do_test_topic_disappears_for_awhile(void) {
27422742
}
27432743

27442744

2745+
/**
2746+
* @brief Test that group coordinator requests can handle an
2747+
* untimely disconnect.
2748+
*
2749+
* The transaction manager makes use of librdkafka coord_req to commit
2750+
* transaction offsets to the group coordinator.
2751+
* If the connection to the given group coordinator is not up the
2752+
* coord_req code will request a connection once, but if this connection fails
2753+
* there will be no new attempts and the coord_req will idle until either
2754+
* destroyed or the connection is retried for other reasons.
2755+
* This in turn stalls the send_offsets_to_transaction() call until the
2756+
* transaction times out.
2757+
*/
2758+
static int delayed_up_cb(void *arg) {
2759+
rd_kafka_mock_cluster_t *mcluster = arg;
2760+
rd_sleep(3);
2761+
TEST_SAY("Bringing up group coordinator 2..\n");
2762+
rd_kafka_mock_broker_set_up(mcluster, 2);
2763+
return 0;
2764+
}
2765+
2766+
static void do_test_disconnected_group_coord(void) {
2767+
const char *topic = "mytopic";
2768+
const char *txnid = "myTxnId";
2769+
const char *grpid = "myGrpId";
2770+
const int partition_cnt = 1;
2771+
rd_kafka_t *rk;
2772+
rd_kafka_mock_cluster_t *mcluster;
2773+
rd_kafka_topic_partition_list_t *offsets;
2774+
rd_kafka_consumer_group_metadata_t *cgmetadata;
2775+
test_timing_t timing;
2776+
thrd_t thrd;
2777+
int ret;
2778+
2779+
SUB_TEST_QUICK();
2780+
2781+
test_curr->is_fatal_cb = error_is_fatal_cb;
2782+
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
2783+
2784+
rk = create_txn_producer(&mcluster, txnid, 3, NULL);
2785+
2786+
rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);
2787+
2788+
/* Broker 1: txn coordinator
2789+
* Broker 2: group coordinator
2790+
* Broker 3: partition leader */
2791+
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
2792+
rd_kafka_mock_coordinator_set(mcluster, "group", grpid, 2);
2793+
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 3);
2794+
2795+
/* Bring down group coordinator so there are no undesired
2796+
* connections to it. */
2797+
rd_kafka_mock_broker_set_down(mcluster, 2);
2798+
2799+
2800+
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
2801+
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2802+
TEST_CALL_ERR__(rd_kafka_producev(
2803+
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
2804+
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
2805+
test_flush(rk, -1);
2806+
2807+
rd_sleep(1);
2808+
2809+
/* Run a background thread that after 3s, which should be enough
2810+
* to perform the first failed connection attempt, makes the
2811+
* group coordinator available again. */
2812+
thrd_create(&thrd, delayed_up_cb, mcluster);
2813+
2814+
TEST_SAY("Calling send_offsets_to_transaction()\n");
2815+
offsets = rd_kafka_topic_partition_list_new(1);
2816+
rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset = 1;
2817+
cgmetadata = rd_kafka_consumer_group_metadata_new(grpid);
2818+
2819+
TIMING_START(&timing, "send_offsets_to_transaction(-1)");
2820+
TEST_CALL_ERROR__(
2821+
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
2822+
TIMING_STOP(&timing);
2823+
TIMING_ASSERT(&timing, 0, 10 * 1000 /*10s*/);
2824+
2825+
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
2826+
rd_kafka_topic_partition_list_destroy(offsets);
2827+
thrd_join(thrd, &ret);
2828+
2829+
/* Commit the transaction */
2830+
TIMING_START(&timing, "commit_transaction(-1)");
2831+
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
2832+
TIMING_STOP(&timing);
2833+
2834+
rd_kafka_destroy(rk);
2835+
2836+
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
2837+
test_curr->is_fatal_cb = NULL;
2838+
2839+
SUB_TEST_PASS();
2840+
}
2841+
2842+
27452843
int main_0105_transactions_mock(int argc, char **argv) {
27462844
if (test_needs_auth()) {
27472845
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
@@ -2815,5 +2913,7 @@ int main_0105_transactions_mock(int argc, char **argv) {
28152913

28162914
do_test_topic_disappears_for_awhile();
28172915

2916+
do_test_disconnected_group_coord();
2917+
28182918
return 0;
28192919
}

0 commit comments

Comments
 (0)