Skip to content

Commit 525f2f1

Browse files
author
Hari Khalsa
committed
SERVER-10026 SERVER-10461 runner registry: short-term safe yielding for non-cached runners
1 parent 554b31c commit 525f2f1

18 files changed

+682
-132
lines changed

src/mongo/db/clientcursor.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ namespace mongo {
4848
ClientCursor::CCById ClientCursor::clientCursorsById;
4949
boost::recursive_mutex& ClientCursor::ccmutex( *(new boost::recursive_mutex()) );
5050
long long ClientCursor::numberTimedOut = 0;
51+
set<Runner*> ClientCursor::nonCachedRunners;
5152

5253
void aboutToDeleteForSharding(const StringData& ns,
5354
const Database* db,
@@ -140,6 +141,14 @@ namespace mongo {
140141
verify(db);
141142
verify(str::startsWith(ns, db->name()));
142143

144+
for (set<Runner*>::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end(); ++it) {
145+
Runner* runner = *it;
146+
const char* runnerNS = runner->getQuery().getParsed().ns();
147+
if ((isDB && str::startsWith(runnerNS, ns)) || (str::equals(runnerNS, ns))) {
148+
runner->kill();
149+
}
150+
}
151+
143152
recursive_scoped_lock cclock(ccmutex);
144153
CCById::const_iterator it = clientCursorsById.begin();
145154
while (it != clientCursorsById.end()) {
@@ -203,6 +212,14 @@ namespace mongo {
203212

204213
aboutToDeleteForSharding( ns, db, nsd, dl );
205214

215+
// Check our non-cached active runner list.
216+
for (set<Runner*>::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end(); ++it) {
217+
Runner* runner = *it;
218+
if (0 == ns.compare(runner->getQuery().getParsed().ns())) {
219+
runner->invalidate(dl);
220+
}
221+
}
222+
206223
// TODO: This requires optimization. We walk through *all* CCs and send the delete to every
207224
// CC open on the db we're deleting from. We could:
208225
// 1. Map from ns to open runners,
@@ -295,6 +312,18 @@ namespace mongo {
295312
// End cursor-only
296313
}
297314

315+
void ClientCursor::registerRunner(Runner* runner) {
316+
recursive_scoped_lock lock(ccmutex);
317+
verify(nonCachedRunners.end() == nonCachedRunners.find(runner));
318+
nonCachedRunners.insert(runner);
319+
}
320+
321+
void ClientCursor::deregisterRunner(Runner* runner) {
322+
recursive_scoped_lock lock(ccmutex);
323+
verify(nonCachedRunners.end() != nonCachedRunners.find(runner));
324+
nonCachedRunners.erase(runner);
325+
}
326+
298327
void yieldOrSleepFor1Microsecond() {
299328
#ifdef _WIN32
300329
SwitchToThread();

src/mongo/db/clientcursor.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,23 @@ namespace mongo {
8585
const NamespaceDetails* nsd,
8686
const DiskLoc& dl);
8787

88+
/**
89+
* Register a runner so that it can be notified of deletion/invalidation during yields.
90+
* Must be called before a runner yields. If a runner is cached (inside a ClientCursor) it
91+
* MUST NOT be registered; the two are mutually exclusive.
92+
*/
93+
static void registerRunner(Runner* runner);
94+
95+
/**
96+
* Remove a runner from the runner registry.
97+
*/
98+
static void deregisterRunner(Runner* runner);
99+
88100
//
89101
// Yielding.
90102
//
91103

92-
static void staticYield(int micros, const StringData& ns, Record* rec );
104+
static void staticYield(int micros, const StringData& ns, Record* rec);
93105

94106
//
95107
// Static methods about all ClientCursors TODO: Document.
@@ -253,6 +265,13 @@ namespace mongo {
253265
typedef map<CursorId, ClientCursor*> CCById;
254266
static CCById clientCursorsById;
255267

268+
// A list of NON-CACHED runners. Any runner that yields must be put into this map before
269+
// yielding in order to be notified of invalidation and namespace deletion. Before the
270+
// runner is deleted, it must be removed from this map.
271+
//
272+
// TODO: This is temporary and as such is highly NOT optimized.
273+
static set<Runner*> nonCachedRunners;
274+
256275
// How many cursors have timed out?
257276
static long long numberTimedOut;
258277

src/mongo/db/exec/stagedebug_cmd.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
#include "mongo/db/matcher/expression_parser.h"
3434
#include "mongo/db/namespace_details.h"
3535
#include "mongo/db/pdfile.h"
36-
#include "mongo/db/query/simple_plan_runner.h"
36+
#include "mongo/db/query/plan_executor.h"
3737

3838
namespace mongo {
3939

@@ -96,14 +96,14 @@ namespace mongo {
9696
BSONObj argObj = argElt.Obj();
9797

9898
OwnedPointerVector<MatchExpression> exprs;
99-
SimplePlanRunner runner;
99+
PlanExecutor runner;
100100
auto_ptr<PlanStage> root(parseQuery(dbname, argObj, runner.getWorkingSet(), &exprs));
101101
uassert(16911, "Couldn't parse plan from " + argObj.toString(), root.get());
102102
runner.setRoot(root.release());
103103

104104
BSONArrayBuilder resultBuilder(result.subarrayStart("results"));
105105

106-
for (BSONObj obj; runner.getNext(&obj); ) {
106+
for (BSONObj obj; Runner::RUNNER_ADVANCED == runner.getNext(&obj); ) {
107107
resultBuilder.append(obj);
108108
}
109109

src/mongo/db/query/cached_plan_runner.h

Lines changed: 57 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616

1717
#pragma once
1818

19+
#include "mongo/db/parsed_query.h"
1920
#include "mongo/db/query/canonical_query.h"
2021
#include "mongo/db/query/plan_cache.h"
22+
#include "mongo/db/query/plan_executor.h"
2123
#include "mongo/db/query/runner.h"
22-
#include "mongo/db/query/simple_plan_runner.h"
2324
#include "mongo/db/query/stage_builder.h"
2425

2526
namespace mongo {
@@ -40,53 +41,80 @@ namespace mongo {
4041
CachedPlanRunner(CanonicalQuery* canonicalQuery, CachedSolution* cached,
4142
PlanStage* root, WorkingSet* ws)
4243
: _canonicalQuery(canonicalQuery), _cachedQuery(cached),
43-
_runner(new SimplePlanRunner(ws, root)) { }
44+
_exec(new PlanExecutor(ws, root)), _killed(false), _updatedCache(false) { }
4445

45-
bool getNext(BSONObj* objOut) {
46-
// Use the underlying runner until it's exhausted.
47-
if (_runner->getNext(objOut)) {
48-
return true;
49-
}
46+
Runner::RunnerState getNext(BSONObj* objOut) {
47+
if (_killed) { return Runner::RUNNER_DEAD; }
48+
49+
Runner::RunnerState state = _exec->getNext(objOut);
5050

51-
// We're done. Update the cache.
52-
PlanCache* cache = PlanCache::get(_canonicalQuery->ns());
51+
if (Runner::RUNNER_EOF == state && !_updatedCache) {
52+
// We're done. Update the cache.
53+
PlanCache* cache = PlanCache::get(_canonicalQuery->ns());
5354

54-
// TODO: is this a verify?
55-
if (NULL == cache) { return false; }
55+
// TODO: Is this an error?
56+
if (NULL == cache) { return Runner::RUNNER_EOF; }
5657

57-
// TODO: How do we decide this?
58-
bool shouldRemovePlan = false;
58+
// TODO: How do we decide this?
59+
bool shouldRemovePlan = false;
5960

60-
if (shouldRemovePlan) {
61-
if (!cache->remove(*_canonicalQuery, *_cachedQuery->solution)) {
62-
warning() << "Cached plan runner couldn't remove plan from cache. Maybe"
63-
" somebody else did already?";
61+
if (shouldRemovePlan) {
62+
if (!cache->remove(*_canonicalQuery, *_cachedQuery->solution)) {
63+
warning() << "Cached plan runner couldn't remove plan from cache. Maybe"
64+
" somebody else did already?";
65+
}
66+
return Runner::RUNNER_EOF;
6467
}
65-
return false;
68+
69+
// We're done running. Update cache.
70+
auto_ptr<CachedSolutionFeedback> feedback(new CachedSolutionFeedback());
71+
feedback->stats = _exec->getStats();
72+
cache->feedback(*_canonicalQuery, *_cachedQuery->solution, feedback.release());
6673
}
74+
return state;
75+
}
6776

68-
// We're done running. Update cache.
69-
auto_ptr<CachedSolutionFeedback> feedback(new CachedSolutionFeedback());
70-
feedback->stats = _runner->getStats();
71-
cache->feedback(*_canonicalQuery, *_cachedQuery->solution, feedback.release());
72-
return false;
77+
virtual void saveState() {
78+
if (!_killed) {
79+
_exec->saveState();
80+
}
7381
}
7482

75-
virtual void saveState() { _runner->saveState(); }
76-
virtual void restoreState() { _runner->restoreState(); }
83+
virtual void restoreState() {
84+
if (!_killed) {
85+
_exec->restoreState();
86+
}
87+
}
7788

7889
virtual void invalidate(const DiskLoc& dl) {
79-
_runner->invalidate(dl);
90+
if (!_killed) {
91+
_exec->invalidate(dl);
92+
}
8093
}
8194

82-
virtual const CanonicalQuery& getQuery() {
83-
return *_canonicalQuery;
95+
virtual const CanonicalQuery& getQuery() { return *_canonicalQuery; }
96+
97+
virtual void kill() { _killed = true; }
98+
99+
virtual bool forceYield() {
100+
saveState();
101+
ClientCursor::registerRunner(this);
102+
ClientCursor::staticYield(ClientCursor::suggestYieldMicros(), getQuery().getParsed().ns(), NULL);
103+
ClientCursor::deregisterRunner(this);
104+
if (!_killed) { restoreState(); }
105+
return !_killed;
84106
}
85107

86108
private:
87109
scoped_ptr<CanonicalQuery> _canonicalQuery;
88110
scoped_ptr<CachedSolution> _cachedQuery;
89-
scoped_ptr<SimplePlanRunner> _runner;
111+
scoped_ptr<PlanExecutor> _exec;
112+
113+
// Were we killed during a yield?
114+
bool _killed;
115+
116+
// Have we updated the cache with our plan stats yet?
117+
bool _updatedCache;
90118
};
91119

92120
} // namespace mongo

src/mongo/db/query/canonical_query.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ namespace mongo {
4949
CanonicalQuery** out) {
5050
auto_ptr<CanonicalQuery> cq(new CanonicalQuery());
5151

52-
cq->_pq.reset(new ParsedQuery(ns.c_str(), 0, 0, 0, query, BSONObj()));
52+
// ParsedQuery saves the pointer to the NS that we provide it. It's not going to remain
53+
// valid unless we cache it ourselves.
54+
cq->_ns = ns;
55+
56+
cq->_pq.reset(new ParsedQuery(cq->_ns.c_str(), 0, 0, 0, query, BSONObj()));
5357

5458
StatusWithMatchExpression swme = MatchExpressionParser::parse(cq->_pq->getFilter());
5559
if (!swme.isOK()) { return swme.getStatus(); }

src/mongo/db/query/canonical_query.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ namespace mongo {
5252

5353
// _root points into _pq->getFilter()
5454
scoped_ptr<MatchExpression> _root;
55+
56+
string _ns;
5557
};
5658

5759
} // namespace mongo

0 commit comments

Comments
 (0)