Skip to content

Commit 2970d25

Browse files
authored
feat: configure subscribers_per_topic for momento pubsub workflow (#363)
Problem A momento client can only support a maximum of 100 subscriptions due to the server side GRPC restriction on concurrent streams per connection. The current implementation assumes that all subscribers subscribe to all topics. We are hence unable to test workflows like 10 topics with 100 subscribers each and we also run into issues if the number of subscriptions created is more than 100 * number of clients. Solution Add a configuration option for subscribers per topic and add runtime checks to fail if the configuration tries to create more subscriptions than what the clients can support.
1 parent 796c50f commit 2970d25

File tree

3 files changed

+40
-13
lines changed

3 files changed

+40
-13
lines changed

src/clients/pubsub/momento.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,21 @@ pub fn launch_subscribers(
3434
if let Component::Topics(topics) = component {
3535
let poolsize = topics.subscriber_poolsize();
3636
let concurrency = topics.subscriber_concurrency();
37-
37+
if concurrency > 100 {
38+
eprintln!("Momento sdk does not support concurrency values greater than 100.");
39+
std::process::exit(1);
40+
}
41+
let num_topics = topics.topics().len();
42+
let subscribers_per_topic = topics
43+
.momento_subscribers_per_topic()
44+
.unwrap_or(poolsize * concurrency);
45+
if num_topics * subscribers_per_topic > poolsize * concurrency {
46+
eprintln!("Not enough Momento clients to support the workload - adjust momento_subscribers_per_topic or increase subscriber_poolsize/subscriber_concurrency.");
47+
std::process::exit(1);
48+
}
49+
let mut clients = Vec::<Arc<TopicClient>>::with_capacity(poolsize);
3850
for _ in 0..poolsize {
3951
let client = {
40-
let _guard = runtime.enter();
41-
4252
// initialize the Momento topic client
4353
if std::env::var("MOMENTO_API_KEY").is_err() {
4454
eprintln!("environment variable `MOMENTO_API_KEY` is not set");
@@ -53,7 +63,7 @@ pub fn launch_subscribers(
5363
std::process::exit(1);
5464
}
5565
};
56-
66+
let _guard = runtime.enter();
5767
match TopicClient::builder()
5868
.configuration(LowLatency::v1())
5969
.credential_provider(credential_provider)
@@ -66,15 +76,20 @@ pub fn launch_subscribers(
6676
}
6777
}
6878
};
69-
70-
for _ in 0..concurrency {
71-
for topic in topics.topics() {
72-
runtime.spawn(subscriber_task(
73-
client.clone(),
74-
cache_name.clone(),
75-
topic.to_string(),
76-
));
77-
}
79+
clients.push(client);
80+
}
81+
let mut client_index = 0;
82+
for topic in topics.topics() {
83+
for _ in 0..subscribers_per_topic {
84+
// Round-robin over the clients to pick one
85+
let client = &clients[client_index];
86+
client_index = (client_index + 1) % clients.len();
87+
let _guard = runtime.enter();
88+
runtime.spawn(subscriber_task(
89+
client.clone(),
90+
cache_name.clone(),
91+
topic.to_string(),
92+
));
7893
}
7994
}
8095
}

src/config/workload.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ pub struct Topics {
168168
topic_distribution: Distribution,
169169
#[serde(default)]
170170
kafka_single_subscriber_group: bool,
171+
#[serde(default)]
172+
momento_subscribers_per_topic: Option<usize>,
171173
}
172174

173175
impl Topics {
@@ -222,6 +224,10 @@ impl Topics {
222224
pub fn kafka_single_subscriber_group(&self) -> bool {
223225
self.kafka_single_subscriber_group
224226
}
227+
228+
pub fn momento_subscribers_per_topic(&self) -> Option<usize> {
229+
self.momento_subscribers_per_topic
230+
}
225231
}
226232

227233
#[derive(Clone, Deserialize)]

src/workload/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,7 @@ pub struct Topics {
579579
subscriber_poolsize: usize,
580580
subscriber_concurrency: usize,
581581
kafka_single_subscriber_group: bool,
582+
momento_subscribers_per_topic: Option<usize>,
582583
}
583584

584585
impl Topics {
@@ -643,6 +644,7 @@ impl Topics {
643644
subscriber_poolsize,
644645
subscriber_concurrency,
645646
kafka_single_subscriber_group: topics.kafka_single_subscriber_group(),
647+
momento_subscribers_per_topic: topics.momento_subscribers_per_topic(),
646648
}
647649
}
648650

@@ -669,6 +671,10 @@ impl Topics {
669671
pub fn kafka_single_subscriber_group(&self) -> bool {
670672
self.kafka_single_subscriber_group
671673
}
674+
675+
pub fn momento_subscribers_per_topic(&self) -> Option<usize> {
676+
self.momento_subscribers_per_topic
677+
}
672678
}
673679

674680
#[derive(Clone)]

0 commit comments

Comments
 (0)