Skip to content
Open
131 changes: 131 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.AzureStorage.Tests
{
using System.Collections.Generic;
using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class TestPartitionIndex
{
private AzureStorageOrchestrationService azureStorageOrchestrationService;
private AzureStorageOrchestrationServiceSettings settings;
private int partitionCount = 4;
private CancellationTokenSource cancellationTokenSource;
private const string TaskHub = "taskHubName";

private Dictionary<uint, string> partitionToInstanceId = new Dictionary<uint, string>()
{
{ 0, "sampleinstanceid!13"},
{ 1, "sampleinstanceid!3"},
{ 2, "sampleinstanceid!1!"},
{ 3, "sampleinstanceid!1"}
};

private Dictionary<uint, string> partitionToInstanceIdWithExplicitPartitionPlacement = new Dictionary<uint, string>()
{
{ 0, "sampleinstanceid!0"},
{ 1, "sampleinstanceid!2!1"},
{ 2, "sampleinstanceid!2"},
{ 3, "sampleinstanceid!3"}
};

private Dictionary<uint, string> partitionToInstanceIdWithExplicitPartitionPlacementEndingWithExclamation = new Dictionary<uint, string>()
{
{ 0, "sampleinstanceid!3!"},
{ 1, "sampleinstanceid!2!"},
{ 2, "sampleinstanceid!1!"},
{ 3, "sampleinstanceid!0!"}
};

[TestInitialize]
public void Initialize()
{
cancellationTokenSource = new CancellationTokenSource();

settings = new AzureStorageOrchestrationServiceSettings()
{
StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(),
TaskHubName = TaskHub,
PartitionCount = partitionCount
};

azureStorageOrchestrationService = new AzureStorageOrchestrationService(settings);

string str = "sampleinstanceid!0!";
var index = azureStorageOrchestrationService.GetPartitionIndex(str);
}

[TestMethod]
public void GetPartitionIndexTest_EnableExplicitPartitionPlacement_False()
{
settings.EnableExplicitPartitionPlacement = false;

foreach (var kvp in partitionToInstanceId)
{
var instanceId = kvp.Value;
var expectedPartitionIndex = kvp.Key;
var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId);

Assert.AreEqual(expectedPartitionIndex, partitionIndex);
}
}

[TestMethod]
public void GetPartitionIndexTest_EnableExplicitPartitionPlacement_True()
{
settings.EnableExplicitPartitionPlacement = true;

foreach (var kvp in partitionToInstanceIdWithExplicitPartitionPlacement)
{
var instanceId = kvp.Value;
var expectedPartitionIndex = kvp.Key;
var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId);

Assert.AreEqual(expectedPartitionIndex, partitionIndex);
}
}

[TestMethod]
public void GetPartitionIndexTest_EndingWithExclamation_EnableExplicitPartitionPlacement_True()
{
settings.EnableExplicitPartitionPlacement = true;

foreach (var kvp in partitionToInstanceIdWithExplicitPartitionPlacementEndingWithExclamation)
{
var instanceId = kvp.Value;
var expectedPartitionIndex = kvp.Key;
var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId);

Assert.AreEqual(expectedPartitionIndex, partitionIndex);
}
}

[TestMethod]
public void GetPartitionIndexTest_EndingWithExclamation_EnableExplicitPartitionPlacement_False()
{
settings.EnableExplicitPartitionPlacement = false;

foreach (var kvp in partitionToInstanceIdWithExplicitPartitionPlacementEndingWithExclamation)
{
var instanceId = kvp.Value;
var expectedPartitionIndex = kvp.Key;
var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId);

Assert.AreEqual(expectedPartitionIndex, partitionIndex);
}
}
}
}
28 changes: 26 additions & 2 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2048,9 +2048,9 @@ public Task<string> DownloadBlobAsync(string blobUri)

// TODO: Change this to a sticky assignment so that partition count changes can
// be supported: https://github.com/Azure/azure-functions-durable-extension/issues/1
async Task<ControlQueue?> GetControlQueueAsync(string instanceId)
internal async Task<ControlQueue?> GetControlQueueAsync(string instanceId)
{
uint partitionIndex = Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount;
uint partitionIndex = GetPartitionIndex(instanceId);
string queueName = GetControlQueueName(this.settings.TaskHubName, (int)partitionIndex);

ControlQueue cachedQueue;
Expand All @@ -2075,6 +2075,30 @@ public Task<string> DownloadBlobAsync(string blobUri)
return cachedQueue;
}

internal uint GetPartitionIndex(string instanceId)
{
uint totalPartitions = (uint)this.settings.PartitionCount;

int placementSeparatorPosition = instanceId.LastIndexOf('!');

// if the instance id ends with !nnn, where nnn is an unsigned number, it indicates explicit partition placement
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test that documents the behavior if the customer uses an instanceID with multiple ! in there? Say instanceID "A!1!B!3` should probably map to partition "3", right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add a test that checks that instanceID myinstanceID!NotANumber does not trigger any errors / that it correctly ignores the explicit placement logic.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for adding the first test, I think we're just missing the very last one:

Let's also add a test that checks that instanceID myinstanceID!NotANumber does not trigger any errors / that it correctly ignores the explicit placement logic.

// if the instance id has multiple ! then only the last one will be considered. i.e., abd!2!3 then !3 will be considered.
// and if the instance id ends with !, then it will considered in non '!nnn' format and hashing will be used for control-queue placement.
if (
this.settings.EnableExplicitPartitionPlacement
&& placementSeparatorPosition != -1
&& uint.TryParse(instanceId.Substring(placementSeparatorPosition + 1), out uint index))
{
var partitionId = index % totalPartitions;
return (uint)partitionId;
}
else
{
return Fnv1aHashHelper.ComputeHash(instanceId) % totalPartitions;

}
}

/// <summary>
/// Disposes of the current object.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,14 @@ internal LogHelper Logger
/// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true.
/// </summary>
public bool UseSeparateQueueForEntityWorkItems { get; set; } = false;

/// <summary>
/// Whether to allow instanceIDs to use special syntax to land on a specific partition.
/// If enabled, when an instanceID ends with suffix '!nnn', where 'nnn' is an unsigned number, the instance will land on the partition/queue for to that number.
/// </summary>
/// <remarks>
/// It is not generally safe to change to this flag for pre-existing TaskHubs, as it may change the expected target queue for an instanceID.
/// </remarks>
public bool EnableExplicitPartitionPlacement { get; set; } = false;
}
}
Loading