Skip to content

Commit 168f195

Browse files
committed
SERVER-30722 Make changestreams require FCV 3.6
This reverts commit 883c6c2.
1 parent 5589e9d commit 168f195

File tree

5 files changed

+85
-5
lines changed

5 files changed

+85
-5
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Test that $changeStreams usage is disallowed when the featureCompatibilityVersion is 3.4.
2+
(function() {
3+
"use strict";
4+
5+
const rst = new ReplSetTest({nodes: 1});
6+
rst.startSet();
7+
rst.initiate();
8+
const conn = rst.getPrimary();
9+
10+
const testDB = conn.getDB("changeStreams_feature_compatibility_version");
11+
const coll = testDB.coll;
12+
assert.commandWorked(testDB.createCollection(coll.getName()));
13+
14+
// Make sure $changeStreams works with (default) featureCompatibilityVersion 3.6
15+
16+
assert.commandWorked(testDB.runCommand(
17+
{aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}}));
18+
19+
// $changeStreams is not permitted when the featureCompatibilityVersion is 3.4.
20+
21+
assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "3.4"}));
22+
assert.commandFailedWithCode(
23+
testDB.runCommand({aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}}),
24+
ErrorCodes.InvalidOptions);
25+
26+
rst.stopSet();
27+
})();

jstests/noPassthrough/unsupported_change_stream_deployments.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
const masterSlaveFixture = new ReplTest("change_stream");
2222
const master = masterSlaveFixture.start(true);
2323
assertChangeStreamNotSupportedOnConnection(master);
24-
const slave = masterSlaveFixture.start(false);
25-
assertChangeStreamNotSupportedOnConnection(slave);
24+
// Slaves start in the wrong FCV, (SERVER-31218) resulting in the wrong error code.
25+
// const slave = masterSlaveFixture.start(false);
26+
// assertChangeStreamNotSupportedOnConnection(slave);
2627

2728
// Test a sharded cluster with standalone shards.
2829
const clusterWithStandalones = new ShardingTest({shards: 2});

src/mongo/db/pipeline/document_source_change_stream.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "mongo/bson/simple_bsonelement_comparator.h"
3434
#include "mongo/db/bson/bson_helper.h"
3535
#include "mongo/db/catalog/uuid_catalog.h"
36+
#include "mongo/db/commands/feature_compatibility_version_command_parser.h"
3637
#include "mongo/db/pipeline/close_change_stream_exception.h"
3738
#include "mongo/db/pipeline/document_source_check_resume_token.h"
3839
#include "mongo/db/pipeline/document_source_limit.h"
@@ -231,6 +232,14 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(const NamespaceString& nss,
231232

232233
list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
233234
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
235+
uassert(
236+
ErrorCodes::InvalidOptions,
237+
str::stream()
238+
<< "The featureCompatibilityVersion must be 3.6 to use the $changeStream stage. See "
239+
<< feature_compatibility_version::kDochubLink
240+
<< ".",
241+
serverGlobalParams.featureCompatibility.version.load() !=
242+
ServerGlobalParams::FeatureCompatibility::Version::k34);
234243
// TODO: Add sharding support here (SERVER-29141).
235244
uassert(
236245
40470, "The $changeStream stage is not supported on sharded systems.", !expCtx->inMongos);

src/mongo/db/pipeline/document_source_change_stream_test.cpp

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,34 @@ static const Timestamp ts(100, 1);
6868
static const repl::OpTime optime(ts, 1);
6969
static const NamespaceString nss("unittests.change_stream");
7070

71-
using ChangeStreamStageTestNoSetup = AggregationContextFixture;
71+
class EnsureFCV {
72+
public:
73+
using Version = ServerGlobalParams::FeatureCompatibility::Version;
74+
EnsureFCV(Version version)
75+
: _origVersion(serverGlobalParams.featureCompatibility.version.load()) {
76+
serverGlobalParams.featureCompatibility.version.store(version);
77+
}
78+
~EnsureFCV() {
79+
serverGlobalParams.featureCompatibility.version.store(_origVersion);
80+
}
81+
82+
private:
83+
const Version _origVersion;
84+
};
85+
86+
class ChangeStreamStageTestNoSetup : public AggregationContextFixture {
87+
public:
88+
ChangeStreamStageTestNoSetup() : ChangeStreamStageTestNoSetup(nss) {}
89+
ChangeStreamStageTestNoSetup(NamespaceString nsString)
90+
: AggregationContextFixture(nsString), _ensureFCV(EnsureFCV::Version::k36) {}
91+
92+
private:
93+
EnsureFCV _ensureFCV;
94+
};
7295

73-
class ChangeStreamStageTest : public AggregationContextFixture {
96+
class ChangeStreamStageTest : public ChangeStreamStageTestNoSetup {
7497
public:
75-
ChangeStreamStageTest() : AggregationContextFixture(nss) {
98+
ChangeStreamStageTest() : ChangeStreamStageTestNoSetup() {
7699
repl::ReplicationCoordinator::set(getExpCtx()->opCtx->getServiceContext(),
77100
stdx::make_unique<repl::ReplicationCoordinatorMock>(
78101
getExpCtx()->opCtx->getServiceContext()));

src/mongo/db/pipeline/pipeline_test.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,21 @@ void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) {
6565
opCtx->getServiceContext(),
6666
stdx::make_unique<repl::ReplicationCoordinatorMock>(opCtx->getServiceContext()));
6767
}
68+
69+
class EnsureFCV {
70+
public:
71+
using Version = ServerGlobalParams::FeatureCompatibility::Version;
72+
EnsureFCV(Version version)
73+
: _origVersion(serverGlobalParams.featureCompatibility.version.load()) {
74+
serverGlobalParams.featureCompatibility.version.store(version);
75+
}
76+
~EnsureFCV() {
77+
serverGlobalParams.featureCompatibility.version.store(_origVersion);
78+
}
79+
80+
private:
81+
const Version _origVersion;
82+
};
6883
} // namespace
6984

7085
namespace Optimizations {
@@ -979,6 +994,7 @@ TEST(PipelineOptimizationTest, MatchOnMaxLengthShouldMoveAcrossRename) {
979994
}
980995

981996
TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) {
997+
EnsureFCV ensureFCV(EnsureFCV::Version::k36);
982998
QueryTestServiceContext testServiceContext;
983999
auto opCtx = testServiceContext.makeOperationContext();
9841000

@@ -1004,6 +1020,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) {
10041020
}
10051021

10061022
TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage) {
1023+
EnsureFCV ensureFCV(EnsureFCV::Version::k36);
10071024
QueryTestServiceContext testServiceContext;
10081025
auto opCtx = testServiceContext.makeOperationContext();
10091026

@@ -1482,6 +1499,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles
14821499
}
14831500

14841501
TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsValidAsFirstStage) {
1502+
EnsureFCV ensureFCV(EnsureFCV::Version::k36);
14851503
const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}")};
14861504
auto ctx = getExpCtx();
14871505
setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
@@ -1490,6 +1508,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsValidAsFirstStage) {
14901508
}
14911509

14921510
TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStage) {
1511+
EnsureFCV ensureFCV(EnsureFCV::Version::k36);
14931512
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
14941513
fromjson("{$changeStream: {}}")};
14951514
auto ctx = getExpCtx();
@@ -1500,6 +1519,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStage) {
15001519
}
15011520

15021521
TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStageInFacet) {
1522+
EnsureFCV ensureFCV(EnsureFCV::Version::k36);
15031523
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
15041524
fromjson("{$changeStream: {}}")};
15051525
auto ctx = getExpCtx();

0 commit comments

Comments
 (0)