@henrygerardmoore
Collaborator

Should resolve #61

@jlack1987 can you leave a review and confirm if this would provide the use case you'd like? If you want to add a more in-depth test case that would be appreciated, though I understand if it's impossible since it's basically reliant on a specific order of threads queuing things.

One open question: should we also change stopRecording to call stopAcceptingSnapshots? This would require changing the test, but IMO it is more aligned with the behavior I'd expect. If we don't want to keep the (slightly counterintuitive) existing behavior, we could probably make this a cleaner change and remove the forced_stop_recording_ variable from MCAPSink altogether.

@jlack1987
Contributor

jlack1987 commented Oct 7, 2025

It has definitely helped. I removed my 100 ms sleeps in my tests between calling stopRecording and am using your added functions, and now my tests are flaky instead of always failing without the sleeps. Some race condition still exists, because I'm calling

void McapSink::stopRecording()
{
    stopAcceptingSnapshots();
    processQueuedSnapshots();
    std::scoped_lock lk(mutex_);
    if (writer_)
    {
        writer_->close();
        writer_.reset();
    }
}

and somehow my sink's storeSnapshot is still getting called, and I'm seeing the error message from this block:

bool McapSink::storeSnapshot(const DataTamer::Snapshot& snapshot)
{
    std::scoped_lock lk(mutex_);
    if (!writer_)
    {
        std::cerr << "[McapSink] storeSnapshot: mcap writer is null, skipping snapshot" << std::endl;
        return false;
    }
    // ... (rest of the function omitted from this excerpt)
}

@henrygerardmoore
Collaborator Author

@jlack1987 great, thank you. I'll see if I can track that down and push a fix.

@henrygerardmoore henrygerardmoore self-assigned this Oct 7, 2025
@henrygerardmoore
Collaborator Author

@jlack1987 are you using a custom mcap sink? Is there any way I can see it? If not, does the MCAPSink::finishQueueAndStop() function that I added to MCAPSink work for you?

So the only way to stop storeSnapshot getting called is to call DataSinkBase::stopThread(), which I didn't want to do because there's no way to restart it at the moment from a child class (perhaps this should change). So this thread:

thread = std::thread([this, self]() {
  Snapshot snapshot_copy;
  while(run)
  {
    while(queue.try_dequeue(snapshot_copy))
    {
      self->storeSnapshot(snapshot_copy);
    }
    // avoid busy loop
    std::this_thread::sleep_for(std::chrono::microseconds(250));
  }
});

is actually still running, since nothing ever sets the pimpl's run variable to false. The reason there is still a snapshot_copy for self->storeSnapshot to be called on is a race that is hard to avoid: a producer may still be enqueueing while the queue is being drained. That's why in the mcap sink I did

void MCAPSink::finishQueueAndStop()
{
  // stop accepting new snapshots
  stopAcceptingSnapshots();

  // finish any that are queued
  processQueuedSnapshots();

  // sleep and process any that were missed by previous processing
  std::this_thread::sleep_for(std::chrono::microseconds(250));
  processQueuedSnapshots();

  // now stop the recording as normal
  stopRecording();
}

with that sleep before a second call to processQueuedSnapshots(). The docstring for try_dequeue says:

Attempts to dequeue from the queue.
Returns false if all producer streams appeared empty at the time they were checked (so, the queue is likely but not guaranteed to be empty).
Never allocates. Thread-safe.

So unfortunately I think some sleep is necessary to catch any stragglers that were still being enqueued during the first processQueuedSnapshots.
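To make the drain, sleep, drain idea concrete, here is a minimal sketch. It uses a mutex-guarded std::deque as a stand-in for the real lock-free queue, and `ToyQueue`, `drainOnce`, and `finishQueue` are hypothetical names for illustration only, not part of DataTamer or the queue library.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the sink's queue; NOT the real lock-free queue API.
class ToyQueue {
public:
  void enqueue(int value) {
    std::scoped_lock lk(mutex_);
    items_.push_back(value);
  }

  // Same shape as try_dequeue: returns false when nothing was available.
  bool tryDequeue(int& out) {
    std::scoped_lock lk(mutex_);
    if (items_.empty()) {
      return false;
    }
    out = items_.front();
    items_.pop_front();
    return true;
  }

private:
  std::mutex mutex_;
  std::deque<int> items_;
};

// Move everything currently visible in the queue into `stored`.
inline std::size_t drainOnce(ToyQueue& queue, std::vector<int>& stored) {
  std::size_t count = 0;
  int value = 0;
  while (queue.tryDequeue(value)) {
    stored.push_back(value);
    ++count;
  }
  return count;
}

// Drain, give in-flight producers a grace period, then drain again.
inline std::size_t finishQueue(ToyQueue& queue, std::vector<int>& stored,
                               std::chrono::microseconds grace) {
  assert(grace.count() >= 0);  // a negative grace period makes no sense
  std::size_t count = drainOnce(queue, stored);
  std::this_thread::sleep_for(grace);
  count += drainOnce(queue, stored);
  return count;
}
```

The grace period only narrows the window; a producer that takes longer than the sleep to finish enqueueing would still be missed, which is why this is best-effort rather than a guarantee.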

Is there something stopping you from using the built-in mcap sink? Let me know if this all makes sense or if I've missed something.
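On the point about the worker thread never stopping and not being restartable: one possible shape for a worker whose loop can be stopped and started again is sketched below. This is not how DataSinkBase is implemented; `ToyWorker` and all of its members are hypothetical, and the queue is again a mutex-guarded std::deque standing in for the real one.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical sketch of a stoppable, restartable worker; not DataTamer code.
class ToyWorker {
public:
  ~ToyWorker() { stop(); }

  void start() {
    if (thread_.joinable()) {
      return;  // already running
    }
    run_ = true;
    thread_ = std::thread([this] {
      int value = 0;
      while (run_) {
        while (tryDequeue(value)) {
          ++stored_;  // stands in for storeSnapshot()
        }
        // avoid busy loop
        std::this_thread::sleep_for(std::chrono::microseconds(250));
      }
    });
  }

  // Clear the flag and join; once this returns, the loop cannot store anything.
  void stop() {
    run_ = false;
    if (thread_.joinable()) {
      thread_.join();
    }
    assert(!thread_.joinable());  // post-condition: no worker thread is alive
  }

  void enqueue(int value) {
    std::scoped_lock lk(mutex_);
    items_.push_back(value);
  }

  int stored() const { return stored_.load(); }

private:
  bool tryDequeue(int& out) {
    std::scoped_lock lk(mutex_);
    if (items_.empty()) {
      return false;
    }
    out = items_.front();
    items_.pop_front();
    return true;
  }

  std::atomic<bool> run_{false};
  std::atomic<int> stored_{0};
  std::mutex mutex_;
  std::deque<int> items_;
  std::thread thread_;
};
```

In this sketch, anything enqueued after stop() simply stays in the queue until start() is called again, so a child class could close its writer between stop() and start() without the store call racing against it.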

@jlack1987
Contributor

Yeah, it's a custom sink; you can see the header here and src here. I'm not really sure how I'm getting the behavior I'm getting. I think you may be right that I need to add a sleep that matches the sleep in the constructor to try to ensure that everything does get processed.

@jlack1987
Contributor

jlack1987 commented Oct 10, 2025

If I use exactly the finishQueueAndStop that you have there, then everything seems to work reliably with my tests, so I think that just points to you being correct about the worker thread that gets kicked off in the DataSinkBase::Pimpl constructor.

@henrygerardmoore
Collaborator Author

@jlack1987 ok, great! Glad to hear. Will merging this as-is resolve #61, then?

@jlack1987
Contributor

Yeah I think we can call it solved, appreciate the support sir!

@henrygerardmoore
Collaborator Author

@jlack1987 of course, glad I could help! I might leave this PR open a bit longer and re-review it myself and add some tests this weekend, but at the moment I don't foresee any functional changes so you should be good to use this branch until it's merged to main.

Development

Successfully merging this pull request may close these issues.

Need a way of allowing queued snapshots to process when stopRecording is called

2 participants