Commit c95a630

SERVER-38225 Add constant time record count to biggie SE
1 parent 3f92356 commit c95a630

6 files changed: 207 additions, 24 deletions


src/mongo/db/storage/SConscript

Lines changed: 1 addition & 0 deletions
@@ -162,6 +162,7 @@ env.Library(
 env.Library(
     target='record_store_test_harness',
     source=[
+        'record_store_test_capped_delete.cpp',
         'record_store_test_capped_visibility.cpp',
         'record_store_test_datafor.cpp',
         'record_store_test_datasize.cpp',

src/mongo/db/storage/biggie/biggie_record_store.cpp

Lines changed: 34 additions & 12 deletions
@@ -118,9 +118,7 @@ long long RecordStore::dataSize(OperationContext* opCtx) const {


 long long RecordStore::numRecords(OperationContext* opCtx) const {
-    StringStore* workingCopy(RecoveryUnit::get(opCtx)->getHead());
-    return workingCopy->distance(workingCopy->lower_bound(_prefix),
-                                 workingCopy->upper_bound(_postfix));
+    return static_cast<long long>(_numRecords.load());
 }

 bool RecordStore::isCapped() const {

@@ -152,7 +150,11 @@ void RecordStore::deleteRecord(OperationContext* opCtx, const RecordId& dl) {
     StringStore* workingCopy(RecoveryUnit::get(opCtx)->getHead());
     auto numElementsRemoved = workingCopy->erase(createKey(_ident, dl.repr()));
     invariant(numElementsRemoved == 1);
-    RecoveryUnit::get(opCtx)->makeDirty();
+    _numRecords.fetchAndSubtract(numElementsRemoved);
+    auto ru = RecoveryUnit::get(opCtx);
+    ru->onRollback(
+        [numElementsRemoved, this]() { this->_numRecords.fetchAndAdd(numElementsRemoved); });
+    ru->makeDirty();
 }

 Status RecordStore::insertRecords(OperationContext* opCtx,

@@ -166,14 +168,18 @@ Status RecordStore::insertRecords(OperationContext* opCtx,
     if (_isCapped && totalSize > _cappedMaxSize)
         return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");

-    StringStore* workingCopy(RecoveryUnit::get(opCtx)->getHead());
+    auto ru = RecoveryUnit::get(opCtx);
+    StringStore* workingCopy(ru->getHead());
     for (auto& record : *inOutRecords) {
         int64_t thisRecordId = nextRecordId();
         workingCopy->insert(StringStore::value_type{
             createKey(_ident, thisRecordId), std::string(record.data.data(), record.data.size())});
         record.id = RecordId(thisRecordId);
-        RecoveryUnit::get(opCtx)->makeDirty();
+        ru->makeDirty();
     }
+    auto numInserted = inOutRecords->size();
+    _numRecords.fetchAndAdd(numInserted);
+    ru->onRollback([numInserted, this]() { this->_numRecords.fetchAndSubtract(numInserted); });

     cappedDeleteAsNeeded(opCtx, workingCopy);
     return Status::OK();

@@ -192,7 +198,8 @@ Status RecordStore::insertRecordsWithDocWriter(OperationContext* opCtx,
     if (_isCapped && totalSize > _cappedMaxSize)
         return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");

-    StringStore* workingCopy(RecoveryUnit::get(opCtx)->getHead());
+    auto ru = RecoveryUnit::get(opCtx);
+    StringStore* workingCopy(ru->getHead());
     for (size_t i = 0; i < nDocs; i++) {
         const size_t len = docs[i]->documentSize();

@@ -204,8 +211,11 @@ Status RecordStore::insertRecordsWithDocWriter(OperationContext* opCtx,
         workingCopy->insert(std::move(vt));
         if (idsOut)
             idsOut[i] = RecordId(thisRecordId);
-        RecoveryUnit::get(opCtx)->makeDirty();
+        ru->makeDirty();
     }
+    _numRecords.fetchAndAdd(static_cast<int64_t>(nDocs));
+    ru->onRollback(
+        [nDocs, this]() { this->_numRecords.fetchAndSubtract(static_cast<int64_t>(nDocs)); });

     cappedDeleteAsNeeded(opCtx, workingCopy);
     return Status::OK();

@@ -252,7 +262,10 @@ Status RecordStore::truncate(OperationContext* opCtx) {
     if (!s.isOK())
         return s.getStatus();

-    // TODO: SERVER-38225
+    int64_t numErased = s.getValue();
+    _numRecords.fetchAndSubtract(numErased);
+    RecoveryUnit::get(opCtx)->onRollback(
+        [numErased, this]() { this->_numRecords.fetchAndAdd(numErased); });

     return Status::OK();
 }

@@ -280,13 +293,15 @@ StatusWith<int64_t> RecordStore::truncateWithoutUpdatingCount(OperationContext*
 }

 void RecordStore::cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive) {
-    StringStore* workingCopy(RecoveryUnit::get(opCtx)->getHead());
+    auto ru = RecoveryUnit::get(opCtx);
+    StringStore* workingCopy(ru->getHead());

     WriteUnitOfWork wuow(opCtx);
     const auto recordKey = createKey(_ident, end.repr());
     auto recordIt =
         inclusive ? workingCopy->lower_bound(recordKey) : workingCopy->upper_bound(recordKey);
     auto endIt = workingCopy->upper_bound(_postfix);
+    int64_t numErased = 0;

     while (recordIt != endIt) {
         stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);

@@ -300,13 +315,17 @@ void RecordStore::cappedTruncateAfter(OperationContext* opCtx, RecordId end, boo

         // Don't need to increment the iterator because the iterator gets revalidated and placed
         // on the next item after the erase.
+        numErased++;
         workingCopy->erase(recordIt->first);
-        RecoveryUnit::get(opCtx)->makeDirty();

         // Tree modifications are bound to happen here so we need to reposition our end cursor.
         endIt.repositionIfChanged();
+        ru->makeDirty();
     }

+    _numRecords.fetchAndSubtract(numErased);
+    ru->onRollback([numErased, this]() { this->_numRecords.fetchAndAdd(numErased); });
+
     wuow.commit();
 }

@@ -398,7 +417,10 @@ void RecordStore::cappedDeleteAsNeeded(OperationContext* opCtx, StringStore* wor
         // Don't need to increment the iterator because the iterator gets revalidated and placed
         // on the next item after the erase.
         workingCopy->erase(recordIt->first);
-        RecoveryUnit::get(opCtx)->makeDirty();
+        _numRecords.fetchAndSubtract(1);
+        auto ru = RecoveryUnit::get(opCtx);
+        ru->onRollback([this]() { this->_numRecords.fetchAndAdd(1); });
+        ru->makeDirty();
     }
 }

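Every write path above follows one pattern: update the atomic _numRecords eagerly, then register an onRollback handler that applies the opposite delta. That turns numRecords() into a constant-time load instead of a radix-store distance scan, while keeping the count transactionally consistent. Below is a minimal standalone sketch of the pattern; the Transaction type and its onRollback/abort/commit methods are hypothetical stand-ins for MongoDB's RecoveryUnit, not real MongoDB API.

#include <atomic>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-in for MongoDB's RecoveryUnit: collects rollback
// handlers and runs them (newest first) if the transaction aborts.
class Transaction {
public:
    void onRollback(std::function<void()> fn) {
        _rollbackHandlers.push_back(std::move(fn));
    }
    void abort() {
        for (auto it = _rollbackHandlers.rbegin(); it != _rollbackHandlers.rend(); ++it)
            (*it)();
        _rollbackHandlers.clear();
    }
    void commit() {
        _rollbackHandlers.clear();  // on success, the undo handlers are dropped
    }

private:
    std::vector<std::function<void()>> _rollbackHandlers;
};

// A store that keeps an O(1) record count alongside its data.
class CountingStore {
public:
    void insertRecords(Transaction* txn, int64_t n) {
        // ... write the n records ...
        adjust(txn, n);
    }
    void deleteRecord(Transaction* txn) {
        // ... erase the record ...
        adjust(txn, -1);
    }
    int64_t numRecords() const {
        return _numRecords.load();  // constant time: no tree walk
    }

private:
    void adjust(Transaction* txn, int64_t delta) {
        _numRecords.fetch_add(delta);  // eager update, visible immediately
        // Undo the adjustment if the surrounding transaction rolls back.
        txn->onRollback([this, delta]() { _numRecords.fetch_add(-delta); });
    }
    std::atomic<int64_t> _numRecords{0};
};

Because the counter moves before the transaction resolves, a concurrent reader can briefly observe the post-write count; that is presumably acceptable here, since storage engines generally treat numRecords() as a fast count that need not be exact under concurrency.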
src/mongo/db/storage/biggie/biggie_record_store.h

Lines changed: 1 addition & 0 deletions
@@ -134,6 +134,7 @@ class RecordStore : public ::mongo::RecordStore {
     mutable stdx::mutex _cappedDeleterMutex;

     AtomicInt64 _highest_record_id{1};
+    AtomicInt64 _numRecords{0};
     std::string generateKey(const uint8_t* key, size_t key_len) const;

     /*

src/mongo/db/storage/biggie/biggie_sorted_impl.cpp

Lines changed: 2 additions & 3 deletions
@@ -182,9 +182,8 @@ SortedDataBuilderInterface::SortedDataBuilderInterface(OperationContext* opCtx,
       _lastRID(-1) {}

 SpecialFormatInserted SortedDataBuilderInterface::commit(bool mayInterrupt) {
-    biggie::RecoveryUnit* ru = checked_cast<biggie::RecoveryUnit*>(_opCtx->recoveryUnit());
-    ru->beginUnitOfWork(_opCtx);
-    ru->commitUnitOfWork();
+    WriteUnitOfWork wunit(_opCtx);
+    wunit.commit();
     return SpecialFormatInserted::NoSpecialFormatInserted;
 }

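This hunk replaces a manual begin/commit pair on the checked-cast RecoveryUnit with WriteUnitOfWork, MongoDB's RAII transaction guard: if control leaves the scope without commit() being called (for example, via an exception), the destructor rolls the work back. A generic sketch of that RAII shape follows; ScopedUnit and Unit are illustrative types, not MongoDB's.

// Illustrative transaction interface; MongoDB's real RecoveryUnit is richer.
class Unit {
public:
    virtual ~Unit() = default;
    virtual void begin() = 0;
    virtual void commit() = 0;
    virtual void abort() = 0;
};

// RAII guard in the spirit of WriteUnitOfWork: begins work on construction
// and aborts it on destruction unless commit() ran first.
class ScopedUnit {
public:
    explicit ScopedUnit(Unit* u) : _unit(u) {
        _unit->begin();
    }
    ~ScopedUnit() {
        if (!_committed)
            _unit->abort();  // the exception-safe cleanup path
    }
    ScopedUnit(const ScopedUnit&) = delete;
    ScopedUnit& operator=(const ScopedUnit&) = delete;

    void commit() {
        _unit->commit();
        _committed = true;
    }

private:
    Unit* _unit;
    bool _committed = false;
};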
src/mongo/db/storage/biggie/store.h

Lines changed: 11 additions & 9 deletions
@@ -806,15 +806,6 @@ class RadixStore {
             _sizeSubtreeElems = other._sizeSubtreeElems;
         }

-        friend void swap(Node& first, Node& second) {
-            std::swap(first.trieKey, second.trieKey);
-            std::swap(first.depth, second.depth);
-            std::swap(first.data, second.data);
-            std::swap(first.children, second.children);
-            std::swap(first._numSubtreeElems, second._numSubtreeElems);
-            std::swap(first._sizeSubtreeElems, second._sizeSubtreeElems);
-        }
-
         Node(Node&& other) {
             _depth = std::move(other._depth);
             _numSubtreeElems = std::move(other._numSubtreeElems);

@@ -824,6 +815,17 @@ class RadixStore {
             _children = std::move(other._children);
         }

+        virtual ~Node() = default;
+
+        friend void swap(Node& first, Node& second) {
+            std::swap(first.trieKey, second.trieKey);
+            std::swap(first.depth, second.depth);
+            std::swap(first.data, second.data);
+            std::swap(first.children, second.children);
+            std::swap(first._numSubtreeElems, second._numSubtreeElems);
+            std::swap(first._sizeSubtreeElems, second._sizeSubtreeElems);
+        }
+
         Node& operator=(const Node other) {
             swap(*this, other);
             return *this;
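Context for these two hunks: Node's assignment operator takes its parameter by value and swaps with it (the copy-and-swap idiom), so the friend swap is load-bearing. The commit relocates swap below the move constructor and gives Node a virtual destructor so derived node types destroy correctly through a Node*. A compressed illustration of the idiom on a made-up Widget type:

#include <string>
#include <utility>
#include <vector>

class Widget {
public:
    Widget() = default;
    Widget(const Widget&) = default;
    Widget(Widget&& other) : _name(std::move(other._name)), _data(std::move(other._data)) {}
    virtual ~Widget() = default;  // safe deletion through a base-class pointer

    friend void swap(Widget& a, Widget& b) {
        std::swap(a._name, b._name);
        std::swap(a._data, b._data);
    }

    // Copy-and-swap: the by-value parameter is the copy; swapping with it
    // gives the strong exception guarantee and reuses the copy constructor.
    Widget& operator=(Widget other) {
        swap(*this, other);
        return *this;
    }

private:
    std::string _name;
    std::vector<int> _data;
};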
src/mongo/db/storage/record_store_test_capped_delete.cpp

Lines changed: 158 additions & 0 deletions

@@ -0,0 +1,158 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/record_store_test_harness.h"
+
+
+#include "mongo/db/record_id.h"
+#include "mongo/db/storage/record_data.h"
+#include "mongo/db/storage/record_store.h"
+#include "mongo/unittest/unittest.h"
+
+
+namespace mongo {
+namespace {
+
+using std::string;
+using std::stringstream;
+using std::unique_ptr;
+
+// Insert a record in a store with capped max docs 1, and try to delete it by inserting another.
+TEST(RecordStoreTestHarness, CappedDeleteRecord) {
+    const auto harness(newRecordStoreHarnessHelper());
+    if (!harness->supportsDocLocking())
+        return;
+    auto rs(harness->newCappedRecordStore(RecordStoreHarnessHelper::kDefaultCapedSizeBytes,
+                                          /*cappedMaxDocs*/ 1));
+
+    {
+        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
+        ASSERT_EQUALS(0, rs->numRecords(opCtx.get()));
+    }
+
+    string data = "my record";
+    RecordId loc1, loc2;
+    {
+        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
+        WriteUnitOfWork uow(opCtx.get());
+        StatusWith<RecordId> res =
+            rs->insertRecord(opCtx.get(), data.c_str(), data.size() + 1, Timestamp());
+        ASSERT_OK(res.getStatus());
+        loc1 = res.getValue();
+        uow.commit();
+    }
+
+    {
+        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
+        ASSERT_EQUALS(1, rs->numRecords(opCtx.get()));
+    }
+
+    {
+        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
+        WriteUnitOfWork uow(opCtx.get());
+        StatusWith<RecordId> res =
+            rs->insertRecord(opCtx.get(), data.c_str(), data.size() + 1, Timestamp());
+        ASSERT_OK(res.getStatus());
+        loc2 = res.getValue();
+        ASSERT_GT(loc2, loc1);
+        uow.commit();
+    }
+
+    {
+        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
+        ASSERT_EQUALS(1, rs->numRecords(opCtx.get()));
+    }
+}
+
+// Insert multiple records at once, requiring multiple deletes.
+TEST(RecordStoreTestHarness, DeleteMultipleRecords) {
+    const auto harness(newRecordStoreHarnessHelper());
+    if (!harness->supportsDocLocking())
+        return;
+    const int cappedMaxDocs = 10;
+    auto rs(harness->newCappedRecordStore(RecordStoreHarnessHelper::kDefaultCapedSizeBytes,
+                                          cappedMaxDocs));
+
+    const int nToInsertFirst = cappedMaxDocs / 2;
+    const int nToInsertSecond = cappedMaxDocs;
+    RecordId lastLoc = RecordId();
+
+    // First insert some records that fit without exceeding the cap.
+    {
+        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
+        WriteUnitOfWork uow(opCtx.get());
+        for (int i = 0; i < nToInsertFirst; i++) {
+            stringstream ss;
+            ss << "record " << i;
+            string data = ss.str();
+
+            StatusWith<RecordId> res =
+                rs->insertRecord(opCtx.get(), data.c_str(), data.size() + 1, Timestamp());
+            ASSERT_OK(res.getStatus());
+            RecordId loc = res.getValue();
+            ASSERT_GT(loc, lastLoc);
+            lastLoc = loc;
+        }
+        uow.commit();
+    }
+
+    {
+        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
+        ASSERT_EQUALS(nToInsertFirst, rs->numRecords(opCtx.get()));
+    }
+
+    // Then insert the largest batch possible (number of docs equal to the cap), causing deletes.
+    {
+        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
+        WriteUnitOfWork uow(opCtx.get());
+        for (int i = nToInsertFirst; i < nToInsertFirst + nToInsertSecond; i++) {
+            stringstream ss;
+            ss << "record " << i;
+            string data = ss.str();
+
+            StatusWith<RecordId> res =
+                rs->insertRecord(opCtx.get(), data.c_str(), data.size() + 1, Timestamp());
+            ASSERT_OK(res.getStatus());
+            RecordId loc = res.getValue();
+            ASSERT_GT(loc, lastLoc);
+            lastLoc = loc;
+        }
+        uow.commit();
+    }
+
+    {
+        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
+        ASSERT_EQUALS(cappedMaxDocs, rs->numRecords(opCtx.get()));
+    }
+}
+
+}  // namespace
+}  // namespace mongo
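The two new tests cover the insert and capped-delete paths for the counter. The rollback handlers added in biggie_record_store.cpp suggest one more check, sketched below under the assumption that a WriteUnitOfWork destroyed without commit() rolls its writes back (its documented RAII behavior) and that the harness exposes newNonCappedRecordStore(); this is a hypothetical companion test, not part of the commit.

// Hypothetical companion test: a record inserted in a unit of work that is
// abandoned (no commit()) should not be reflected in numRecords().
TEST(RecordStoreTestHarness, NumRecordsRestoredOnRollback) {
    const auto harness(newRecordStoreHarnessHelper());
    auto rs(harness->newNonCappedRecordStore());

    string data = "my record";
    {
        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
        WriteUnitOfWork uow(opCtx.get());
        ASSERT_OK(rs->insertRecord(opCtx.get(), data.c_str(), data.size() + 1, Timestamp())
                      .getStatus());
        // uow goes out of scope without commit(): the insert rolls back.
    }

    {
        ServiceContext::UniqueOperationContext opCtx(harness->newOperationContext());
        ASSERT_EQUALS(0, rs->numRecords(opCtx.get()));
    }
}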
