Skip to content

Commit ec3fe6d

Browse files
committed
CSHARP-2428: Support mongos pinning for sharded transactions
1 parent efeda17 commit ec3fe6d

28 files changed

+1670
-42
lines changed

evergreen/run-tests.sh

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,22 @@ TOPOLOGY=${TOPOLOGY:-server}
2121

2222
provision_ssl () {
2323
echo "SSL !"
24-
24+
uri_environment_variable_name=$1
2525
# Arguments for auth + SSL
2626
if [ "$AUTH" != "noauth" ] || [ "$TOPOLOGY" == "replica_set" ]; then
27-
export MONGODB_URI="${MONGODB_URI}&ssl=true&sslVerifyCertificate=false"
27+
export $uri_environment_variable_name="${!uri_environment_variable_name}&ssl=true&sslVerifyCertificate=false"
2828
else
29-
export MONGODB_URI="${MONGODB_URI}/?ssl=true&sslVerifyCertificate=false"
29+
export $uri_environment_variable_name="${!uri_environment_variable_name}/?ssl=true&sslVerifyCertificate=false"
3030
fi
3131
}
3232

3333
############################################
3434
# Main Program #
3535
############################################
36-
36+
echo "Initial MongoDB URI:" $MONGODB_URI
3737
# Provision the correct connection string and set up SSL if needed
3838
if [ "$TOPOLOGY" == "sharded_cluster" ]; then
39-
39+
export MONGODB_URI_WITH_MULTIPLE_MONGOSES="${MONGODB_URI}"
4040
if [ "$AUTH" = "auth" ]; then
4141
export MONGODB_URI="mongodb://bob:pwd123@localhost:27017/?authSource=admin"
4242
else
@@ -45,7 +45,10 @@ if [ "$TOPOLOGY" == "sharded_cluster" ]; then
4545
fi
4646

4747
if [ "$SSL" != "nossl" ]; then
48-
provision_ssl
48+
provision_ssl MONGODB_URI
49+
if [ "$TOPOLOGY" == "sharded_cluster" ]; then
50+
provision_ssl MONGODB_URI_WITH_MULTIPLE_MONGOSES
51+
fi
4952
fi
5053

5154
echo "Running $AUTH tests over $SSL for $TOPOLOGY and connecting to $MONGODB_URI"
@@ -56,5 +59,11 @@ else
5659
export TARGET="Test"
5760
fi
5861

59-
for var in TMP TEMP NUGET_PACKAGES NUGET_HTTP_CACHE_PATH APPDATA; do setx $var z:\\data\\tmp; export $var=z:\\data\\tmp; done
62+
echo "Final MongoDB_URI: $MONGODB_URI"
63+
if [ "$TOPOLOGY" == "sharded_cluster" ]; then
64+
echo "Final MongoDB URI with multiple mongoses: $MONGODB_URI_WITH_MULTIPLE_MONGOSES"
65+
fi
66+
for var in TMP TEMP NUGET_PACKAGES NUGET_HTTP_CACHE_PATH APPDATA; do
67+
export $var=z:\\data\\tmp;
68+
done
6069
powershell.exe .\\build.ps1 -target ${TARGET}

src/MongoDB.Driver.Core/Core/Bindings/CoreSession.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
using MongoDB.Driver.Core.Clusters;
2121
using MongoDB.Driver.Core.Misc;
2222
using MongoDB.Driver.Core.Operations;
23+
using MongoDB.Driver.Core.Servers;
2324

2425
namespace MongoDB.Driver.Core.Bindings
2526
{
@@ -37,6 +38,7 @@ public sealed class CoreSession : ICoreSession
3738
private bool _isCommitTransactionInProgress;
3839
private readonly IOperationClock _operationClock = new OperationClock();
3940
private readonly CoreSessionOptions _options;
41+
private IServer _pinnedServer;
4042
private readonly ICoreServerSession _serverSession;
4143

4244
// constructors
@@ -110,6 +112,13 @@ public bool IsInTransaction
110112
/// <inheritdoc />
111113
public CoreSessionOptions Options => _options;
112114

115+
/// <inheritdoc />
116+
public IServer PinnedServer
117+
{
118+
get => _pinnedServer;
119+
set => _pinnedServer = value;
120+
}
121+
113122
/// <inheritdoc />
114123
public ICoreServerSession ServerSession => _serverSession;
115124

src/MongoDB.Driver.Core/Core/Bindings/ICoreSession.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using System.Threading.Tasks;
1919
using MongoDB.Bson;
2020
using MongoDB.Driver.Core.Clusters;
21+
using MongoDB.Driver.Core.Servers;
2122

2223
namespace MongoDB.Driver.Core.Bindings
2324
{
@@ -91,6 +92,15 @@ public interface ICoreSession : IDisposable
9192
/// </value>
9293
CoreSessionOptions Options { get; }
9394

95+
/// <summary>
96+
/// Gets or sets pinned server for the current transaction.
97+
/// Value has meaning if and only if a transaction is in progress.
98+
/// </summary>
99+
/// <value>
100+
/// The pinned server for the current transaction.
101+
/// </value>
102+
IServer PinnedServer { get; set; }
103+
94104
/// <summary>
95105
/// Gets the server session.
96106
/// </summary>

src/MongoDB.Driver.Core/Core/Bindings/NoCoreSession.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using System.Threading.Tasks;
1919
using MongoDB.Bson;
2020
using MongoDB.Driver.Core.Clusters;
21+
using MongoDB.Driver.Core.Servers;
2122

2223
namespace MongoDB.Driver.Core.Bindings
2324
{
@@ -76,6 +77,13 @@ public static ICoreSessionHandle NewHandle()
7677
/// <inheritdoc />
7778
public CoreSessionOptions Options => null;
7879

80+
/// <inheritdoc />
81+
public IServer PinnedServer
82+
{
83+
get => throw new NotSupportedException($"NoCoreSession does not support {nameof(PinnedServer)}.");
84+
set => throw new NotSupportedException($"NoCoreSession does not support {nameof(PinnedServer)}.");
85+
}
86+
7987
/// <inheritdoc />
8088
public ICoreServerSession ServerSession => NoCoreServerSession.Instance;
8189

src/MongoDB.Driver.Core/Core/Bindings/ReadPreferenceBinding.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,15 @@ public ICoreSessionHandle Session
6868
public IChannelSourceHandle GetReadChannelSource(CancellationToken cancellationToken)
6969
{
7070
ThrowIfDisposed();
71-
var server = _cluster.SelectServer(_serverSelector, cancellationToken);
71+
var server = _cluster.SelectServerAndPinIfNeeded(_session, _serverSelector, cancellationToken);
7272
return GetChannelSourceHelper(server);
7373
}
7474

7575
/// <inheritdoc/>
7676
public async Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationToken cancellationToken)
7777
{
7878
ThrowIfDisposed();
79-
var server = await _cluster.SelectServerAsync(_serverSelector, cancellationToken).ConfigureAwait(false);
79+
var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, _serverSelector, cancellationToken).ConfigureAwait(false);
8080
return GetChannelSourceHelper(server);
8181
}
8282

src/MongoDB.Driver.Core/Core/Bindings/WrappingCoreSession.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using MongoDB.Bson;
2020
using MongoDB.Driver.Core.Clusters;
2121
using MongoDB.Driver.Core.Misc;
22+
using MongoDB.Driver.Core.Servers;
2223

2324
namespace MongoDB.Driver.Core.Bindings
2425
{
@@ -126,6 +127,13 @@ public virtual CoreSessionOptions Options
126127
}
127128
}
128129

130+
/// <inheritdoc />
131+
public IServer PinnedServer
132+
{
133+
get => _wrapped.PinnedServer;
134+
set => _wrapped.PinnedServer = value;
135+
}
136+
129137
/// <inheritdoc />
130138
public virtual ICoreServerSession ServerSession
131139
{

src/MongoDB.Driver.Core/Core/Bindings/WritableServerBinding.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,31 +63,32 @@ public ICoreSessionHandle Session
6363
public IChannelSourceHandle GetReadChannelSource(CancellationToken cancellationToken)
6464
{
6565
ThrowIfDisposed();
66-
var server = _cluster.SelectServer(WritableServerSelector.Instance, cancellationToken);
66+
var server = _cluster.SelectServerAndPinIfNeeded(_session, WritableServerSelector.Instance, cancellationToken);
67+
6768
return GetChannelSourceHelper(server);
6869
}
6970

7071
/// <inheritdoc/>
7172
public async Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationToken cancellationToken)
7273
{
7374
ThrowIfDisposed();
74-
var server = await _cluster.SelectServerAsync(WritableServerSelector.Instance, cancellationToken).ConfigureAwait(false);
75+
var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, WritableServerSelector.Instance, cancellationToken).ConfigureAwait(false);
7576
return GetChannelSourceHelper(server);
7677
}
7778

7879
/// <inheritdoc/>
7980
public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken)
8081
{
8182
ThrowIfDisposed();
82-
var server = _cluster.SelectServer(WritableServerSelector.Instance, cancellationToken);
83+
var server = _cluster.SelectServerAndPinIfNeeded(_session, WritableServerSelector.Instance, cancellationToken);
8384
return GetChannelSourceHelper(server);
8485
}
8586

8687
/// <inheritdoc/>
8788
public async Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationToken cancellationToken)
8889
{
8990
ThrowIfDisposed();
90-
var server = await _cluster.SelectServerAsync(WritableServerSelector.Instance, cancellationToken).ConfigureAwait(false);
91+
var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, WritableServerSelector.Instance, cancellationToken).ConfigureAwait(false);
9192
return GetChannelSourceHelper(server);
9293
}
9394

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/* Copyright 2019–present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System.Threading;
17+
using System.Threading.Tasks;
18+
using MongoDB.Driver.Core.Bindings;
19+
using MongoDB.Driver.Core.Clusters.ServerSelectors;
20+
using MongoDB.Driver.Core.Servers;
21+
22+
namespace MongoDB.Driver.Core.Clusters
23+
{
24+
/// <summary>
25+
/// This class contains extension methods for ICluster used for server selection with sharded transactions.
26+
/// </summary>
27+
internal static class IClusterExtensions
28+
{
29+
public static IServer SelectServerAndPinIfNeeded(
30+
this ICluster cluster,
31+
ICoreSessionHandle session,
32+
IServerSelector selector,
33+
CancellationToken cancellationToken)
34+
{
35+
var pinnedServer = GetPinnedServerIfValid(cluster, session);
36+
if (pinnedServer != null)
37+
{
38+
return pinnedServer;
39+
}
40+
41+
// Server selection also updates the cluster type, allowing us to to determine if the server
42+
// should be pinned.
43+
var server = cluster.SelectServer(selector, cancellationToken);
44+
PinServerIfNeeded(cluster, session, server);
45+
return server;
46+
}
47+
48+
public static async Task<IServer> SelectServerAndPinIfNeededAsync(
49+
this ICluster cluster,
50+
ICoreSessionHandle session,
51+
IServerSelector selector,
52+
CancellationToken cancellationToken)
53+
{
54+
var pinnedServer = GetPinnedServerIfValid(cluster, session);
55+
if (pinnedServer != null)
56+
{
57+
return pinnedServer;
58+
}
59+
60+
// Server selection also updates the cluster type, allowing us to to determine if the server
61+
// should be pinned.
62+
var server = await cluster.SelectServerAsync(selector, cancellationToken).ConfigureAwait(false);
63+
PinServerIfNeeded(cluster, session, server);
64+
65+
return server;
66+
}
67+
68+
private static void PinServerIfNeeded(ICluster cluster, ICoreSessionHandle session, IServer server)
69+
{
70+
if (cluster.Description.Type == ClusterType.Sharded && session.IsInTransaction)
71+
{
72+
session.PinnedServer = server;
73+
}
74+
}
75+
76+
private static IServer GetPinnedServerIfValid(ICluster cluster, ICoreSessionHandle session)
77+
{
78+
if (cluster.Description.Type == ClusterType.Sharded
79+
&& session.IsInTransaction
80+
&& session.PinnedServer != null
81+
&& session.CurrentTransaction.State != CoreTransactionState.Starting)
82+
{
83+
return session.PinnedServer;
84+
}
85+
else
86+
{
87+
return null;
88+
}
89+
}
90+
}
91+
}

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class Feature
6767
private static readonly Feature __scramSha1Authentication = new Feature("ScramSha1Authentication", new SemanticVersion(3, 0, 0));
6868
private static readonly Feature __scramSha256Authentication = new Feature("ScramSha256Authentication", new SemanticVersion(4, 0, 0, ""));
6969
private static readonly Feature __serverExtractsUsernameFromX509Certificate = new Feature("ServerExtractsUsernameFromX509Certificate", new SemanticVersion(3, 3, 12));
70+
private static readonly Feature __shardedTransactions = new Feature("ShardedTransactions", new SemanticVersion(4, 1, 0));
7071
private static readonly Feature __transactions = new Feature("Transactions", new SemanticVersion(3, 7, 0));
7172
private static readonly Feature __userManagementCommands = new Feature("UserManagementCommands", new SemanticVersion(2, 6, 0));
7273
private static readonly Feature __views = new Feature("Views", new SemanticVersion(3, 3, 11));
@@ -292,6 +293,11 @@ public class Feature
292293
/// </summary>
293294
public static Feature ServerExtractsUsernameFromX509Certificate => __serverExtractsUsernameFromX509Certificate;
294295

296+
/// <summary>
297+
/// Gets the sharded transactions feature.
298+
/// </summary>
299+
public static Feature ShardedTransactions => __shardedTransactions;
300+
295301
/// <summary>
296302
/// Gets the transactions feature.
297303
/// </summary>

tests/MongoDB.Driver.Core.TestHelpers/CoreTestConfiguration.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public static class CoreTestConfiguration
4343
// static fields
4444
private static Lazy<ICluster> __cluster = new Lazy<ICluster>(CreateCluster, isThreadSafe: true);
4545
private static Lazy<ConnectionString> __connectionString = new Lazy<ConnectionString>(GetConnectionString, isThreadSafe: true);
46+
private static Lazy<ConnectionString> __connectionStringWithMultipleShardRouters = new Lazy<ConnectionString>(
47+
GetConnectionStringWithMultipleShardRouters, isThreadSafe: true);
4648
private static Lazy<DatabaseNamespace> __databaseNamespace = new Lazy<DatabaseNamespace>(GetDatabaseNamespace, isThreadSafe: true);
4749
private static MessageEncoderSettings __messageEncoderSettings = new MessageEncoderSettings();
4850
private static TraceSource __traceSource;
@@ -57,6 +59,11 @@ public static ConnectionString ConnectionString
5759
{
5860
get { return __connectionString.Value; }
5961
}
62+
63+
public static ConnectionString ConnectionStringWithMultipleShardRouters
64+
{
65+
get => __connectionStringWithMultipleShardRouters.Value;
66+
}
6067

6168
public static DatabaseNamespace DatabaseNamespace
6269
{
@@ -227,6 +234,12 @@ private static ConnectionString GetConnectionString()
227234

228235
return new ConnectionString(uri);
229236
}
237+
238+
private static ConnectionString GetConnectionStringWithMultipleShardRouters()
239+
{
240+
var uri = Environment.GetEnvironmentVariable("MONGODB_URI_WITH_MULTIPLE_MONGOSES") ?? "mongodb://localhost,localhost:27018");
241+
return new ConnectionString(uri);
242+
}
230243

231244
private static DatabaseNamespace GetDatabaseNamespace()
232245
{

tests/MongoDB.Driver.Core.Tests/Core/Bindings/ReadPreferenceBindingTests.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
using System;
17+
using System.Net;
1718
using System.Reflection;
1819
using System.Threading;
1920
using System.Threading.Tasks;
@@ -103,6 +104,16 @@ public void GetReadChannelSource_should_use_a_read_preference_server_selector_to
103104
{
104105
var subject = new ReadPreferenceBinding(_mockCluster.Object, ReadPreference.Primary, NoCoreSession.NewHandle());
105106
var selectedServer = new Mock<IServer>().Object;
107+
108+
var clusterId = new ClusterId();
109+
var endPoint = new DnsEndPoint("localhost", 27017);
110+
var initialClusterDescription = new ClusterDescription(
111+
clusterId,
112+
ClusterConnectionMode.Automatic,
113+
ClusterType.Unknown,
114+
new[] { new ServerDescription(new ServerId(clusterId, endPoint), endPoint) });
115+
var finalClusterDescription = initialClusterDescription.WithType(ClusterType.Standalone);
116+
_mockCluster.SetupSequence(c => c.Description).Returns(initialClusterDescription).Returns(finalClusterDescription);
106117

107118
if (async)
108119
{
@@ -136,6 +147,16 @@ public void GetReadChannelSource_should_fork_the_session(
136147
_mockCluster.Setup(m => m.SelectServerAsync(It.IsAny<IServerSelector>(), cancellationToken)).Returns(Task.FromResult(selectedServer));
137148
var forkedSession = new Mock<ICoreSessionHandle>().Object;
138149
mockSession.Setup(m => m.Fork()).Returns(forkedSession);
150+
151+
var clusterId = new ClusterId();
152+
var endPoint = new DnsEndPoint("localhost", 27017);
153+
var initialClusterDescription = new ClusterDescription(
154+
clusterId,
155+
ClusterConnectionMode.Automatic,
156+
ClusterType.Unknown,
157+
new[] { new ServerDescription(new ServerId(clusterId, endPoint), endPoint) });
158+
var finalClusterDescription = initialClusterDescription.WithType(ClusterType.Standalone);
159+
_mockCluster.SetupSequence(c => c.Description).Returns(initialClusterDescription).Returns(finalClusterDescription);
139160

140161
IChannelSourceHandle result;
141162
if (async)

0 commit comments

Comments
 (0)