Skip to content

Commit 391dd60

Browse files
authored
feat: support momento leaderboards (#355)
Adds a leaderboard workload type, configuration, metrics, and momento client implementation.
1 parent 4458d0b commit 391dd60

File tree

14 files changed

+864
-7
lines changed

14 files changed

+864
-7
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ hyper-util = { version = "0.1.8", features = ["full"] }
3535
metriken = "0.7.0"
3636
metriken-exposition = { version = "0.11.1", features = ["json", "parquet-conversion"] }
3737
mio = "1.0.3"
38-
momento = "0.46.1"
38+
momento = "0.49.0"
3939
once_cell = "1.18.0"
4040
openssl = { version = "0.10.71", optional = true }
4141
openssl-src = "300.3.1"

configs/momento_leaderboard.toml

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# An example configuration for benchmarking Momento (https://www.gomomento.com)
2+
# leaderboards.
3+
4+
[general]
5+
# specify the protocol to be used
6+
protocol = "momento"
7+
# the interval for stats integration and reporting
8+
interval = 60
9+
# the number of intervals to run the test for
10+
duration = 300
11+
# run the admin thread with a HTTP listener at the address provided, this allows
12+
# stats exposition via HTTP
13+
admin = "127.0.0.1:9090"
14+
# optionally, set an initial seed for the PRNGs used to generate the workload.
15+
# The default is to intialize from the OS entropy pool.
16+
#initial_seed = "0"
17+
18+
#[metrics]
19+
# output file for detailed stats during the run
20+
#output = "stats.json"
21+
# format of the output file (possible values are json, msgpack, parquet)
22+
#format = "json"
23+
# optionally specify batch size for parquet row groups
24+
# only valid for parquet output
25+
#batch_size = 100_000
26+
# optionally specify histogram type (can be standard (default) or sparse)
27+
# only valid for parquet output
28+
#histogram = "sparse"
29+
# optionally, specify the sampling interval for metrics. Input is a string
30+
# with the unit attached; for example "100ms" or "1s". Defaults to 1s.
31+
#interval = "1s"
32+
33+
[debug]
34+
# choose from: error, warn, info, debug, trace
35+
log_level = "info"
36+
# optionally, log to the file below instead of standard out
37+
# log_file = "rpc-perf.log"
38+
# backup file name for use with log rotation
39+
log_backup = "rpc-perf.log.old"
40+
# trigger log rotation when the file grows beyond this size (in bytes). Set this
41+
# option to '0' to disable log rotation.
42+
log_max_size = 1073741824
43+
44+
[target]
45+
# we don't need to specify any endpoints for momento
46+
endpoints = []
47+
# specify the name of the target cache for the leaderboard
48+
cache_name = "test-cache"
49+
50+
[leaderboard]
51+
# number of threads used to drive client requests
52+
threads = 1
53+
# number of gRPC clients to initialize, each maintains at least one TCP stream
54+
poolsize = 1
55+
# an upper limit on the number of concurrent requests per gRPC client
56+
concurrency = 20
57+
# set the timeout in milliseconds
58+
request_timeout = 1000
59+
60+
[workload]
61+
# the number of threads that will be used to generate the workload
62+
threads = 1
63+
64+
[workload.ratelimit]
65+
# set a global ratelimit for the workload
66+
start = 50
67+
68+
[[workload.leaderboard]]
69+
weight = 1
70+
# Number of distinct leaderboards to create
71+
nleaderboards = 1
72+
# Number of distinct ids to use across all leaderboards
73+
nids = 100
74+
75+
# run upsert vs get_competition_rank in a 10:1 ratio
76+
# each time, only upsert/query a single id
77+
commands = [
78+
{ verb = "upsert", weight = 10, cardinality = 1 },
79+
{ verb = "get_competition_rank", weight = 1, cardinality = 1 },
80+
]

src/clients/leaderboard/mod.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/// # Leaderboard Clients
2+
///
3+
/// Leaderboards provide at-scale competitor ranking and lookup.
4+
///
5+
/// RPC-Perf store clients are used to evaluate the performance of object
6+
/// leaderboard services in terms of throughput and latency.
7+
use crate::*;
8+
9+
use async_channel::Receiver;
10+
use tokio::runtime::Runtime;
11+
use workload::{ClientWorkItemKind, LeaderboardClientRequest};
12+
13+
mod momento;
14+
15+
pub fn launch(
16+
config: &Config,
17+
work_receiver: Receiver<ClientWorkItemKind<LeaderboardClientRequest>>,
18+
) -> Option<Runtime> {
19+
if config.leaderboard().is_none() {
20+
debug!("No leaderboard configuration specified");
21+
return None;
22+
}
23+
debug!("Launching clients...");
24+
25+
config.leaderboard()?;
26+
27+
// spawn the request drivers on their own runtime
28+
let mut client_rt = Builder::new_multi_thread()
29+
.enable_all()
30+
.worker_threads(config.leaderboard().unwrap().threads())
31+
.build()
32+
.expect("failed to initialize tokio runtime");
33+
34+
match config.general().protocol() {
35+
Protocol::Momento => momento::launch_tasks(&mut client_rt, config.clone(), work_receiver),
36+
protocol => {
37+
eprintln!(
38+
"leaderboard commands are not supported for the {:?} protocol",
39+
protocol
40+
);
41+
}
42+
}
43+
44+
Some(client_rt)
45+
}

src/clients/leaderboard/momento.rs

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
use super::record_result;
2+
use crate::clients::ResponseError;
3+
use crate::config::Config;
4+
use crate::metrics::*;
5+
use crate::workload::{ClientWorkItemKind, LeaderboardClientRequest};
6+
use crate::{workload, RUNNING};
7+
use paste::paste;
8+
9+
use async_channel::Receiver;
10+
use momento::leaderboard::{configurations, LeaderboardClient};
11+
use momento::CredentialProvider;
12+
use ringlog::debug;
13+
use tokio::runtime::Runtime;
14+
use tokio::time::timeout;
15+
16+
use std::io::{Error, Result};
17+
use std::sync::atomic::Ordering;
18+
use std::time::Instant;
19+
20+
/// Launch tasks with one channel per task as gRPC is mux-enabled.
21+
pub fn launch_tasks(
22+
runtime: &mut Runtime,
23+
config: Config,
24+
work_receiver: Receiver<ClientWorkItemKind<LeaderboardClientRequest>>,
25+
) {
26+
debug!("launching momento protocol tasks");
27+
28+
let cache_name = config
29+
.target()
30+
.cache_name()
31+
.unwrap_or_else(|| {
32+
eprintln!("cache name is not specified in the `target` section");
33+
std::process::exit(1);
34+
})
35+
.to_string();
36+
37+
for _ in 0..config.leaderboard().unwrap().poolsize() {
38+
let client = {
39+
let _guard = runtime.enter();
40+
41+
// initialize the Momento cache client
42+
if std::env::var("MOMENTO_API_KEY").is_err() {
43+
eprintln!("environment variable `MOMENTO_API_KEY` is not set");
44+
std::process::exit(1);
45+
}
46+
47+
let credential_provider =
48+
match CredentialProvider::from_env_var("MOMENTO_API_KEY".to_string()) {
49+
Ok(v) => v,
50+
Err(e) => {
51+
eprintln!("MOMENTO_API_KEY key should be valid: {e}");
52+
std::process::exit(1);
53+
}
54+
};
55+
56+
match LeaderboardClient::builder()
57+
.configuration(configurations::LowLatency::v1())
58+
.credential_provider(credential_provider)
59+
.build()
60+
{
61+
Ok(c) => c,
62+
Err(e) => {
63+
eprintln!("could not create leaderboard client: {}", e);
64+
std::process::exit(1);
65+
}
66+
}
67+
};
68+
69+
LEADERBOARD_CONNECT.increment();
70+
LEADERBOARD_CONNECT_CURR.increment();
71+
72+
// create one task per channel
73+
for _ in 0..config.leaderboard().unwrap().concurrency() {
74+
runtime.spawn(task(
75+
config.clone(),
76+
client.clone(),
77+
cache_name.clone(),
78+
work_receiver.clone(),
79+
));
80+
}
81+
}
82+
}
83+
84+
async fn task(
85+
config: Config,
86+
mut client: LeaderboardClient,
87+
cache_name: String,
88+
work_receiver: Receiver<ClientWorkItemKind<LeaderboardClientRequest>>,
89+
) -> Result<()> {
90+
while RUNNING.load(Ordering::Relaxed) {
91+
let work_item = work_receiver
92+
.recv()
93+
.await
94+
.map_err(|_| Error::other("channel closed"))?;
95+
96+
LEADERBOARD_REQUEST.increment();
97+
let start = Instant::now();
98+
let result = match work_item {
99+
ClientWorkItemKind::Request { request, .. } => match request {
100+
LeaderboardClientRequest::Upsert(r) => {
101+
upsert(&mut client, &config, cache_name.clone(), r).await
102+
}
103+
LeaderboardClientRequest::GetCompetitionRank(r) => {
104+
get_competition_rank(&mut client, &config, cache_name.clone(), r).await
105+
}
106+
_ => {
107+
LEADERBOARD_REQUEST_UNSUPPORTED.increment();
108+
continue;
109+
}
110+
},
111+
ClientWorkItemKind::Reconnect => {
112+
continue;
113+
}
114+
};
115+
116+
LEADERBOARD_REQUEST_OK.increment();
117+
118+
let stop = Instant::now();
119+
120+
match result {
121+
Ok(_) => {
122+
LEADERBOARD_RESPONSE_OK.increment();
123+
124+
let latency = stop.duration_since(start).as_nanos() as u64;
125+
126+
let _ = LEADERBOARD_RESPONSE_LATENCY.increment(latency);
127+
}
128+
Err(ResponseError::Exception) => {
129+
LEADERBOARD_RESPONSE_EX.increment();
130+
}
131+
Err(ResponseError::Timeout) => {
132+
LEADERBOARD_RESPONSE_TIMEOUT.increment();
133+
}
134+
Err(ResponseError::Ratelimited) => {
135+
LEADERBOARD_RESPONSE_RATELIMITED.increment();
136+
}
137+
Err(ResponseError::BackendTimeout) => {
138+
LEADERBOARD_RESPONSE_BACKEND_TIMEOUT.increment();
139+
}
140+
}
141+
}
142+
143+
Ok(())
144+
}
145+
146+
/// Insert or update id-score pairs in the leaderboard.
147+
pub async fn upsert(
148+
client: &mut LeaderboardClient,
149+
config: &Config,
150+
cache_name: String,
151+
request: workload::leaderboard::Upsert,
152+
) -> std::result::Result<(), ResponseError> {
153+
LEADERBOARD_UPSERT.increment();
154+
155+
let leaderboard = client.leaderboard(cache_name, request.leaderboard.as_ref().clone());
156+
let result = timeout(
157+
config.leaderboard().unwrap().request_timeout(),
158+
leaderboard.upsert(request.elements),
159+
)
160+
.await;
161+
162+
record_result!(result, LEADERBOARD_UPSERT)
163+
}
164+
165+
/// Get the competition rank of a list of ids in a leaderboard.
166+
pub async fn get_competition_rank(
167+
client: &mut LeaderboardClient,
168+
config: &Config,
169+
cache_name: String,
170+
request: workload::leaderboard::GetCompetitionRank,
171+
) -> std::result::Result<(), ResponseError> {
172+
LEADERBOARD_GET_COMPETITION_RANK.increment();
173+
174+
let leaderboard = client.leaderboard(cache_name, request.leaderboard.as_ref().clone());
175+
let ids = request.ids.as_ref().to_vec();
176+
let result = timeout(
177+
config.leaderboard().unwrap().request_timeout(),
178+
leaderboard.get_competition_rank(ids),
179+
)
180+
.await;
181+
182+
record_result!(result, LEADERBOARD_GET_COMPETITION_RANK)
183+
}

src/clients/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use momento::MomentoErrorCode;
44
pub mod common;
55

66
pub mod cache;
7+
pub mod leaderboard;
78
pub mod oltp;
89
pub mod ping;
910
pub mod pubsub;

src/config/leaderboard.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use super::*;
2+
3+
#[derive(Clone, Deserialize)]
4+
pub struct Leaderboard {
5+
/// The number of connections this process will have to each endpoint.
6+
poolsize: usize,
7+
/// The number of concurrent sessions per connection.
8+
#[serde(default)]
9+
concurrency: usize,
10+
/// Request timeout
11+
request_timeout: u64,
12+
// number of threads for client tasks
13+
threads: usize,
14+
}
15+
16+
impl Leaderboard {
17+
pub fn threads(&self) -> usize {
18+
self.threads
19+
}
20+
21+
pub fn poolsize(&self) -> usize {
22+
std::cmp::max(1, self.poolsize)
23+
}
24+
25+
pub fn concurrency(&self) -> usize {
26+
std::cmp::max(1, self.concurrency)
27+
}
28+
29+
pub fn request_timeout(&self) -> Duration {
30+
Duration::from_millis(self.request_timeout)
31+
}
32+
}

0 commit comments

Comments
 (0)