Preventing Deadlocks When Reading Metadata Concurrently via asyncio.gather #3207


Open · wants to merge 3 commits into main

Conversation

dgegen commented Jul 5, 2025

As described in #3196, I encountered issues opening Zarr v3 arrays stored over SFTP using fsspec. Specifically, Python would freeze when opening Zarr arrays.

Root Cause

The issue stems from the use of asyncio.gather in zarr.core.array.get_array_metadata, which attempts to read multiple metadata files (e.g., .zarray, .zattrs, zarr.json) concurrently. This works well for truly asynchronous filesystems, but it breaks with implementations like SFTPFileSystem, which does not appear to be concurrency-safe in async contexts: it may rely on blocking I/O internally or manage connection state through global locks. As a result, using asyncio.gather to perform multiple reads simultaneously leads to deadlocks or indefinite hangs.
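
For illustration, the failing pattern looks roughly like this (a simplified sketch of the call in zarr.core.array.get_array_metadata, not the actual source; error handling and zarr-format selection are omitted):

    import asyncio

    async def get_array_metadata_sketch(store_path):
        # Several metadata documents are requested at once via asyncio.gather.
        # On a concurrency-safe filesystem these reads overlap harmlessly; with
        # fsspec's SFTPFileSystem they can hang, because the reads contend for
        # connection state that is not safe to share across tasks.
        zarr_json_bytes, zarray_bytes, zattrs_bytes = await asyncio.gather(
            (store_path / "zarr.json").get(),
            (store_path / ".zarray").get(),
            (store_path / ".zattrs").get(),
        )
        ...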

Solution

To address this, I have implemented a fallback to sequential reads for filesystems that are not concurrency-safe. The logic is as follows: for non-asynchronous filesystems, the user sets store.fs.asynchronous=False. The helper function is_concurrency_safe(store_path: StorePath) -> bool checks this via getattr(fs, "asynchronous", True). If it returns True, asyncio.gather is used; otherwise, we fall back to sequential awaits. This preserves the performance benefit of concurrent reads for concurrency-safe filesystems (e.g., local disk, S3, GCS), while preventing deadlocks and improving robustness for backends like SFTP.
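
A minimal sketch of that logic, assuming a store that exposes its fsspec filesystem as store.fs (the _read_metadata_documents helper is hypothetical and only illustrates the approach, not the exact diff):

    from asyncio import gather

    def is_concurrency_safe(store_path: StorePath) -> bool:
        # Stores without an ``fs`` attribute (memory, local, ...) are treated as safe.
        fs = getattr(store_path.store, "fs", None)
        return getattr(fs, "asynchronous", True)

    async def _read_metadata_documents(store_path: StorePath, *suffixes: str):
        coros = [(store_path / suffix).get() for suffix in suffixes]
        if is_concurrency_safe(store_path):
            # Concurrency-safe filesystems (local disk, S3, GCS, ...) keep the concurrent path.
            return await gather(*coros)
        # Sequential fallback for backends such as SFTP.
        return [await coro for coro in coros]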

These changes may not cover every scenario in which non-asynchronous filesystems could cause issues, as there are several other uses of asyncio.gather in zarr.core.array and zarr.core.group. However, I opted to focus on this specific problem first, since enabling arrays and groups to be opened at all is likely the highest priority, and I wanted to discuss this approach before making too many changes.

I look forward to hearing your thoughts and seeing this issue resolved!

TODO:

  • Add unit tests and/or doctests in docstrings
  • Add docstrings and API docs for any new/modified user-facing classes and functions
  • New/modified features documented in docs/user-guide/*.rst
  • Changes documented as a new file in changes/
  • GitHub Actions have all passed
  • Test coverage is 100% (Codecov passes)

- Introduced is_concurrency_safe function to determine if the file system
supports concurrent access.
- Modified get_array_metadata to read metadata files sequentially for
non-concurrency-safe file systems.
- Enhanced compatibility with synchronous file systems by avoiding deadlocks
when accessing metadata concurrently.
- Updated logic to conditionally use asyncio.gather based on the concurrency
safety of the underlying file system.
@github-actions bot added the needs release notes label (automatically applied to PRs which haven't added release notes) on Jul 5, 2025
d-v-b (Contributor) commented Jul 5, 2025

Good detective work here! I think the ideal solution would keep store implementation details confined to the store classes themselves. So instead of the solution here, what if we override the get_many method on the fsspec store to include the logic you have added here, and then use that method instead of multiple gets?

dgegen (Author) commented Jul 5, 2025

Very good point! Perhaps something along these lines?

from asyncio import gather
from collections.abc import AsyncGenerator, Iterable

class StorePath:
    # ...
    async def _is_concurrency_safe(self) -> bool:
        # Stores without an ``fs`` attribute are assumed to be concurrency-safe.
        fs = getattr(self.store, "fs", None)
        return getattr(fs, "asynchronous", True)

    async def get_many(
        self,
        *suffixes: str,
        prototype: BufferPrototype | None = None,
        byte_range: ByteRequest | None = None,
    ):
        tasks = [
            (self / suffix).get(prototype=prototype, byte_range=byte_range) for suffix in suffixes
        ]
        if await self._is_concurrency_safe():
            return await gather(*tasks)
        else:
            # Sequential fallback: await each read one at a time.
            results = []
            for task in tasks:
                result = await task
                results.append(result)
            return results

class FsspecStore:
    # ...
    async def _get_many(
        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
    ) -> AsyncGenerator[tuple[str, Buffer | None], None]:
        if getattr(self.fs, "asynchronous", True):
            # Concurrency-safe filesystem: delegate to the default concurrent implementation.
            async for result in super()._get_many(requests=requests):
                yield result
        else:
            # Fall back to sequential reads for filesystems that are not concurrency-safe.
            for key, prototype, byte_range in requests:
                value = await self.get(key, prototype, byte_range)
                yield (key, value)
                
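For context, a call site such as get_array_metadata could then request all documents through one call instead of gathering individual gets (hypothetical usage, assuming the StorePath.get_many signature sketched above):

    # Hypothetical call site, assuming the StorePath.get_many sketched above.
    zarr_json_bytes, zarray_bytes, zattrs_bytes = await store_path.get_many(
        "zarr.json", ".zarray", ".zattrs"
    )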
