Skip to content

PYTHON-5356 - Init unified test client SDAM for all unified tests #2325

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
PYTHON-5356 - Init unified test clients for CSOT tests
  • Loading branch information
NoahStapp committed Apr 30, 2025
commit 9d04fdf969a4d022517d0148b3afd716b4dfd22a
14 changes: 8 additions & 6 deletions test/asynchronous/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def _handle_placeholders(self, spec: dict, current: dict, path: str) -> Any:
current[key] = self._handle_placeholders(spec, value, subpath)
return current

async def _create_entity(self, entity_spec, uri=None):
async def _create_entity(self, entity_spec, uri=None, init_client=False):
if len(entity_spec) != 1:
self.test.fail(f"Entity spec {entity_spec} did not contain exactly one top-level key")

Expand Down Expand Up @@ -303,6 +303,8 @@ async def _create_entity(self, entity_spec, uri=None):
if uri:
kwargs["h"] = uri
client = await self.test.async_rs_or_single_client(**kwargs)
if init_client:
await client.aconnect()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove init_client and always call aconnect() here, since aconnect() doesn't run any commands it's always safe to start SDAM tasks. If we added an explicit client.admin.command() then we would need the init_client flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually want to call client.admin.command() instead? That would ensure we actually connect with a non-SDAM connection before the test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that will cause problems with event monitoring. We plan to add "ping" explicitly to the unified tests themselves to fix that.

self[spec["id"]] = client
return
elif entity_type == "database":
Expand Down Expand Up @@ -390,9 +392,9 @@ async def drop(self: AsyncGridFSBucket, *args: Any, **kwargs: Any) -> None:

self.test.fail(f"Unable to create entity of unknown type {entity_type}")

async def create_entities_from_spec(self, entity_spec, uri=None):
async def create_entities_from_spec(self, entity_spec, uri=None, init_client=False):
for spec in entity_spec:
await self._create_entity(spec, uri=uri)
await self._create_entity(spec, uri=uri, init_client=init_client)

def get_listener_for_client(self, client_name: str) -> EventListenerUtil:
client = self[client_name]
Expand Down Expand Up @@ -1406,7 +1408,7 @@ async def run_scenario(self, spec, uri=None):
attempts = 3
for i in range(attempts):
try:
return await self._run_scenario(spec, uri)
return await self._run_scenario(spec, uri, init_client=True)
except (AssertionError, OperationFailure) as exc:
if isinstance(exc, OperationFailure) and (
_IS_SYNC or "failpoint" not in exc._message
Expand All @@ -1426,7 +1428,7 @@ async def run_scenario(self, spec, uri=None):
await self._run_scenario(spec, uri)
return None

async def _run_scenario(self, spec, uri=None):
async def _run_scenario(self, spec, uri=None, init_client=False):
# maybe skip test manually
self.maybe_skip_test(spec)

Expand All @@ -1444,7 +1446,7 @@ async def _run_scenario(self, spec, uri=None):
self._uri = uri
self.entity_map = EntityMapUtil(self)
await self.entity_map.create_entities_from_spec(
self.TEST_SPEC.get("createEntities", []), uri=uri
self.TEST_SPEC.get("createEntities", []), uri=uri, init_client=init_client
)
self._cluster_time = None
# process initialData
Expand Down
16 changes: 10 additions & 6 deletions test/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def _handle_placeholders(self, spec: dict, current: dict, path: str) -> Any:
current[key] = self._handle_placeholders(spec, value, subpath)
return current

def _create_entity(self, entity_spec, uri=None):
def _create_entity(self, entity_spec, uri=None, init_client=False):
if len(entity_spec) != 1:
self.test.fail(f"Entity spec {entity_spec} did not contain exactly one top-level key")

Expand Down Expand Up @@ -302,6 +302,8 @@ def _create_entity(self, entity_spec, uri=None):
if uri:
kwargs["h"] = uri
client = self.test.rs_or_single_client(**kwargs)
if init_client:
client._connect()
self[spec["id"]] = client
return
elif entity_type == "database":
Expand Down Expand Up @@ -389,9 +391,9 @@ def drop(self: GridFSBucket, *args: Any, **kwargs: Any) -> None:

self.test.fail(f"Unable to create entity of unknown type {entity_type}")

def create_entities_from_spec(self, entity_spec, uri=None):
def create_entities_from_spec(self, entity_spec, uri=None, init_client=False):
for spec in entity_spec:
self._create_entity(spec, uri=uri)
self._create_entity(spec, uri=uri, init_client=init_client)

def get_listener_for_client(self, client_name: str) -> EventListenerUtil:
client = self[client_name]
Expand Down Expand Up @@ -1393,7 +1395,7 @@ def run_scenario(self, spec, uri=None):
attempts = 3
for i in range(attempts):
try:
return self._run_scenario(spec, uri)
return self._run_scenario(spec, uri, init_client=True)
except (AssertionError, OperationFailure) as exc:
if isinstance(exc, OperationFailure) and (
_IS_SYNC or "failpoint" not in exc._message
Expand All @@ -1413,7 +1415,7 @@ def run_scenario(self, spec, uri=None):
self._run_scenario(spec, uri)
return None

def _run_scenario(self, spec, uri=None):
def _run_scenario(self, spec, uri=None, init_client=False):
# maybe skip test manually
self.maybe_skip_test(spec)

Expand All @@ -1430,7 +1432,9 @@ def _run_scenario(self, spec, uri=None):
# process createEntities
self._uri = uri
self.entity_map = EntityMapUtil(self)
self.entity_map.create_entities_from_spec(self.TEST_SPEC.get("createEntities", []), uri=uri)
self.entity_map.create_entities_from_spec(
self.TEST_SPEC.get("createEntities", []), uri=uri, init_client=init_client
)
self._cluster_time = None
# process initialData
if "initialData" in self.TEST_SPEC:
Expand Down