Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/MongoDB.Driver.Core/Core/Bindings/CoreSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -521,11 +521,6 @@ private void EnsureTransactionsAreSupported()

var connectedDataBearingServers = _cluster.Description.Servers.Where(s => s.State == ServerState.Connected && s.IsDataBearing).ToList();

if (connectedDataBearingServers.Count == 0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we don't necessary have selected server, switch to "best effort" validation. Compatible with spec.

{
throw new NotSupportedException("StartTransaction cannot determine if transactions are supported because there are no connected servers.");
}

foreach (var connectedDataBearingServer in connectedDataBearingServers)
{
var serverType = connectedDataBearingServer.Type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,20 @@ private Type0CommandMessageSection<BsonDocument> CreateType0Section(ConnectionDe

if (_session.Id != null)
{
if (IsSessionAcknowledged())
var areSessionsSupported = connectionDescription.HelloResult.LogicalSessionTimeout.HasValue;

if (!areSessionsSupported)
{
if (_session.IsImplicit)
{
// do not set sessionId if session is implicit and sessions are not supported
}
else
{
throw new MongoClientException("Sessions are not supported.");
}
}
else if (IsSessionAcknowledged())
{
AddIfNotAlreadyAdded("lsid", _session.Id);
}
Expand All @@ -351,7 +364,6 @@ private Type0CommandMessageSection<BsonDocument> CreateType0Section(ConnectionDe
}
}


var snapshotReadConcernDocument = ReadConcernHelper.GetReadConcernForSnapshotSesssion(_session, connectionDescription);
if (snapshotReadConcernDocument != null)
{
Expand Down
11 changes: 10 additions & 1 deletion src/MongoDB.Driver/Encryption/MongocryptdFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using System.Linq;
using System.Reflection;
using MongoDB.Driver.Core;
using MongoDB.Driver.Core.Events;
using MongoDB.Driver.Core.Misc;

namespace MongoDB.Driver.Encryption
Expand Down Expand Up @@ -74,18 +75,26 @@ public static string ExtractCryptSharedLibPath(IReadOnlyDictionary<string, objec
internal class MongocryptdFactory
{
private readonly bool? _bypassQueryAnalysis;
private readonly IEventSubscriber _eventSubscriber;
private readonly IReadOnlyDictionary<string, object> _extraOptions;

public MongocryptdFactory(IReadOnlyDictionary<string, object> extraOptions, bool? bypassQueryAnalysis)
public MongocryptdFactory(IReadOnlyDictionary<string, object> extraOptions, bool? bypassQueryAnalysis, IEventSubscriber eventSubscriber = null)
Copy link
Contributor Author

@BorisDog BorisDog May 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is for testing, were access to the events is needed.

{
_bypassQueryAnalysis = bypassQueryAnalysis;
_eventSubscriber = eventSubscriber;
_extraOptions = extraOptions ?? new Dictionary<string, object>();
}

// public methods
public IMongoClient CreateMongocryptdClient()
{
var clientSettings = CreateMongocryptdClientSettings();

if (_eventSubscriber != null)
{
clientSettings.ClusterConfigurator = c => c.Subscribe(_eventSubscriber);
}

return new MongoClient(clientSettings);
}

Expand Down
108 changes: 9 additions & 99 deletions src/MongoDB.Driver/MongoClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -24,10 +23,8 @@
using MongoDB.Bson.Serialization.Serializers;
using MongoDB.Driver.Core.Bindings;
using MongoDB.Driver.Core.Clusters;
using MongoDB.Driver.Core.Clusters.ServerSelectors;
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.Operations;
using MongoDB.Driver.Core.Servers;
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
using MongoDB.Driver.Encryption;
using MongoDB.Driver.Linq;
Expand All @@ -37,23 +34,6 @@ namespace MongoDB.Driver
/// <inheritdoc/>
public class MongoClient : MongoClientBase
{
#region static
// private static methods
private static IEnumerable<ServerDescription> SelectServersThatDetermineWhetherSessionsAreSupported(ClusterDescription cluster, IEnumerable<ServerDescription> servers)
{
var connectedServers = servers.Where(s => s.State == ServerState.Connected);

if (cluster.IsDirectConnection)
{
return connectedServers;
}
else
{
return connectedServers.Where(s => s.IsDataBearing);
}
}
#endregion

// private fields
private readonly ICluster _cluster;
private readonly AutoEncryptionLibMongoCryptController _libMongoCryptController;
Expand Down Expand Up @@ -146,9 +126,6 @@ internal void ConfigureAutoEncryptionMessageEncoderSettings(MessageEncoderSettin
}
}

// private static methods


// public methods
/// <inheritdoc/>
public sealed override void DropDatabase(string name, CancellationToken cancellationToken = default(CancellationToken))
Expand Down Expand Up @@ -346,32 +323,28 @@ public sealed override Task<IAsyncCursor<BsonDocument>> ListDatabasesAsync(
/// <returns>A session.</returns>
internal IClientSessionHandle StartImplicitSession(CancellationToken cancellationToken)
{
var areSessionsSupported = AreSessionsSupported(cancellationToken);
return StartImplicitSession(areSessionsSupported);
return StartImplicitSession();
}

/// <summary>
/// Starts an implicit session.
/// </summary>
/// <returns>A Task whose result is a session.</returns>
internal async Task<IClientSessionHandle> StartImplicitSessionAsync(CancellationToken cancellationToken)
internal Task<IClientSessionHandle> StartImplicitSessionAsync(CancellationToken cancellationToken)
{
var areSessionsSupported = await AreSessionsSupportedAsync(cancellationToken).ConfigureAwait(false);
return StartImplicitSession(areSessionsSupported);
return Task.FromResult(StartImplicitSession());
}

/// <inheritdoc/>
public sealed override IClientSessionHandle StartSession(ClientSessionOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
{
var areSessionsSupported = AreSessionsSupported(cancellationToken);
return StartSession(options, areSessionsSupported);
return StartSession(options);
}

/// <inheritdoc/>
public sealed override async Task<IClientSessionHandle> StartSessionAsync(ClientSessionOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
public sealed override Task<IClientSessionHandle> StartSessionAsync(ClientSessionOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
{
var areSessionsSupported = await AreSessionsSupportedAsync(cancellationToken).ConfigureAwait(false);
return StartSession(options, areSessionsSupported);
return Task.FromResult(StartSession(options));
}

/// <inheritdoc/>
Expand Down Expand Up @@ -446,52 +419,6 @@ public override IMongoClient WithWriteConcern(WriteConcern writeConcern)
}

// private methods
private bool AreSessionsSupported(CancellationToken cancellationToken)
{
return AreSessionsSupported(_cluster.Description) ?? AreSessionsSupportedAfterServerSelection(cancellationToken);
}

private async Task<bool> AreSessionsSupportedAsync(CancellationToken cancellationToken)
{
return AreSessionsSupported(_cluster.Description) ?? await AreSessionsSupportedAfterServerSelectionAsync(cancellationToken).ConfigureAwait(false);
}

private bool? AreSessionsSupported(ClusterDescription clusterDescription)
{
if (clusterDescription.LogicalSessionTimeout.HasValue || clusterDescription.Type == ClusterType.LoadBalanced)
{
return true;
}
else
{
var selectedServers = SelectServersThatDetermineWhetherSessionsAreSupported(clusterDescription, clusterDescription.Servers).ToList();
if (selectedServers.Count == 0)
{
return null;
}
else
{
return false;
}
}
}

private bool AreSessionsSupportedAfterServerSelection(CancellationToken cancellationToken)
{
var selector = new AreSessionsSupportedServerSelector();
var selectedServer = _cluster.SelectServer(selector, cancellationToken);
var clusterDescription = selector.ClusterDescription ?? _cluster.Description; // LB cluster doesn't use server selector, so clusterDescription is null for this case
return AreSessionsSupported(clusterDescription) ?? false;
}

private async Task<bool> AreSessionsSupportedAfterServerSelectionAsync(CancellationToken cancellationToken)
{
var selector = new AreSessionsSupportedServerSelector();
var selectedServer = await _cluster.SelectServerAsync(selector, cancellationToken).ConfigureAwait(false);
var clusterDescription = selector.ClusterDescription ?? _cluster.Description; // LB cluster doesn't use server selector, so clusterDescription is null for this case
return AreSessionsSupported(clusterDescription) ?? false;
}

private IAsyncCursor<string> CreateDatabaseNamesCursor(IAsyncCursor<BsonDocument> cursor)
{
return new BatchTransformingAsyncCursor<BsonDocument, string>(
Expand Down Expand Up @@ -608,15 +535,15 @@ private MessageEncoderSettings GetMessageEncoderSettings()
return messageEncoderSettings;
}

private IClientSessionHandle StartImplicitSession(bool areSessionsSupported)
private IClientSessionHandle StartImplicitSession()
{
var options = new ClientSessionOptions { CausalConsistency = false, Snapshot = false };

ICoreSessionHandle coreSession;
#pragma warning disable 618
var areMultipleUsersAuthenticated = _settings.Credentials.Count() > 1;
#pragma warning restore
if (areSessionsSupported && !areMultipleUsersAuthenticated)
if (!areMultipleUsersAuthenticated)
{
coreSession = _cluster.StartSession(options.ToCore(isImplicit: true));
}
Expand All @@ -628,13 +555,8 @@ private IClientSessionHandle StartImplicitSession(bool areSessionsSupported)
return new ClientSessionHandle(this, options, coreSession);
}

private IClientSessionHandle StartSession(ClientSessionOptions options, bool areSessionsSupported)
private IClientSessionHandle StartSession(ClientSessionOptions options)
{
if (!areSessionsSupported)
{
throw new NotSupportedException("Sessions are not supported by this version of the server.");
}

if (options != null && options.Snapshot && options.CausalConsistency == true)
{
throw new NotSupportedException("Combining both causal consistency and snapshot options is not supported.");
Expand Down Expand Up @@ -677,17 +599,5 @@ private async Task<TResult> UsingImplicitSessionAsync<TResult>(Func<IClientSessi
return await funcAsync(session).ConfigureAwait(false);
}
}

// nested types
private class AreSessionsSupportedServerSelector : IServerSelector
{
public ClusterDescription ClusterDescription;

public IEnumerable<ServerDescription> SelectServers(ClusterDescription cluster, IEnumerable<ServerDescription> servers)
{
ClusterDescription = cluster;
return SelectServersThatDetermineWhetherSessionsAreSupported(cluster, servers);
}
}
}
}
26 changes: 4 additions & 22 deletions tests/MongoDB.Driver.Core.Tests/Core/Bindings/CoreSessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
*/

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using FluentAssertions;
using MongoDB.Bson;
using MongoDB.Bson.TestHelpers;
using MongoDB.TestHelpers.XunitExtensions;
using MongoDB.Driver.Core.Clusters;
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.Servers;
using MongoDB.Driver.Core.TestHelpers.XunitExtensions;
using MongoDB.TestHelpers.XunitExtensions;
using Moq;
using Xunit;

Expand Down Expand Up @@ -250,20 +249,6 @@ public void WasUsed_should_call_serverSession()
Mock.Get(subject.ServerSession).Verify(m => m.WasUsed(), Times.Once);
}

[Theory]
[ParameterAttributeData]
public void EnsureTransactionsAreSupported_should_throw_when_there_are_no_connected_servers(
[Values(0, 1, 2, 3)] int numberOfDisconnectedServers)
{
var clusterDescription = CreateClusterDescriptionWithDisconnectedServers(numberOfDisconnectedServers);
var subject = CreateSubject(clusterDescription);

var exception = Record.Exception(() => subject.EnsureTransactionsAreSupported());

var e = exception.Should().BeOfType<NotSupportedException>().Subject;
e.Message.Should().Be("StartTransaction cannot determine if transactions are supported because there are no connected servers.");
}

[Theory]
[ParameterAttributeData]
public void EnsureTransactionsAreSupported_should_not_throw_when_there_are_no_connected_servers_with_LB(
Expand All @@ -273,7 +258,7 @@ public void EnsureTransactionsAreSupported_should_not_throw_when_there_are_no_co
clusterDescription = clusterDescription.WithType(ClusterType.LoadBalanced);
var subject = CreateSubject(clusterDescription);

subject.EnsureTransactionsAreSupported();
subject.EnsureTransactionsAreSupported();
}

// EnsureTransactionsAreSupported scenario codes
Expand Down Expand Up @@ -325,7 +310,7 @@ public void EnsureTransactionsAreSupported_should_ignore_disconnected_servers(st
[InlineData("CA,DU")]
[InlineData("CA,CA")]
[InlineData("CA,DL")]
public void EnsureTransactionsAreSupported_should_throw_when_there_are_no_connected_data_bearing_servers(string scenarios)
public void EnsureTransactionsAreSupported_should_ignore_no_data_bearing_servers(string scenarios)
{
var clusterId = new ClusterId(1);
var servers =
Expand All @@ -342,10 +327,7 @@ public void EnsureTransactionsAreSupported_should_throw_when_there_are_no_connec
var cluster = CreateClusterDescription(clusterId, servers: servers);
var subject = CreateSubject(cluster);

var exception = Record.Exception(() => subject.EnsureTransactionsAreSupported());

var e = exception.Should().BeOfType<NotSupportedException>().Subject;
e.Message.Should().Be("StartTransaction cannot determine if transactions are supported because there are no connected servers.");
subject.EnsureTransactionsAreSupported();
}

[Theory]
Expand Down
Loading