Description
Inspired by zarrs, where you can do a sort of jump-around slicing, I would broadly like to be able to do the following disjoint-slice indexing (although, motivated by this request, I am suggesting a more general API).

The current "public" solution is either to `asyncio.gather` a bunch of disparate slice requests or, if you are not using the async API, to use numpy integer indexing. I get better performance by batching all of the slice requests into one indexer and using pipeline-level parallelism as opposed to top-level parallelism.
```python
import asyncio
import platform
import subprocess

import numpy as np
import zarr


def clear_cache():
    """Drop the OS page cache so each benchmark run hits disk."""
    if platform.system() == "Darwin":
        subprocess.call("sync && sudo purge", shell=True)
    elif platform.system() == "Linux":
        subprocess.call(["sudo", "sh", "-c", "sync; echo 3 > /proc/sys/vm/drop_caches"])
    else:
        raise Exception("Unsupported platform")


class MultiBasicIndexer(zarr.core.indexing.Indexer):
    """An indexer that batches multiple disparate 1D slicing indexers into a single indexer."""

    def __init__(self, indexers: list[zarr.core.indexing.Indexer]):
        self.shape = tuple(
            sum(i.shape[k] for i in indexers) for k in range(len(indexers[0].shape))
        )
        self.drop_axes = indexers[0].drop_axes  # maybe?
        self.indexers = indexers

    def __iter__(self):
        total = 0
        for i in self.indexers:
            for c in i:
                gap = c[2][0].stop - c[2][0].start
                # Shift each out_selection so the results land contiguously.
                yield type(c)(c[0], c[1], (slice(total, total + gap),), c[3])
                total += gap


z = zarr.create_array("foo.zarr", data=np.random.randn(10_000 * 10_000))
chunk_size = z.chunks[0]
chunks = list(range(0, 15, 2))  # disparate chunks, i.e., skipping around by a factor of 2
slices = [
    zarr.core.indexing.BasicIndexer(
        slice(i * chunk_size, (i + 1) * chunk_size),
        shape=z.metadata.shape,
        chunk_grid=z.metadata.chunk_grid,
    )
    for i in range(0, 15, 2)
]
np_indexer = np.concatenate(
    [np.arange(i * chunk_size, (i + 1) * chunk_size) for i in range(0, 15, 2)]
)


async def run():
    return np.concatenate(
        await asyncio.gather(
            *(
                z._async_array._get_selection(
                    s, prototype=zarr.core.buffer.default_buffer_prototype()
                )
                for s in slices
            )
        )
    ).reshape(chunk_size * len(chunks))


assert (z[np_indexer] == zarr.core.sync.sync(run())).all()

clear_cache()
%timeit z[np_indexer]
clear_cache()
%timeit zarr.core.sync.sync(run())
clear_cache()
%timeit zarr.core.sync.sync(z._async_array._get_selection(MultiBasicIndexer(slices), prototype=zarr.core.buffer.default_buffer_prototype()))
```
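The offset-stitching that `MultiBasicIndexer.__iter__` performs can be illustrated standalone. This is a hypothetical helper with no zarr dependency (`stitch_out_selections` is not a real zarr function): each sub-indexer writes into `[0, n)` of its own output, so its out-selection must be shifted by the total length of everything batched before it.

```python
def stitch_out_selections(per_indexer_lengths: list[int]) -> list[slice]:
    """Map each sub-indexer's local output range to a slice in the
    combined output array, laid out contiguously in batch order."""
    out = []
    total = 0
    for n in per_indexer_lengths:
        out.append(slice(total, total + n))
        total += n
    return out

# Three disjoint requests of 4, 4, and 2 elements land back to back
# at offsets 0, 4, and 8 of the combined output.
combined = stitch_out_selections([4, 4, 2])
```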
I'm uploading a `juv`-runnable notebook as well, with cache clearing built in:
indexing_benchmark.ipynb.zip

So I see about a 40% speedup for the batched indexing vs. `asyncio.gather`-separated indexing (both of which are light years ahead of integer indexing). That 40% is an upper bound: usually the difference was ~20-30%. But zarrs gives another 20-30% on top of that, which means we're now close to a 2x speedup overall.
Thus I see a potential route forward here as:

- Making the following public:
  - `AsyncArray._get_selection` (or something similar; I only mention this because it exists)
  - The `Indexer` abc class + `ChunkProjection` for iteration
- An `into_indexer` function that returns `Indexer`-based classes for a simple selection (like a tuple of slices or so).

I think the `Indexer` return type would probably be enough to do useful things, since the iterator for all `Indexer` subclasses yields `ChunkProjection`s, which can be modified as needed (see my motivating example).
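To make the proposal concrete, here is a rough self-contained sketch of what an `into_indexer` for a 1D chunked array might produce. Every name here is hypothetical (`ChunkSpan` stands in for zarr's `ChunkProjection`; `into_indexer` does not exist in zarr today); the point is just that a list of disjoint slices decomposes into per-chunk reads plus contiguous output placements.

```python
from dataclasses import dataclass


@dataclass
class ChunkSpan:
    """Stand-in for zarr's ChunkProjection (hypothetical)."""
    chunk_index: int        # which chunk to read
    chunk_selection: slice  # what to take from that chunk
    out_selection: slice    # where it lands in the combined output


def into_indexer(selections: list[slice], chunk_size: int) -> list[ChunkSpan]:
    """Decompose disjoint 1D slices into per-chunk spans whose
    out_selections tile the output array contiguously."""
    spans, total = [], 0
    for sel in selections:
        start = sel.start
        while start < sel.stop:
            chunk = start // chunk_size
            stop = min(sel.stop, (chunk + 1) * chunk_size)  # clamp to chunk edge
            n = stop - start
            spans.append(
                ChunkSpan(
                    chunk_index=chunk,
                    chunk_selection=slice(start - chunk * chunk_size,
                                          stop - chunk * chunk_size),
                    out_selection=slice(total, total + n),
                )
            )
            total += n
            start = stop
    return spans


# Two disjoint slices on an array with chunk_size=10 touch chunks 0 and 2,
# and their outputs pack into [0, 15) with no gap.
spans = into_indexer([slice(0, 10), slice(20, 25)], chunk_size=10)
```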
Open to other thoughts!