Skip to content

Commit eb3a9ad

Browse files
authored
fix: create one protosocket client with poolsize arg for connection_count (#405)
Creating just one protosocket client (and thus only one underlying connection manager) results in better load-balanced connections and also prevents multiple clients from hitting up the endpoint discovery api every 30 seconds. However, we must make sure to start the same number of protosocket_task workers, else we undercut rpc-perf's ability to reach the target TPS.
1 parent f05d791 commit eb3a9ad

File tree

1 file changed

+28
-18
lines changed

1 file changed

+28
-18
lines changed

src/clients/cache/momento/protosocket.rs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,26 +49,29 @@ pub fn launch_tasks(
4949
}
5050
};
5151

52-
for _connection_number in 0..config.client().unwrap().poolsize() {
53-
runtime.spawn(launch_protosocket_task(
54-
config.clone(),
55-
credential_provider.clone(),
56-
work_receiver.clone(),
57-
));
58-
}
52+
runtime.spawn(launch_protosocket_task(
53+
config.clone(),
54+
credential_provider.clone(),
55+
work_receiver.clone(),
56+
));
5957
}
6058

6159
async fn launch_protosocket_task(
6260
config: Config,
6361
credential_provider: CredentialProvider,
6462
work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
6563
) {
64+
let poolsize = config.client().unwrap().poolsize();
65+
let concurrency = config.client().unwrap().concurrency();
66+
67+
// Make just one protosocket client (and underlying connection manager) using
68+
// poolsize in the connection_count configuration.
6669
let client = match ProtosocketCacheClient::builder()
6770
.default_ttl(Duration::from_secs(900))
6871
.configuration(
6972
Configuration::builder()
7073
.timeout(Duration::from_secs(10))
71-
.connection_count(1)
74+
.connection_count(poolsize as u32)
7275
.az_id(None)
7376
.build(),
7477
)
@@ -84,17 +87,24 @@ async fn launch_protosocket_task(
8487
}
8588
};
8689

87-
CONNECT.increment();
88-
CONNECT_CURR.increment();
90+
// Make sure to increment the metrics for each connection in the pool.
91+
// Also make sure to preserve the same number of protosocket tasks as
92+
// would have been created when we made more than one client.
93+
let mut join_handles = vec![];
94+
for _ in 0..poolsize {
95+
CONNECT.increment();
96+
CONNECT_CURR.increment();
8997

90-
let result = protosocket_task(
91-
config.clone(),
92-
client.clone(),
93-
work_receiver.clone(),
94-
config.client().unwrap().concurrency(),
95-
)
96-
.await;
97-
eprintln!("protosocket driver task exited: {result:?}");
98+
let client = client.clone();
99+
let config = config.clone();
100+
let work_receiver = work_receiver.clone();
101+
102+
join_handles.push(tokio::spawn(async move {
103+
let result = protosocket_task(config, client, work_receiver, concurrency).await;
104+
eprintln!("protosocket driver task exited: {result:?}");
105+
}));
106+
}
107+
futures::future::join_all(join_handles).await;
98108
}
99109

100110
async fn protosocket_task(

0 commit comments

Comments
 (0)