Skip to content

Commit 5fb85ea

Browse files
authored
feat(experimental): Add base resumption strategy for bidi streams (#1594)
* feat(experimental): Add base resumption strategy for bidi streams * minor changes * resolving comments
1 parent 0d867bd commit 5fb85ea

File tree

1 file changed

+65
-0
lines changed

1 file changed

+65
-0
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import abc
2+
from typing import Any, Iterable
3+
4+
class _BaseResumptionStrategy(abc.ABC):
5+
"""Abstract base class defining the interface for a bidi stream resumption strategy.
6+
7+
This class defines the skeleton for a pluggable strategy that contains
8+
all the service-specific logic for a given bidi operation (e.g., reads
9+
or writes). This allows a generic retry manager to handle the common
10+
retry loop while sending the state management and request generation
11+
to a concrete implementation of this class.
12+
"""
13+
14+
@abc.abstractmethod
15+
def generate_requests(self, state: Any) -> Iterable[Any]:
16+
"""Generates the next batch of requests based on the current state.
17+
18+
This method is called at the beginning of each retry attempt. It should
19+
inspect the provided state object and generate the appropriate list of
20+
request protos to send to the server. For example, a read strategy
21+
would use this to implement "Smarter Resumption" by creating smaller
22+
`ReadRange` requests for partially downloaded ranges. For bidi-writes,
23+
it will set the `write_offset` field to the persisted size received
24+
from the server in the next request.
25+
26+
:type state: Any
27+
:param state: An object containing all the state needed for the
28+
operation (e.g., requested ranges, user buffers,
29+
bytes written).
30+
"""
31+
pass
32+
33+
@abc.abstractmethod
34+
def update_state_from_response(self, state: Any) -> None:
35+
"""Updates the state based on a successful server response.
36+
37+
This method is called for every message received from the server. It is
38+
responsible for processing the response and updating the shared state
39+
object.
40+
41+
:type state: Any
42+
:param state: The shared state object for the operation, which will be
43+
mutated by this method.
44+
"""
45+
pass
46+
47+
@abc.abstractmethod
48+
async def recover_state_on_failure(self, error: Exception, state: Any) -> None:
49+
"""Prepares the state for the next retry attempt after a failure.
50+
51+
This method is called when a retriable gRPC error occurs. It is
52+
responsible for performing any necessary actions to ensure the next
53+
retry attempt can succeed. For bidi reads, its primary role is to
54+
handle the `BidiReadObjectRedirectError` by extracting the
55+
`routing_token` and updating the state. For bidi writes, it will update
56+
the state to reflect any bytes that were successfully persisted before
57+
the failure.
58+
59+
:type error: :class:`Exception`
60+
:param error: The exception that was caught by the retry engine.
61+
62+
:type state: Any
63+
:param state: The shared state object for the operation.
64+
"""
65+
pass

0 commit comments

Comments
 (0)