Skip to content

Commit 5cf07ee

Browse files
committed
ChangeFeedProcessor
1 parent 26d1a4f commit 5cf07ee

26 files changed

+2445
-0
lines changed

samples/ChangeFeedProcessor/ChangeFeedEventHost.cs

Lines changed: 538 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
namespace DocumentDB.ChangeFeedProcessor
2+
{
3+
using System;
4+
5+
/// <summary>
6+
/// Options to control various aspects of partition distribution happening within <see cref="ChangeFeedEventHost"/> instance.
7+
/// </summary>
8+
public class ChangeFeedHostOptions
9+
{
10+
static readonly TimeSpan DefaultRenewInterval = TimeSpan.FromSeconds(17);
11+
static readonly TimeSpan DefaultAcquireInterval = TimeSpan.FromSeconds(13);
12+
static readonly TimeSpan DefaultExpirationInterval = TimeSpan.FromSeconds(60);
13+
static readonly TimeSpan DefaultFeedPollDelay = TimeSpan.FromSeconds(5);
14+
15+
/// <summary>Initializes a new instance of the <see cref="DocumentDB.ChangeFeedProcessor.ChangeFeedHostOptions" /> class.</summary>
16+
public ChangeFeedHostOptions()
17+
{
18+
this.LeaseRenewInterval = DefaultRenewInterval;
19+
this.LeaseAcquireInterval = DefaultAcquireInterval;
20+
this.LeaseExpirationInterval = DefaultExpirationInterval;
21+
this.FeedPollDelay = DefaultFeedPollDelay;
22+
}
23+
24+
/// <summary>
25+
/// Gets or sets renew interval for all leases for partitions currently held by <see cref="ChangeFeedEventHost"/> instance.
26+
/// </summary>
27+
public TimeSpan LeaseRenewInterval { get; set; }
28+
29+
/// <summary>
30+
/// Gets or sets the interval to kick off a task to compute if partitions are distributed evenly among known host instances.
31+
/// </summary>
32+
public TimeSpan LeaseAcquireInterval { get; set; }
33+
34+
/// <summary>
35+
/// Gets or sets the interval for which the lease is taken on a lease representing a partition. If the lease is not renewed within this
36+
/// interval, it will cause it to expire and ownership of the partition will move to another <see cref="ChangeFeedEventHost"/> instance.
37+
/// </summary>
38+
public TimeSpan LeaseExpirationInterval { get; set; }
39+
40+
/// <summary>
41+
/// Gets or sets the delay in between polling a partition for new changes on the feed, after all current changes are drained.
42+
/// </summary>
43+
public TimeSpan FeedPollDelay { get; set; }
44+
45+
/// <summary>
46+
/// Gets or set the minimum partition count for the host.
47+
/// This can be used to increase the number of partitions for the host and thus override equal distribution (which is the default) of leases between hosts.
48+
/// </summary>
49+
internal int MinPartitionCount { get; set; }
50+
51+
/// <summary>
52+
/// Gets or sets the maximum number of partitions the host can serve.
53+
/// This can be used property to limit the number of partitions for the host and thus override equal distribution (which is the default) of leases between hosts.
54+
/// Default is 0 (unlimited).
55+
/// </summary>
56+
internal int MaxPartitionCount { get; set; }
57+
58+
/// <summary>
59+
/// Gets or sets whether on start of the host all existing leases should be deleted and the host should start from scratch.
60+
/// </summary>
61+
internal bool DiscardExistingLeases { get; set; }
62+
}
63+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
namespace DocumentDB.ChangeFeedProcessor
2+
{
3+
/// <summary>
4+
/// The reason why an instance of Observer is closed.
5+
/// </summary>
6+
public enum ChangeFeedObserverCloseReason
7+
{
8+
/// <summary>
9+
/// Unknown failure. This should never be sent to observers.
10+
/// </summary>
11+
Unknown = 0,
12+
13+
/// <summary>
14+
/// The ChangeFeedEventHost is shutting down.
15+
/// </summary>
16+
Shutdown,
17+
18+
/// <summary>
19+
/// The resource, such as database or collection was removed.
20+
/// </summary>
21+
ResourceGone,
22+
23+
/// <summary>
24+
/// Lease was lost due to expiration or load-balancing.
25+
/// </summary>
26+
LeaseLost,
27+
28+
/// <summary>
29+
/// IChangeFeedObserver threw an exception.
30+
/// </summary>
31+
ObserverError,
32+
}
33+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace DocumentDB.ChangeFeedProcessor
2+
{
3+
/// <summary>
4+
/// The context passed to <see cref="DocumentDB.ChangeFeedProcessor.IChangeFeedObserver"/> events.
5+
/// </summary>
6+
public class ChangeFeedObserverContext
7+
{
8+
/// <summary>
9+
/// Gets the id of the partition for current event.
10+
/// </summary>
11+
public string PartitionKeyRangeId { get; internal set; }
12+
}
13+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
namespace DocumentDB.ChangeFeedProcessor
2+
{
3+
using Microsoft.Azure.Documents.Client;
4+
using System;
5+
6+
/// <summary>
7+
/// Holds information specifying how to get Document collection.
8+
/// </summary>
9+
public class DocumentCollectionInfo
10+
{
11+
/// <summary>
12+
/// Initializes a new instance of the <see cref="DocumentCollectionInfo"/> class.
13+
/// </summary>
14+
public DocumentCollectionInfo()
15+
{
16+
this.ConnectionPolicy = new ConnectionPolicy { ConnectionProtocol = Protocol.Tcp, ConnectionMode = ConnectionMode.Direct };
17+
}
18+
19+
/// <summary>
20+
/// Initializes a new instance of the <see cref="DocumentCollectionInfo"/> class.
21+
/// </summary>
22+
/// <param name="other">The other <see cref="DocumentCollectionInfo"/> to copy settings from.</param>
23+
public DocumentCollectionInfo(DocumentCollectionInfo other)
24+
{
25+
this.Uri = other.Uri;
26+
this.MasterKey = other.MasterKey;
27+
this.DatabaseName = other.DatabaseName;
28+
this.CollectionName = other.CollectionName;
29+
this.ConnectionPolicy = other.ConnectionPolicy;
30+
}
31+
32+
/// <summary>
33+
/// Gets or sets the Uri of the Document service.
34+
/// </summary>
35+
public Uri Uri { get; set; }
36+
37+
/// <summary>
38+
/// Gets or sets the secret master key to connect to the Document service.
39+
/// </summary>
40+
public string MasterKey { get; set; }
41+
42+
/// <summary>
43+
/// Gets or sets the name of the database the collection resides in.
44+
/// </summary>
45+
public string DatabaseName { get; set; }
46+
47+
/// <summary>
48+
/// Gets or sets the name of the Document collection.
49+
/// </summary>
50+
public string CollectionName { get; set; }
51+
52+
/// <summary>
53+
/// Gets or sets the connection policy to connect to Document service.
54+
/// </summary>
55+
public ConnectionPolicy ConnectionPolicy { get; set; }
56+
}
57+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3+
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
4+
<PropertyGroup>
5+
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
6+
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
7+
<ProjectGuid>{D14CE64B-5267-4539-B61F-36100AED3EE7}</ProjectGuid>
8+
<OutputType>Library</OutputType>
9+
<AppDesignerFolder>Properties</AppDesignerFolder>
10+
<RootNamespace>DocumentDB.ChangeFeedProcessor</RootNamespace>
11+
<AssemblyName>DocumentDB.ChangeFeedProcessor</AssemblyName>
12+
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
13+
<FileAlignment>512</FileAlignment>
14+
<NuGetPackageImportStamp>e3edd326</NuGetPackageImportStamp>
15+
<TargetFrameworkProfile />
16+
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
17+
<RestorePackages>true</RestorePackages>
18+
<SccProjectName>SAK</SccProjectName>
19+
<SccLocalPath>SAK</SccLocalPath>
20+
<SccAuxPath>SAK</SccAuxPath>
21+
<SccProvider>SAK</SccProvider>
22+
</PropertyGroup>
23+
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
24+
<DebugSymbols>true</DebugSymbols>
25+
<DebugType>full</DebugType>
26+
<Optimize>false</Optimize>
27+
<OutputPath>bin\Debug\</OutputPath>
28+
<DefineConstants>DEBUG;TRACE</DefineConstants>
29+
<ErrorReport>prompt</ErrorReport>
30+
<WarningLevel>4</WarningLevel>
31+
</PropertyGroup>
32+
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
33+
<DebugType>pdbonly</DebugType>
34+
<Optimize>true</Optimize>
35+
<OutputPath>bin\Release\</OutputPath>
36+
<DefineConstants>TRACE</DefineConstants>
37+
<ErrorReport>prompt</ErrorReport>
38+
<WarningLevel>4</WarningLevel>
39+
<RunCodeAnalysis>true</RunCodeAnalysis>
40+
</PropertyGroup>
41+
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Debug|x64'">
42+
<DebugSymbols>true</DebugSymbols>
43+
<OutputPath>bin\x64\Debug\</OutputPath>
44+
<DefineConstants>DEBUG;TRACE</DefineConstants>
45+
<DebugType>full</DebugType>
46+
<PlatformTarget>x64</PlatformTarget>
47+
<ErrorReport>prompt</ErrorReport>
48+
<CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet>
49+
<DocumentationFile>bin\x64\Debug\DocumentDB.ChangeFeedProcessor.XML</DocumentationFile>
50+
</PropertyGroup>
51+
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Release|x64'">
52+
<OutputPath>bin\x64\Release\</OutputPath>
53+
<DefineConstants>TRACE</DefineConstants>
54+
<Optimize>true</Optimize>
55+
<DebugType>pdbonly</DebugType>
56+
<PlatformTarget>x64</PlatformTarget>
57+
<ErrorReport>prompt</ErrorReport>
58+
<CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet>
59+
<DocumentationFile>bin\x64\Release\DocumentDB.ChangeFeedProcessor.XML</DocumentationFile>
60+
</PropertyGroup>
61+
<ItemGroup>
62+
<Reference Include="Microsoft.Azure.Documents.Client, Version=1.11.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
63+
<SpecificVersion>False</SpecificVersion>
64+
<HintPath>..\..\..\..\..\..\NuGetPackages\Microsoft.Azure.DocumentDB.1.11.0\lib\net45\Microsoft.Azure.Documents.Client.dll</HintPath>
65+
</Reference>
66+
<Reference Include="Newtonsoft.Json, Version=4.5.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
67+
<SpecificVersion>False</SpecificVersion>
68+
<HintPath>..\..\..\..\..\NuGetPackages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
69+
</Reference>
70+
<Reference Include="System" />
71+
<Reference Include="System.Core" />
72+
<Reference Include="System.Data" />
73+
<Reference Include="System.Runtime.Serialization" />
74+
<Reference Include="System.ServiceModel" />
75+
<Reference Include="Microsoft.CSharp" />
76+
<Reference Include="System.Spatial, Version=5.6.4.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
77+
<HintPath>..\..\..\..\..\NuGetPackages\System.Spatial.5.6.4\lib\net40\System.Spatial.dll</HintPath>
78+
<Private>True</Private>
79+
</Reference>
80+
</ItemGroup>
81+
<ItemGroup>
82+
<Compile Include="ChangeFeedObserverCloseReason.cs" />
83+
<Compile Include="DocumentCollectionInfo.cs" />
84+
<Compile Include="ChangeFeedEventHost.cs" />
85+
<Compile Include="ChangeFeedObserverContext.cs" />
86+
<Compile Include="Internal\DocumentLeaseStore\DocumentServiceLease.cs" />
87+
<Compile Include="Internal\DocumentLeaseStore\DocumentServiceLeaseManager.cs" />
88+
<Compile Include="Internal\DocumentLeaseStore\LeaseState.cs" />
89+
<Compile Include="Internal\ICheckpointManager.cs" />
90+
<Compile Include="Internal\IDocumentFeedObserverFactory.cs" />
91+
<Compile Include="Internal\ILeaseManager.cs" />
92+
<Compile Include="Internal\IPartitionManager.cs" />
93+
<Compile Include="Internal\IPartitionObserver.cs" />
94+
<Compile Include="Internal\Lease.cs" />
95+
<Compile Include="Internal\LeaseLostException.cs" />
96+
<Compile Include="Internal\PartitionManager.cs" />
97+
<Compile Include="Internal\StatusCode.cs" />
98+
<Compile Include="ChangeFeedHostOptions.cs" />
99+
<Compile Include="Internal\TraceLog.cs" />
100+
<Compile Include="Properties\AssemblyInfo.cs" />
101+
<Compile Include="IChangeFeedObserver.cs" />
102+
<Compile Include="Internal\DocumentFeedObserverFactory.cs" />
103+
</ItemGroup>
104+
<ItemGroup>
105+
<None Include="app.config" />
106+
<None Include="packages.config" />
107+
<None Include="ReadMe.docx" />
108+
</ItemGroup>
109+
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
110+
<Import Project="..\..\..\..\..\..\NuGetPackages\Microsoft.Azure.DocumentDB.1.11.0\build\Microsoft.Azure.DocumentDB.targets" Condition="Exists('..\..\..\..\..\..\NuGetPackages\Microsoft.Azure.DocumentDB.1.11.0\build\Microsoft.Azure.DocumentDB.targets')" />
111+
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
112+
<PropertyGroup>
113+
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
114+
</PropertyGroup>
115+
<Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
116+
<Error Condition="!Exists('..\..\..\..\..\..\NuGetPackages\Microsoft.Azure.DocumentDB.1.11.0\build\Microsoft.Azure.DocumentDB.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\..\..\NuGetPackages\Microsoft.Azure.DocumentDB.1.11.0\build\Microsoft.Azure.DocumentDB.targets'))" />
117+
</Target>
118+
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
119+
<Import Project="..\..\..\..\..\..\NuGetPackages\Microsoft.Azure.DocumentDB.1.11.0\build\Microsoft.Azure.DocumentDB.targets" Condition="Exists('..\..\..\..\..\..\NuGetPackages\Microsoft.Azure.DocumentDB.1.11.0\build\Microsoft.Azure.DocumentDB.targets')" />
120+
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
121+
Other similar extension points exist, see Microsoft.Common.targets.
122+
<Target Name="BeforeBuild">
123+
</Target>
124+
<Target Name="AfterBuild">
125+
</Target>
126+
-->
127+
</Project>
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio 2013
4+
VisualStudioVersion = 12.0.31101.0
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DocumentDB.ChangeFeedProcessor", "DocumentDB.ChangeFeedProcessor.csproj", "{D14CE64B-5267-4539-B61F-36100AED3EE7}"
7+
EndProject
8+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{C67BF63E-8757-4124-B822-12C54C0DDABA}"
9+
ProjectSection(SolutionItems) = preProject
10+
.nuget\NuGet.Config = .nuget\NuGet.Config
11+
.nuget\NuGet.exe = .nuget\NuGet.exe
12+
.nuget\NuGet.targets = .nuget\NuGet.targets
13+
EndProjectSection
14+
EndProject
15+
Global
16+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
17+
Debug|x64 = Debug|x64
18+
Release|x64 = Release|x64
19+
EndGlobalSection
20+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
21+
{D14CE64B-5267-4539-B61F-36100AED3EE7}.Debug|x64.ActiveCfg = Debug|x64
22+
{D14CE64B-5267-4539-B61F-36100AED3EE7}.Debug|x64.Build.0 = Debug|x64
23+
{D14CE64B-5267-4539-B61F-36100AED3EE7}.Release|x64.ActiveCfg = Release|x64
24+
{D14CE64B-5267-4539-B61F-36100AED3EE7}.Release|x64.Build.0 = Release|x64
25+
EndGlobalSection
26+
GlobalSection(SolutionProperties) = preSolution
27+
HideSolutionNode = FALSE
28+
EndGlobalSection
29+
EndGlobal
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
namespace DocumentDB.ChangeFeedProcessor
2+
{
3+
using Microsoft.Azure.Documents;
4+
using System.Collections.Generic;
5+
using System.Threading.Tasks;
6+
7+
/// <summary>
8+
/// This interface is used to deliver change events to document feed observers.
9+
/// </summary>
10+
public interface IChangeFeedObserver
11+
{
12+
/// <summary>
13+
/// This is called when change feed observer is opened.
14+
/// </summary>
15+
/// <param name="context">The context specifying partition for this observer, etc.</param>
16+
/// <returns>A Task to allow asynchronous execution.</returns>
17+
Task OpenAsync(ChangeFeedObserverContext context);
18+
19+
/// <summary>
20+
/// This is called when change feed observer is opened.
21+
/// </summary>
22+
/// <param name="context">The context specifying partition for this observer, etc.</param>
23+
/// <param name="reason">Specifies the reason the observer is closed.</param>
24+
/// <returns>A Task to allow asynchronous execution.</returns>
25+
Task CloseAsync(ChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason);
26+
27+
/// <summary>
28+
/// This is called when document changes are available on change feed.
29+
/// </summary>
30+
/// <param name="context">The context specifying partition for this change event, etc.</param>
31+
/// <param name="docs">The documents changed.</param>
32+
/// <returns>A Task to allow asynchronous execution.</returns>
33+
Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyList<Document> docs);
34+
}
35+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace DocumentDB.ChangeFeedProcessor
2+
{
3+
class DocumentFeedObserverFactory<T> : IDocumentFeedObserverFactory where T : IChangeFeedObserver, new()
4+
{
5+
public virtual IChangeFeedObserver CreateObserver()
6+
{
7+
return new T();
8+
}
9+
}
10+
}

0 commit comments

Comments
 (0)