Skip to content

Commit 00fa21f

Browse files
feat: Cassandra online store, concurrent fetching for multiple entities (feast-dev#3356)
concurrent fetching for multiple entities minimal handling of exceptions in concurrent query execution read_concurrency parameter in Cassandra online store config yaml Signed-off-by: Stefano Lottini <[email protected]> Signed-off-by: Stefano Lottini <[email protected]> Co-authored-by: Stefano Lottini <[email protected]>
1 parent 2733368 commit 00fa21f

File tree

5 files changed

+88
-30
lines changed

5 files changed

+88
-30
lines changed

docs/reference/online-stores/cassandra.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ online_store:
3232
load_balancing: # optional
3333
local_dc: 'datacenter1' # optional
3434
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
35+
read_concurrency: 100 # optional
3536
```
3637
{% endcode %}
3738
@@ -52,7 +53,7 @@ online_store:
5253
load_balancing: # optional
5354
local_dc: 'eu-central-1' # optional
5455
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
55-
56+
read_concurrency: 100 # optional
5657
```
5758
{% endcode %}
5859

sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ online_store:
5858
load_balancing: # optional
5959
local_dc: 'datacenter1' # optional
6060
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
61+
read_concurrency: 100 # optional
6162
```
6263
6364
#### Astra DB setup:
@@ -84,6 +85,7 @@ online_store:
8485
load_balancing: # optional
8586
local_dc: 'eu-central-1' # optional
8687
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
88+
read_concurrency: 100 # optional
8789
```
8890
8991
#### Protocol version and load-balancing settings
@@ -111,6 +113,14 @@ The former parameter is a region name for Astra DB instances (as can be verified
111113
See the source code of the online store integration for the allowed values of
112114
the latter parameter.
113115

116+
#### Read concurrency value
117+
118+
You can optionally specify the value of `read_concurrency`, which will be
119+
passed to the Cassandra driver function handling
120+
[concurrent reading of multiple entities](https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent).
121+
Consult the reference for guidance on this parameter (which in most cases can be left to its default value of 100).
122+
This is relevant only for retrieval of several entities at once.
123+
114124
### More info
115125

116126
For a more detailed walkthrough, please see the

sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
ResultSet,
3131
Session,
3232
)
33+
from cassandra.concurrent import execute_concurrent_with_args
3334
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
3435
from cassandra.query import PreparedStatement
3536
from pydantic import StrictFloat, StrictInt, StrictStr
@@ -166,6 +167,14 @@ class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
166167
wrapped into an execution profile if present.
167168
"""
168169

170+
read_concurrency: Optional[StrictInt] = 100
171+
"""
172+
Value of the `concurrency` parameter internally passed to Cassandra driver's
173+
`execute_concurrent_with_args ` call.
174+
See https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent .
175+
Default: 100.
176+
"""
177+
169178

170179
class CassandraOnlineStore(OnlineStore):
171180
"""
@@ -358,32 +367,36 @@ def online_read(
358367

359368
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
360369

361-
for entity_key in entity_keys:
362-
entity_key_bin = serialize_entity_key(
370+
entity_key_bins = [
371+
serialize_entity_key(
363372
entity_key,
364373
entity_key_serialization_version=config.entity_key_serialization_version,
365374
).hex()
375+
for entity_key in entity_keys
376+
]
377+
378+
with tracing_span(name="remote_call"):
379+
feature_rows_sequence = self._read_rows_by_entity_keys(
380+
config,
381+
project,
382+
table,
383+
entity_key_bins,
384+
columns=["feature_name", "value", "event_ts"],
385+
)
366386

367-
with tracing_span(name="remote_call"):
368-
feature_rows = self._read_rows_by_entity_key(
369-
config,
370-
project,
371-
table,
372-
entity_key_bin,
373-
columns=["feature_name", "value", "event_ts"],
374-
)
375-
387+
for entity_key_bin, feature_rows in zip(entity_key_bins, feature_rows_sequence):
376388
res = {}
377389
res_ts = None
378-
for feature_row in feature_rows:
379-
if (
380-
requested_features is None
381-
or feature_row.feature_name in requested_features
382-
):
383-
val = ValueProto()
384-
val.ParseFromString(feature_row.value)
385-
res[feature_row.feature_name] = val
386-
res_ts = feature_row.event_ts
390+
if feature_rows:
391+
for feature_row in feature_rows:
392+
if (
393+
requested_features is None
394+
or feature_row.feature_name in requested_features
395+
):
396+
val = ValueProto()
397+
val.ParseFromString(feature_row.value)
398+
res[feature_row.feature_name] = val
399+
res_ts = feature_row.event_ts
387400
if not res:
388401
result.append((None, None))
389402
else:
@@ -479,12 +492,12 @@ def _write_rows(
479492
params,
480493
)
481494

482-
def _read_rows_by_entity_key(
495+
def _read_rows_by_entity_keys(
483496
self,
484497
config: RepoConfig,
485498
project: str,
486499
table: FeatureView,
487-
entity_key_bin: str,
500+
entity_key_bins: List[str],
488501
columns: Optional[List[str]] = None,
489502
) -> ResultSet:
490503
"""
@@ -500,7 +513,25 @@ def _read_rows_by_entity_key(
500513
fqtable=fqtable,
501514
columns=projection_columns,
502515
)
503-
return session.execute(select_cql, [entity_key_bin])
516+
retrieval_results = execute_concurrent_with_args(
517+
session,
518+
select_cql,
519+
((entity_key_bin,) for entity_key_bin in entity_key_bins),
520+
concurrency=config.online_store.read_concurrency,
521+
)
522+
# execute_concurrent_with_args return a sequence
523+
# of (success, result_or_exception) pairs:
524+
returned_sequence = []
525+
for success, result_or_exception in retrieval_results:
526+
if success:
527+
returned_sequence.append(result_or_exception)
528+
else:
529+
# an exception
530+
logger.error(
531+
f"Cassandra online store exception during concurrent fetching: {str(result_or_exception)}"
532+
)
533+
returned_sequence.append(None)
534+
return returned_sequence
504535

505536
def _drop_table(
506537
self,

sdk/python/feast/templates/cassandra/bootstrap.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,16 +70,16 @@ def collect_cassandra_store_settings():
7070
sys.exit(1)
7171
needs_port = click.confirm("Need to specify port?", default=False)
7272
if needs_port:
73-
c_port = click.prompt("Port to use", default=9042, type=int)
73+
c_port = click.prompt(" Port to use", default=9042, type=int)
7474
else:
7575
c_port = None
7676
use_auth = click.confirm(
7777
"Do you need username/password?",
7878
default=False,
7979
)
8080
if use_auth:
81-
c_username = click.prompt("Database username")
82-
c_password = click.prompt("Database password", hide_input=True)
81+
c_username = click.prompt(" Database username")
82+
c_password = click.prompt(" Database password", hide_input=True)
8383
else:
8484
c_username = None
8585
c_password = None
@@ -95,7 +95,7 @@ def collect_cassandra_store_settings():
9595
)
9696
if specify_protocol_version:
9797
c_protocol_version = click.prompt(
98-
"Protocol version",
98+
" Protocol version",
9999
default={"A": 4, "C": 5}.get(db_type, 5),
100100
type=int,
101101
)
@@ -105,11 +105,11 @@ def collect_cassandra_store_settings():
105105
specify_lb = click.confirm("Specify load-balancing?", default=False)
106106
if specify_lb:
107107
c_local_dc = click.prompt(
108-
"Local datacenter (for load-balancing)",
108+
" Local datacenter (for load-balancing)",
109109
default="datacenter1" if db_type == "C" else None,
110110
)
111111
c_load_balancing_policy = click.prompt(
112-
"Load-balancing policy",
112+
" Load-balancing policy",
113113
type=click.Choice(
114114
[
115115
"TokenAwarePolicy(DCAwareRoundRobinPolicy)",
@@ -122,6 +122,12 @@ def collect_cassandra_store_settings():
122122
c_local_dc = None
123123
c_load_balancing_policy = None
124124

125+
needs_concurrency = click.confirm("Specify read concurrency level?", default=False)
126+
if needs_concurrency:
127+
c_concurrency = click.prompt(" Concurrency level?", default=100, type=int)
128+
else:
129+
c_concurrency = None
130+
125131
return {
126132
"c_secure_bundle_path": c_secure_bundle_path,
127133
"c_hosts": c_hosts,
@@ -132,6 +138,7 @@ def collect_cassandra_store_settings():
132138
"c_protocol_version": c_protocol_version,
133139
"c_local_dc": c_local_dc,
134140
"c_load_balancing_policy": c_load_balancing_policy,
141+
"c_concurrency": c_concurrency,
135142
}
136143

137144

@@ -149,6 +156,7 @@ def apply_cassandra_store_settings(config_file, settings):
149156
'c_protocol_version'
150157
'c_local_dc'
151158
'c_load_balancing_policy'
159+
'c_concurrency'
152160
"""
153161
write_setting_or_remove(
154162
config_file,
@@ -216,6 +224,13 @@ def apply_cassandra_store_settings(config_file, settings):
216224
remove_lines_from_file(config_file, "load_balancing:")
217225
remove_lines_from_file(config_file, "local_dc:")
218226
remove_lines_from_file(config_file, "load_balancing_policy:")
227+
#
228+
write_setting_or_remove(
229+
config_file,
230+
settings["c_concurrency"],
231+
"read_concurrency",
232+
"100",
233+
)
219234

220235

221236
def bootstrap():

sdk/python/feast/templates/cassandra/feature_repo/feature_store.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ online_store:
1616
load_balancing:
1717
local_dc: c_local_dc
1818
load_balancing_policy: c_load_balancing_policy
19+
read_concurrency: 100
1920
entity_key_serialization_version: 2

0 commit comments

Comments
 (0)