Skip to content

Commit 88f7ec8

Browse files
authored
Merge pull request Azure#320 from ealsur/master
Syncing Change Feed Processor
2 parents e7b5bcc + 418e865 commit 88f7ec8

File tree

1 file changed

+84
-0
lines changed

1 file changed

+84
-0
lines changed

samples/ChangeFeedProcessor/DocumentDB.ChangeFeedProcessor/ChangeFeedEventHost.cs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public class ChangeFeedEventHost : IPartitionObserver<DocumentServiceLease>
8383
{
8484
const string DefaultUserAgentSuffix = "changefeed-0.2";
8585
const string LeaseContainerName = "docdb-changefeed";
86+
const string LSNPropertyName = "_lsn";
87+
8688
readonly DocumentCollectionInfo collectionLocation;
8789

8890
string leasePrefix;
@@ -170,6 +172,60 @@ public async Task RegisterObserverFactoryAsync(IChangeFeedObserverFactory factor
170172
await this.StartAsync();
171173
}
172174

175+
/// <summary>
176+
/// Asynchronously checks the current existing leases and calculates an estimate of remaining work per leased partitions.
177+
/// </summary>
178+
/// <returns>An estimate amount of remaining documents to be processed</returns>
179+
public async Task<long> GetEstimatedRemainingWork()
180+
{
181+
await this.InitializeAsync();
182+
183+
long remaining = 0;
184+
ChangeFeedOptions options = new ChangeFeedOptions
185+
{
186+
MaxItemCount = 1
187+
};
188+
189+
foreach (DocumentServiceLease existingLease in await this.leaseManager.ListLeases())
190+
{
191+
options.PartitionKeyRangeId = existingLease.PartitionId;
192+
options.RequestContinuation = existingLease.ContinuationToken;
193+
IDocumentQuery<Document> query = this.documentClient.CreateDocumentChangeFeedQuery(this.collectionSelfLink, options);
194+
FeedResponse<Document> response = null;
195+
196+
try
197+
{
198+
response = await query.ExecuteNextAsync<Document>();
199+
long parsedLSNFromSessionToken = TryConvertToNumber(ParseAmountFromSessionToken(response.SessionToken));
200+
long lastSequenceNumber = response.Count > 0 ? TryConvertToNumber(response.First().GetPropertyValue<string>(LSNPropertyName)) : parsedLSNFromSessionToken;
201+
long partitionRemaining = parsedLSNFromSessionToken - lastSequenceNumber;
202+
remaining += partitionRemaining < 0 ? 0 : partitionRemaining;
203+
}
204+
catch (DocumentClientException ex)
205+
{
206+
ExceptionDispatchInfo exceptionDispatchInfo = ExceptionDispatchInfo.Capture(ex);
207+
DocumentClientException dcex = (DocumentClientException)exceptionDispatchInfo.SourceException;
208+
if ((StatusCode.NotFound == (StatusCode)dcex.StatusCode && SubStatusCode.ReadSessionNotAvailable != (SubStatusCode)GetSubStatusCode(dcex))
209+
|| StatusCode.Gone == (StatusCode)dcex.StatusCode)
210+
{
211+
// We are not explicitly handling Splits here to avoid any collision with an Observer that might have picked this up and managing the split
212+
TraceLog.Error(string.Format("GetEstimateWork > Partition {0}: resource gone (subStatus={1}).", existingLease.PartitionId, GetSubStatusCode(dcex)));
213+
}
214+
else if (StatusCode.TooManyRequests == (StatusCode)dcex.StatusCode ||
215+
StatusCode.ServiceUnavailable == (StatusCode)dcex.StatusCode)
216+
{
217+
TraceLog.Warning(string.Format("GetEstimateWork > Partition {0}: retriable exception : {1}", existingLease.PartitionId, dcex.Message));
218+
}
219+
else
220+
{
221+
TraceLog.Error(string.Format("GetEstimateWork > Partition {0}: Unhandled exception", ex.Error.Message));
222+
}
223+
}
224+
}
225+
226+
return remaining;
227+
}
228+
173229
/// <summary>Asynchronously shuts down the host instance. This method maintains the leases on all partitions currently held, and enables each
174230
/// host instance to shut down cleanly by invoking the method with object.</summary>
175231
/// <returns>A task that indicates the host instance has stopped.</returns>
@@ -759,6 +815,34 @@ private bool IsCheckpointNeeded(DocumentServiceLease lease, CheckpointStats chec
759815
return isCheckpointNeeded;
760816
}
761817

818+
private static long TryConvertToNumber(string number)
819+
{
820+
if (string.IsNullOrEmpty(number))
821+
{
822+
return 0;
823+
}
824+
825+
long parsed = 0;
826+
if (!long.TryParse(number, NumberStyles.Any, CultureInfo.InvariantCulture, out parsed))
827+
{
828+
TraceLog.Warning(string.Format(CultureInfo.InvariantCulture, "Cannot parse number '{0}'.", number));
829+
return 0;
830+
}
831+
832+
return parsed;
833+
}
834+
835+
private static string ParseAmountFromSessionToken(string sessionToken)
836+
{
837+
if (string.IsNullOrEmpty(sessionToken))
838+
{
839+
return string.Empty;
840+
}
841+
842+
int separatorIndex = sessionToken.IndexOf(':');
843+
return sessionToken.Substring(separatorIndex + 1);
844+
}
845+
762846
private static int GetDocumentCount(ResourceResponse<DocumentCollection> response)
763847
{
764848
Debug.Assert(response != null);

0 commit comments

Comments
 (0)