@@ -64,6 +64,9 @@ Status ScatterGatherRunner::run() {
6464StatusWith<EventHandle> ScatterGatherRunner::start () {
6565 // The callback holds a shared pointer to the RunnerImpl, so it is always safe to
6666 // access the RunnerImpl from the callback.
67+ // Note: this creates a cycle of shared_ptr:
68+ // RunnerImpl -> Callback in _callbacks -> RunnerImpl
69+ // We must remove callbacks after using them, to break this cycle.
6770 std::shared_ptr<RunnerImpl>& impl = _impl;
6871 auto cb = [impl](const RemoteCommandCallbackArgs& cbData) { impl->processResponse (cbData); };
6972 return _impl->start (cb);
@@ -122,21 +125,28 @@ void ScatterGatherRunner::RunnerImpl::cancel() {
122125
123126void ScatterGatherRunner::RunnerImpl::processResponse (
124127 const ReplicationExecutor::RemoteCommandCallbackArgs& cbData) {
125- if (cbData.response .getStatus () == ErrorCodes::CallbackCanceled) {
126- return ;
127- }
128128 LockGuard lk (_mutex);
129+
129130 if (!_sufficientResponsesReceived.isValid ()) {
130131 // Sufficient responses have already been received, so it is no longer safe to access the algorithm.
131132 return ;
132133 }
133134
134- ++_actualResponses;
135+ // Remove the callback from our vector to break the cycle of shared_ptr.
136+ auto iter = std::find (_callbacks.begin (), _callbacks.end (), cbData.myHandle );
137+ invariant (iter != _callbacks.end ());
138+ std::swap (*iter, _callbacks.back ());
139+ _callbacks.pop_back ();
140+
141+ if (cbData.response .getStatus () == ErrorCodes::CallbackCanceled) {
142+ return ;
143+ }
144+
135145 _algorithm->processResponse (cbData.request , cbData.response );
136146 if (_algorithm->hasReceivedSufficientResponses ()) {
137147 _signalSufficientResponsesReceived ();
138148 } else {
139- invariant (_actualResponses < _callbacks.size ());
149+ invariant (! _callbacks.empty ());
140150 }
141151}
142152
@@ -145,6 +155,8 @@ void ScatterGatherRunner::RunnerImpl::_signalSufficientResponsesReceived() {
145155 std::for_each (_callbacks.begin (),
146156 _callbacks.end (),
147157 stdx::bind (&ReplicationExecutor::cancel, _executor, stdx::placeholders::_1));
158+ // Clear _callbacks to break the cycle of shared_ptr.
159+ _callbacks.clear ();
148160 _executor->signalEvent (_sufficientResponsesReceived);
149161 _sufficientResponsesReceived = EventHandle ();
150162 }
0 commit comments