Ensure BufferBlocks are completed and empty in RowShufflingTransformer. #4479

Merged 2 commits on Nov 22, 2019
30 changes: 22 additions & 8 deletions src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs
@@ -553,11 +553,24 @@ protected override void Dispose(bool disposing)
 {
     if (_disposed)
         return;
-    if (disposing && _producerTask.Status == TaskStatus.Running)
+
+    if (disposing)
     {
-        _toProduce.Post(0);
+        _toProduce.Complete();
         _producerTask.Wait();
+
+        // Complete the consumer after the producerTask has finished, since producerTask could
+        // have posted more items to _toConsume.
+        _toConsume.Complete();
+
+        // Drain both BufferBlocks - this prevents what appears to be memory leaks when using the VS Debugger
+        // because if a BufferBlock still contains items, its underlying Tasks are not getting completed.
+        // See https://github.com/dotnet/corefx/issues/30582 for the VS Debugger issue.
+        // See also https://github.com/dotnet/machinelearning/issues/4399
+        _toProduce.TryReceiveAll(out _);
+        _toConsume.TryReceiveAll(out _);
     }
+
     _disposed = true;
     base.Dispose(disposing);
 }
@@ -578,15 +591,16 @@ private async Task LoopProducerWorker()
 try
 {
     int circularIndex = 0;
-    for (; ; )
+    while (await _toProduce.OutputAvailableAsync().ConfigureAwait(false))
     {
-        int requested = await _toProduce.ReceiveAsync();
-        if (requested == 0)
+        int requested;
+        if (!_toProduce.TryReceive(out requested))
         {
-            // We had some sort of early exit. Just go out, do not post even the
-            // sentinel to the consumer, as nothing will be consumed any more.
-            return;
+            // OutputAvailableAsync returned true, but TryReceive returned false -
+            // so loop back around and try again.
+            continue;
         }
+
         Ch.Assert(requested >= _blockSize);
         int numRows;
         for (numRows = 0; numRows < requested; ++numRows)
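The shutdown sequence in this change (complete the request block, wait for the worker, complete the result block, then drain whatever is left) can be tried in isolation. The following is only a minimal sketch and not the code from RowShufflingTransformer: the class name `BufferBlockShutdownSketch`, the method `WorkerAsync`, and the variables `requests` and `results` are hypothetical stand-ins for `_producerTask`, `_toProduce`, and `_toConsume`. It assumes a project where `System.Threading.Tasks.Dataflow` is available and C# 7.1 or later for `async Main`.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockShutdownSketch
{
    // Hypothetical worker standing in for the producer loop: it reads requests
    // until the request block is completed and empty, and posts one result each.
    private static async Task<int> WorkerAsync(BufferBlock<int> requests, BufferBlock<int> results)
    {
        int handled = 0;

        // OutputAvailableAsync returns false once the block has been completed
        // and holds no more items, so no sentinel value is needed to stop the loop.
        while (await requests.OutputAvailableAsync().ConfigureAwait(false))
        {
            if (!requests.TryReceive(out int requested))
            {
                // An item was announced but is no longer there; wait again.
                continue;
            }

            results.Post(requested);
            handled++;
        }

        return handled;
    }

    public static async Task Main()
    {
        var requests = new BufferBlock<int>();
        var results = new BufferBlock<int>();

        Task<int> worker = WorkerAsync(requests, results);
        requests.Post(10);
        requests.Post(20);

        // 1. Signal that no further requests will arrive, then wait for the worker.
        requests.Complete();
        int handled = await worker.ConfigureAwait(false);

        // 2. Only now complete the result block, since the worker may have
        //    posted to it right up until it finished.
        results.Complete();

        // 3. Drain both blocks. A BufferBlock's Completion task does not finish
        //    while items remain buffered, even after Complete() was called.
        requests.TryReceiveAll(out _);
        results.TryReceiveAll(out _);

        await Task.WhenAll(requests.Completion, results.Completion).ConfigureAwait(false);
        Console.WriteLine($"handled={handled}, remaining results={results.Count}");
    }
}
```

In this sketch nobody consumes `results`, so the two posted items would keep `results.Completion` pending indefinitely if the drain step were skipped. That is the situation the comments in `Dispose` describe.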