
Custom Indexing API for Pipeline-level Parallelism #3175

@ilan-gold

Description

Inspired by zarrs, where you can do a sort of jump-around slicing, I would broadly like to be able to do the kind of disjoint slice indexing shown below (although, motivated by this request, I am suggesting a more general API).

The current "public" solution is either to asyncio.gather a bunch of disparate slice requests or, if you are not using the async API, to fall back to numpy integer indexing.
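
For concreteness, a minimal sketch of those two current approaches, assuming z is a 1-D zarr.Array, path is its store path, and slices is a list of disjoint Python slice objects (gather_slices is just an illustrative helper, not part of any proposal):

import asyncio

import numpy as np
import zarr

# 1. Integer ("fancy") indexing via the sync API.
np_indexer = np.concatenate([np.arange(s.start, s.stop) for s in slices])
result_int = z[np_indexer]

# 2. One request per slice, fanned out with asyncio.gather via the async API.
async def gather_slices(path, slices):
    async_z = await zarr.api.asynchronous.open_array(store=path)
    parts = await asyncio.gather(*(async_z.getitem(s) for s in slices))
    return np.concatenate(parts)

result_async = zarr.core.sync.sync(gather_slices(path, slices))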

I get better performance by batching all of the slice requests into one indexer and using the pipeline-level parallelism, as opposed to top-level parallelism via asyncio.gather:

import zarr
import numpy as np

import asyncio
import platform
import subprocess

def clear_cache():
    if platform.system() == "Darwin":
        # '&&' is shell syntax, so this has to run through a shell rather than as an argv list
        subprocess.call("sync && sudo purge", shell=True)
    elif platform.system() == "Linux":
        subprocess.call(['sudo', 'sh', '-c', "sync; echo 3 > /proc/sys/vm/drop_caches"])
    else:
        raise Exception("Unsupported platform")


class MultiBasicIndexer(zarr.core.indexing.Indexer):
    """An indexer that batches together multiple disparate slicing 1D slicing indexers together into a single indexer"""
    def __init__(self, indexers: list[zarr.core.indexing.Indexer]):
        # The combined output shape is the per-axis sum of the constituent
        # indexers' shapes (sufficient for the 1-D case exercised here).
        self.shape = tuple(
            sum(i.shape[k] for i in indexers) for k in range(len(indexers[0].shape))
        )
        self.drop_axes = indexers[0].drop_axes  # maybe?
        self.indexers = indexers

    def __iter__(self):
        # Offset each chunk's out_selection so that results from the separate
        # indexers are packed contiguously into the combined output buffer.
        total = 0
        for i in self.indexers:
            for c in i:
                gap = c[2][0].stop - c[2][0].start
                yield type(c)(c[0], c[1], (slice(total, total + gap),), c[3])
                total += gap

z = zarr.create_array("foo.zarr", data=np.random.randn(10_000 * 10_000))
chunk_size = z.chunks[0]

chunks = list(range(0, 15, 2))  # disparate chunk indices, i.e., skipping every other chunk

slices = [
    zarr.core.indexing.BasicIndexer(
        slice(i * chunk_size, (i + 1) * chunk_size),
        shape=z.metadata.shape,
        chunk_grid=z.metadata.chunk_grid,
    )
    for i in chunks
]
np_indexer = np.concatenate([np.arange(i * chunk_size, (i + 1) * chunk_size) for i in chunks])

async def run():
    parts = await asyncio.gather(
        *(
            z._async_array._get_selection(
                s, prototype=zarr.core.buffer.default_buffer_prototype()
            )
            for s in slices
        )
    )
    return np.concatenate(parts).reshape(chunk_size * len(chunks))

assert (z[np_indexer] == zarr.core.sync.sync(run())).all()

clear_cache()
%timeit z[np_indexer]  # integer indexing
clear_cache()
%timeit zarr.core.sync.sync(run())  # asyncio.gather over separate slice requests
clear_cache()
%timeit zarr.core.sync.sync(z._async_array._get_selection(MultiBasicIndexer(slices), prototype=zarr.core.buffer.default_buffer_prototype()))  # batched indexer

I'm also uploading a juv-runnable notebook with the cache clearing built in:
indexing_benchmark.ipynb.zip

[Image: benchmark timings]

So I see about a 40% speedup for the batched indexing vs. the asyncio.gather-ed separate requests (both of which are light years ahead of integer indexing). This is an upper bound: usually the difference was ~20-30%, but zarrs gives another 20-30% on top of that, which means we're now close to a 2x speedup overall.

Thus I see a potential route forward here as making the following public:

  • AsyncArray._get_selection (or something similar; I only name it because it already exists)
  • The Indexer ABC + ChunkProjection for iteration
  • An into_indexer function that returns an Indexer-based class for a simple selection (like a tuple of slices). I think the Indexer return type would probably be enough to do useful things, since iterating any Indexer subclass yields ChunkProjection objects, which can be modified as needed (see my motivating example, and the sketch below).
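
To make the proposed surface concrete, here is a rough sketch of what user code might look like if these pieces were public. into_indexer and AsyncArray.get_selection are hypothetical names used only for illustration, and MultiBasicIndexer is the user-defined batching indexer from above:

import zarr

async def batched_read(path):
    async_z = await zarr.api.asynchronous.open_array(store=path)
    chunk_size = async_z.chunks[0]

    # Hypothetical: convert a plain selection (tuple of slices, etc.) into a
    # public Indexer instance for this array's metadata.
    indexers = [
        zarr.into_indexer(slice(i * chunk_size, (i + 1) * chunk_size), array=async_z)
        for i in range(0, 15, 2)
    ]

    # User code batches the indexers however it likes, relying only on the
    # public Indexer / ChunkProjection contract.
    batched = MultiBasicIndexer(indexers)

    # Hypothetical public counterpart of today's private AsyncArray._get_selection.
    return await async_z.get_selection(batched)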

Open to other thoughts!
