fix: single task executor getting all tasks from Redis queue #7330

wanpdsantos
Contributor

What problem does this PR solve?

Currently, as long as there are tasks in Redis, this loop keeps fetching them. A single task executor therefore ends up holding many tasks in the pending state, and we have to wait for those pending tasks to be returned to the queue.

If MAX_CONCURRENT_TASKS is set to X, only X tasks should be picked from the queue in the first place. The rest should stay in the queue for other task_executors, or be picked up once one of the slots in the current executor frees up. This PR ensures this behavior.

The additional changes come from the Ruff linting in pre-commit. I believe they are expected, to keep the coding style consistent.
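The intended behavior can be illustrated with a minimal sketch. It is not the code from this PR: it uses the standard library's asyncio.Semaphore and asyncio.Queue as stand-ins for trio and the Redis queue, and the names demo, handle, and slots are hypothetical. The point is only the ordering: a slot is acquired before a task is taken from the queue, so anything beyond the cap stays queued.

```python
import asyncio

MAX_CONCURRENT_TASKS = 2  # per-executor cap, named after the setting in the description


async def demo(total_tasks: int = 5):
    """Return (peak concurrency, queue length the first time all slots were busy)."""
    queue = asyncio.Queue()  # stand-in for the Redis queue (assumption)
    for task_id in range(total_tasks):
        queue.put_nowait(task_id)

    slots = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
    in_flight = 0
    peak = 0
    backlog_when_first_full = None

    async def handle(task_id: int) -> None:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        try:
            await asyncio.sleep(0.01)  # pretend to do the work
        finally:
            in_flight -= 1
            slots.release()  # always free the slot, even on failure

    workers = []
    while not queue.empty():
        if slots.locked() and backlog_when_first_full is None:
            # Every slot is busy: the remaining tasks are still in the queue,
            # where another executor could pick them up.
            backlog_when_first_full = queue.qsize()
        await slots.acquire()  # wait for a free slot BEFORE taking a task
        task_id = queue.get_nowait()
        workers.append(asyncio.create_task(handle(task_id)))

    await asyncio.gather(*workers)
    return peak, backlog_when_first_full


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With five queued tasks and a cap of two, the executor never runs more than two tasks at once, and three tasks are still in the queue the first time both slots are busy.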

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):

@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. 🐞 bug Something isn't working, pull request that fix bug. labels Apr 26, 2025
@yuzhichang
Member

trio.Semaphore is a better fit here. Make sure handle_task always releases the semaphore in a finally block.

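import random

import trio

# At most 3 workers may run at the same time.
task_limiter = trio.Semaphore(3)


async def handle_task(worker_id: int):
    try:
        print(f"{trio.current_time()}: Worker {worker_id} started")
        await trio.sleep(random.randint(1, 5))
        print(f"{trio.current_time()}: Worker {worker_id} done")
    finally:
        # Release the slot even if the task raises or is cancelled.
        task_limiter.release()


async def handle_tasks():
    worker_id = 0
    async with trio.open_nursery() as nursery:
        # Demo loop: runs until interrupted.
        while True:
            # Block until a slot is free before starting another worker.
            await task_limiter.acquire()
            nursery.start_soon(handle_task, worker_id)
            worker_id += 1


trio.run(handle_tasks)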

@yuzhichang yuzhichang force-pushed the fix-task-executor-concurrency-with-redis-behavior branch from 8f86a2d to 98ca149 Compare June 6, 2025 06:10
@dosubot dosubot bot added size:XS This PR changes 0-9 lines, ignoring generated files. and removed size:L This PR changes 100-499 lines, ignoring generated files. labels Jun 6, 2025
@yuzhichang yuzhichang added the ci Continue Integration label Jun 6, 2025
@yuzhichang
Member

PR #7700 creates a task_manager every 0.1s. This can be fixed by acquiring the semaphore before creating the task_manager.

@yuzhichang yuzhichang merged commit 0e03542 into infiniflow:main Jun 6, 2025
3 checks passed
@yuzhichang yuzhichang mentioned this pull request Jun 6, 2025
yuzhichang added a commit that referenced this pull request Jun 6, 2025
### What problem does this PR solve?

Backport PR #7330 to the 0.19 branch.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)