base repository: goadesign/pulse
base: v1.2.0
head repository: goadesign/pulse
compare: v1.3.0
  • 1 commit
  • 10 files changed
  • 1 contributor

Commits on Feb 19, 2025

  1. pool: Enhance worker lifecycle management and job reliability (#51)

    * Filter out stale events early
    
    Ensure two nodes can't process stale workers concurrently
    
    * Centralize worker stream cache
    
    Instead of each worker having its own cache,
    have the parent node hold the cache to avoid
    duplicate caching.
    
    * pool: fix cleanup of stale workers
    
    The original code used future timestamps in workerKeepAliveMap to prevent
    concurrent cleanup operations. This made stale workers appear active and
    could permanently prevent cleanup if a node crashed during the process.
    
    Changes:
    - Added dedicated cleanupMap to track workers being cleaned up
    - Implemented proper concurrency handling using SetIfNotExists/TestAndSet
    - Added retry logic with exponential backoff for requeuing jobs
    - Ensured cleanup map is properly closed during node shutdown
    - Updated worker.go to handle new processRequeuedJobs retry parameter
    
    The fix ensures stale workers and their jobs are reliably cleaned up even
    in case of node failures or concurrent cleanup attempts.
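
A minimal sketch of the retry-with-exponential-backoff idea mentioned above. The names `backoffDelays`, `requeueWithRetry`, and `errTransient`, as well as the base delay, cap, and attempt count, are hypothetical and are not taken from the pulse code base; the actual retry parameters may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error used only to illustrate a failed requeue.
var errTransient = errors.New("transient requeue failure")

// backoffDelays returns the wait before each retry: base, 2*base, 4*base, ...
// with every delay capped at max.
func backoffDelays(base, max time.Duration, attempts int) []time.Duration {
	delays := make([]time.Duration, 0, attempts)
	d := base
	for i := 0; i < attempts; i++ {
		if d > max {
			d = max
		}
		delays = append(delays, d)
		d *= 2
	}
	return delays
}

// requeueWithRetry calls requeue once, then once more after each delay,
// stopping at the first success. It returns the last error if every
// attempt fails.
func requeueWithRetry(requeue func() error, delays []time.Duration) error {
	err := requeue()
	for _, d := range delays {
		if err == nil {
			return nil
		}
		time.Sleep(d)
		err = requeue()
	}
	if err != nil {
		return fmt.Errorf("requeue failed after %d retries: %w", len(delays), err)
	}
	return nil
}
```

Capping the delay keeps a long outage from pushing the wait between attempts arbitrarily high, while the doubling avoids hammering a store that is still recovering.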
    
    * Properly ack stale event to avoid re-publish
    
    * fix(pool): prevent worker leaks from context cancellation
    
    - Use background context for worker goroutines to prevent premature termination
    - Preserve logging context while making worker lifecycle independent of caller
    - Rename maps for better clarity (e.g. jobsMap -> jobMap)
    - Improve node stream management with nodeStreams map
    - Clean up error handling and logging patterns
    
    This fixes an issue where workers could be leaked when the caller's context
    was cancelled before proper cleanup could occur.
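
One way to get a worker context that keeps the caller's logging values but ignores the caller's cancellation is sketched below. It uses the standard library's `context.WithoutCancel` (Go 1.21+), which may not be the mechanism the commit itself uses; `workerContext`, `logKey`, and `detachDemo` are hypothetical names for illustration.

```go
package main

import "context"

type ctxKey string

// logKey is a hypothetical context key standing in for whatever logging
// fields the caller attached to its context.
const logKey ctxKey = "log"

// workerContext returns a context that keeps the values stored in ctx
// (for example logging fields) but is not cancelled when ctx is cancelled.
// The returned cancel function is the only way to stop the worker.
func workerContext(ctx context.Context) (context.Context, context.CancelFunc) {
	return context.WithCancel(context.WithoutCancel(ctx))
}

// detachDemo cancels the caller's context first and reports whether the
// worker context survived, which log value it still carries, and whether
// the worker's own cancel function stops it.
func detachDemo() (aliveAfterCallerCancel bool, logValue any, stoppedByOwnCancel bool) {
	base := context.WithValue(context.Background(), logKey, "pool-a")
	caller, cancelCaller := context.WithCancel(base)
	wctx, stop := workerContext(caller)

	cancelCaller()
	aliveAfterCallerCancel = wctx.Err() == nil
	logValue = wctx.Value(logKey)

	stop()
	stoppedByOwnCancel = wctx.Err() != nil
	return
}
```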
    
    * refactor(pool): Ensure eventual consistency of job assignments
    
    Improves the Node component's ability to detect and reassign jobs from stale
    or deleted workers by:
    1. Adding explicit orphaned job detection for workers missing keep-alive entries
    2. Centralizing worker cleanup logic to ensure consistent job reassignment
    3. Simplifying worker state validation to catch edge cases in distributed scenarios
    
    This ensures that no jobs are lost when workers become unavailable, maintaining
    eventual consistency of job assignments across the worker pool.
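
The orphaned job check in item 1 can be sketched as follows, assuming plain in-memory maps in place of the replicated maps the pool uses. `orphanedJobs` and its parameter shapes are hypothetical.

```go
package main

import "sort"

// orphanedJobs returns the keys of all jobs assigned to a worker that has
// no keep-alive entry. jobsByWorker maps worker ID to job keys and
// keepAlive maps worker ID to its last keep-alive timestamp.
func orphanedJobs(jobsByWorker map[string][]string, keepAlive map[string]int64) []string {
	var orphans []string
	for workerID, jobs := range jobsByWorker {
		if _, alive := keepAlive[workerID]; !alive {
			orphans = append(orphans, jobs...)
		}
	}
	// Sort so callers see a stable order regardless of map iteration.
	sort.Strings(orphans)
	return orphans
}
```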
    
    * pool: improve worker cleanup and job requeuing reliability
    
    Enhances worker cleanup mechanism by handling stale cleanup locks and
    adding cleanup verification. Key changes:
    
    * Add detection and cleanup of stale worker cleanup locks
    * Clean up jobs from jobMap after successful requeue
    * Improve logging around worker cleanup and job requeuing
    * Upgrade requeue log level to Info for better operational visibility
    
    This improves reliability of the distributed job system by preventing
    orphaned jobs and stale locks from accumulating over time.
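
A rough sketch of how a cleanup lock with stale-lock takeover could be built from set-if-absent and compare-and-swap primitives. `lockMap` is a hypothetical in-memory stand-in, not the replicated map used by pulse, and storing the acquisition time as the lock value is an assumption made for illustration.

```go
package main

import (
	"strconv"
	"sync"
	"time"
)

// lockMap is a hypothetical in-memory map offering set-if-absent and
// compare-and-swap operations.
type lockMap struct {
	mu sync.Mutex
	m  map[string]string
}

func newLockMap() *lockMap { return &lockMap{m: make(map[string]string)} }

// setIfNotExists stores val under key only when key is absent.
func (l *lockMap) setIfNotExists(key, val string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, ok := l.m[key]; ok {
		return false
	}
	l.m[key] = val
	return true
}

// testAndSet replaces the value under key only when it still equals test.
func (l *lockMap) testAndSet(key, test, val string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if cur, ok := l.m[key]; !ok || cur != test {
		return false
	}
	l.m[key] = val
	return true
}

func (l *lockMap) get(key string) (string, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	v, ok := l.m[key]
	return v, ok
}

// acquireCleanupLock tries to claim the cleanup of workerID at time now
// (Unix nanoseconds). A lock older than ttl, or one with an unparsable
// value, is treated as stale and taken over atomically.
func acquireCleanupLock(l *lockMap, workerID string, now int64, ttl time.Duration) bool {
	stamp := strconv.FormatInt(now, 10)
	if l.setIfNotExists(workerID, stamp) {
		return true
	}
	cur, ok := l.get(workerID)
	if !ok {
		return false // released concurrently; the caller may retry later
	}
	held, err := strconv.ParseInt(cur, 10, 64)
	if err == nil && now-held <= int64(ttl) {
		return false // another node holds a fresh lock
	}
	// Only one contender wins the swap from the stale value.
	return l.testAndSet(workerID, cur, stamp)
}
```

Because the takeover is a compare-and-swap against the exact stale value, two nodes that notice the same stale lock cannot both acquire it.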
    
    * Prevent streams from being recreated after destroy
    
    * Prevent re-creation of worker streams
    
    * Pass the proper options to the node stream Add call that was missing them
    
    * streaming: cleanup stale consumers during sink initialization
    
    Add cleanup of stale consumers during sink initialization to prevent accumulation
    of stale consumers in Redis. Previously stale consumers were only cleaned up
    periodically, which could lead to a buildup if sinks did not shut down cleanly.
    
    Also refactor the stale consumer cleanup logic to:
    1. Extract common cleanup code into deleteStreamStaleConsumers
    2. Improve error handling and logging
    3. Properly clean up all related data structures (Redis consumer group,
       keep-alive map, and consumers map)
    
    * Add event ID to requeue log entries
    
    * pool: refactor worker cleanup logic
    
    Improve the worker cleanup implementation:
    1. Split cleanupWorker into smaller, focused functions:
       - acquireCleanupLock: handles cleanup lock management
       - requeueWorkerJobs: handles job requeuing
       - cleanupWorker: orchestrates the cleanup process
    
    2. Simplify cleanupInactiveWorkers:
       - Use activeWorkers() to get list of active workers
       - Combine jobMap and workerMap checks into a single loop
       - Skip workers being actively cleaned up
    
    3. Rename isActive to isWithinTTL to better reflect its purpose
       - Function checks if a timestamp is within TTL duration
       - Used consistently across node and worker cleanup
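
For illustration, a TTL check of the kind described in item 3 might look like the sketch below. The signature is an assumption: the real helper's parameters are not shown in this commit message, and `now` is passed in explicitly here only to keep the example deterministic.

```go
package main

import "time"

// isWithinTTL reports whether lastSeen is no older than ttl relative to
// now. Both timestamps are Unix nanoseconds.
func isWithinTTL(lastSeen int64, ttl time.Duration, now int64) bool {
	return now-lastSeen <= int64(ttl)
}
```

The name states what the function checks (a timestamp against a duration) without implying anything about whether the owner of the timestamp is a node, a worker, or a lock.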
    
    * pool: Add periodic cleanup of stale pending jobs
    
    This commit adds a new background process to clean up stale entries in the
    pending jobs map. Previously, stale entries were only cleaned up when
    attempting to dispatch a job with the same key. Now, a dedicated goroutine
    runs at the ackGracePeriod frequency to proactively remove expired entries.
    
    Additional changes:
    - Fix jobPendingMap comment to clarify it's indexed by job key not worker ID
    - Add debug logs for worker shutdown in handleEvents and keepAlive
    - Refactor timestamp validation to use isWithinTTL helper across the codebase
    - Improve error handling in cleanupStalePendingJobs using TestAndDelete
    
    The periodic cleanup helps prevent memory leaks from abandoned dispatch
    attempts and makes the job dispatch system more reliable.
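
The periodic sweep could be sketched like this, assuming an in-memory map from job key to expiry time in place of the replicated pending jobs map. `pendingJobs`, `sweep`, and `run` are hypothetical names; `testAndDelete` mimics a compare-and-delete so an entry refreshed by a concurrent dispatch is left alone.

```go
package main

import (
	"sync"
	"time"
)

// pendingJobs is a hypothetical stand-in for the pending jobs map:
// job key -> expiry time in Unix nanoseconds.
type pendingJobs struct {
	mu sync.Mutex
	m  map[string]int64
}

// testAndDelete removes key only if its value still equals expected.
func (p *pendingJobs) testAndDelete(key string, expected int64) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if cur, ok := p.m[key]; !ok || cur != expected {
		return false
	}
	delete(p.m, key)
	return true
}

// sweep removes every entry that expired before now and returns how many
// entries were removed.
func (p *pendingJobs) sweep(now int64) int {
	p.mu.Lock()
	snapshot := make(map[string]int64, len(p.m))
	for k, v := range p.m {
		snapshot[k] = v
	}
	p.mu.Unlock()

	removed := 0
	for key, expiry := range snapshot {
		if expiry < now && p.testAndDelete(key, expiry) {
			removed++
		}
	}
	return removed
}

// run sweeps once per period until done is closed.
func (p *pendingJobs) run(period time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			p.sweep(time.Now().UnixNano())
		case <-done:
			return
		}
	}
}
```

In this sketch the caller would start the loop with `go p.run(period, done)` and close `done` on shutdown.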
    
    * pool: simplify job requeuing logic
    
    Remove the requeueJob helper function and directly use dispatchJob for
    requeuing jobs during worker cleanup and rebalancing.
    
    * Properly delete stale workers with requeued jobs
    
    * Fix deadlock from blocked ichan notifications
    
    Unread notifications on the ichan channel blocked writes and caused deadlocks during Set operations. This commit removes ichan and replaces it with a waiter-based mechanism using dedicated per-call channels, ensuring notifications are delivered without blocking.
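
A waiter-based notification scheme with per-call channels can be sketched as below. This is not the code from the commit: `notifier`, `wait`, and `notify` are hypothetical names, and the buffered channel of capacity 1 is one assumed way to guarantee that the notifying side never blocks.

```go
package main

import "sync"

// notifier hands each caller its own channel instead of sharing one
// channel that blocks the writer when nobody reads it.
type notifier struct {
	mu      sync.Mutex
	waiters []chan struct{}
}

// wait registers a dedicated channel for one caller. The buffer of 1
// lets notify deliver without waiting for the caller to read.
func (n *notifier) wait() <-chan struct{} {
	ch := make(chan struct{}, 1)
	n.mu.Lock()
	n.waiters = append(n.waiters, ch)
	n.mu.Unlock()
	return ch
}

// notify wakes every registered waiter exactly once, forgets them, and
// returns how many were notified.
func (n *notifier) notify() int {
	n.mu.Lock()
	waiters := n.waiters
	n.waiters = nil
	n.mu.Unlock()

	for _, ch := range waiters {
		// Never blocks: each channel has capacity 1 and receives one send.
		ch <- struct{}{}
	}
	return len(waiters)
}
```

A waiter that never reads its channel costs only one buffered value, so an abandoned caller cannot stall the write path.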
    raphael authored Feb 19, 2025
    Commit: 2598358