Skip to content

Commit 2f3bcf5

Browse files
authored
feat: Add async DynamoDB timeout and retry configuration (feast-dev#5178)
* Add async DynamoDB timeout and retry configuration Signed-off-by: Sebastian Jäger <[email protected]> * Use union sytnax Signed-off-by: Sebastian Jäger <[email protected]> * Move retry configutation into top level, remove max_attempts Signed-off-by: Sebastian Jäger <[email protected]> * Add explicity type hint to retries dict Signed-off-by: Sebastian Jäger <[email protected]> * Update docstrings Signed-off-by: Sebastian Jäger <[email protected]> * Fix docstrings Signed-off-by: Sebastian Jäger <[email protected]> * Update doc string, use None as retries default Signed-off-by: Sebastian Jäger <[email protected]> --------- Signed-off-by: Sebastian Jäger <[email protected]>
1 parent 1229c32 commit 2f3bcf5

File tree

1 file changed

+54
-4
lines changed

1 file changed

+54
-4
lines changed

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,26 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
8585
keepalive_timeout: float = 12.0
8686
"""Keep-alive timeout in seconds for async Dynamodb connections."""
8787

88+
connect_timeout: Union[int, float] = 60
89+
"""The time in seconds until a timeout exception is thrown when attempting to make
90+
an async connection."""
91+
92+
read_timeout: Union[int, float] = 60
93+
"""The time in seconds until a timeout exception is thrown when attempting to read
94+
from an async connection."""
95+
96+
total_max_retry_attempts: Union[int, None] = None
97+
"""Maximum number of total attempts that will be made on a single request.
98+
99+
Maps to `retries.total_max_attempts` in botocore.config.Config.
100+
"""
101+
102+
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = None
103+
"""The type of retry mode (aio)botocore should use.
104+
105+
Maps to `retries.mode` in botocore.config.Config.
106+
"""
107+
88108

89109
class DynamoDBOnlineStore(OnlineStore):
90110
"""
@@ -99,10 +119,16 @@ class DynamoDBOnlineStore(OnlineStore):
99119
_dynamodb_resource = None
100120

101121
async def initialize(self, config: RepoConfig):
122+
online_config = config.online_store
123+
102124
await _get_aiodynamodb_client(
103-
config.online_store.region,
104-
config.online_store.max_pool_connections,
105-
config.online_store.keepalive_timeout,
125+
online_config.region,
126+
online_config.max_pool_connections,
127+
online_config.keepalive_timeout,
128+
online_config.connect_timeout,
129+
online_config.read_timeout,
130+
online_config.total_max_retry_attempts,
131+
online_config.retry_mode,
106132
)
107133

108134
async def close(self):
@@ -280,6 +306,10 @@ async def online_write_batch_async(
280306
online_config.region,
281307
online_config.max_pool_connections,
282308
online_config.keepalive_timeout,
309+
online_config.connect_timeout,
310+
online_config.read_timeout,
311+
online_config.total_max_retry_attempts,
312+
online_config.retry_mode,
283313
)
284314
await dynamo_write_items_async(client, table_name, items)
285315

@@ -387,6 +417,10 @@ def to_tbl_resp(raw_client_response):
387417
online_config.region,
388418
online_config.max_pool_connections,
389419
online_config.keepalive_timeout,
420+
online_config.connect_timeout,
421+
online_config.read_timeout,
422+
online_config.total_max_retry_attempts,
423+
online_config.retry_mode,
390424
)
391425
response_batches = await asyncio.gather(
392426
*[
@@ -546,16 +580,32 @@ def _get_aioboto_session():
546580

547581

548582
async def _get_aiodynamodb_client(
549-
region: str, max_pool_connections: int, keepalive_timeout: float
583+
region: str,
584+
max_pool_connections: int,
585+
keepalive_timeout: float,
586+
connect_timeout: Union[int, float],
587+
read_timeout: Union[int, float],
588+
total_max_retry_attempts: Union[int, None],
589+
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None],
550590
):
551591
global _aioboto_client
552592
if _aioboto_client is None:
553593
logger.debug("initializing the aiobotocore dynamodb client")
594+
595+
retries: Dict[str, Any] = {}
596+
if total_max_retry_attempts is not None:
597+
retries["total_max_attempts"] = total_max_retry_attempts
598+
if retry_mode is not None:
599+
retries["mode"] = retry_mode
600+
554601
client_context = _get_aioboto_session().create_client(
555602
"dynamodb",
556603
region_name=region,
557604
config=AioConfig(
558605
max_pool_connections=max_pool_connections,
606+
connect_timeout=connect_timeout,
607+
read_timeout=read_timeout,
608+
retries=retries if retries else None,
559609
connector_args={"keepalive_timeout": keepalive_timeout},
560610
),
561611
)

0 commit comments

Comments
 (0)