-
Notifications
You must be signed in to change notification settings - Fork 494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Internal] FFCF: Initial PR for AllVersionsAndDeletes #4003
Conversation
Seems like there's some line endings been messed up somewhere, will fix |
Fixed now, wasn't line endings or indentation, not sure why Git felt the whole file had changed |
Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Observers/ChangeFeedObserverFactoryCore.cs
Outdated
Show resolved
Hide resolved
@@ -111,6 +125,25 @@ public override ChangeFeedObserver CreateObserver() | |||
return this.onChangesWithManualCheckpoint(context, changes, context.CheckpointAsync, cancellationToken); | |||
} | |||
|
|||
private Task AllVersionsAndDeleteStreamHandlerAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete --> Deletes. Similar feedback on other names that have the singular form of delete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iainx looks like you marked this comment as resolved but the name still uses a singular "Delete". I think this should be "AllVersionsAndDeletesStreamHandlerAsync"
…os-dotnet-v3 into dev/iainx/alldelete-wip
…correctly by EncryptionContainer. Having it be abstract meant that it needed to be implemented when PREVIEW was true and false, but when PREVIEW was false, some types were internal and Microsoft.Azure.Cosmos.Encryption did not have access to them.
/// An item representing a document change | ||
/// </summary> | ||
/// <typeparam name="T">The document type</typeparam> | ||
public sealed class ChangeFeedProcessorItem<T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have this when doing the pull model work.
Microsoft.Azure.Cosmos/src/Resource/FullFidelity/ChangeFeedItemChange{T}.cs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've given this feedback before, but I think we should rename this class to "ChangeFeedItem", "ChangeFeedItemChange" is repetitive and confusing IMO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we should, but I'd prefer to do that in a separate PR because it affects the whole of the ChangeFeedProcessor and would make this PR more confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 on that changing this in another PR.
#else | ||
internal | ||
#endif | ||
virtual ChangeFeedProcessorBuilder GetAllVersionsChangeFeedProcessorBuilder<T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we be consistent here. GetAllVersionsAndDeletesChangeFeedProcessorBuilder? And why do we need this if this is only going to throw a NotImplementedException?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because of how the Encryption module is implemented.
This can't be abstract as that would mean it needs to be implemented by the EncryptionContainer
subclass, but because Microsoft.Azure.CosmosDB
doesn't expose its internals to Microsoft.Azure.CosmosDB.Encryption
EncryptionContainer
can only implement this is PREVIEW
is set. That means, if PREVIEW
is not set, a build error happens because an abstract method isn't implemented.
Making this method virtual with an empty body fixes that, which looks and feels weird and wrong but that's a side effect of the way Microsoft.Azure.CosmosDB.Encryption
is using Microsoft.Azure.CosmosDB
as a PackageReference
and not a ProjectReference
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dang GetAllVersionsAndDeletesChangeFeedProcessorBuilder
is long :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider shortening to GetAllVersionsAndDeletesCFPBuilder
? I'm not sure if there's guidelines against using acronyms
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it would be a bit inconsistent, some called ChangeFeedProcessor and others CFP :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think we have to live with the long name or pass the mode as a parameter. BTW, Java exposes all versions and deletes as a separate method reusing the same builder, not sure if this would apply to .NET/ simplify creation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went with GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes
following other similar method naming conventions.
@@ -577,7 +577,8 @@ internal abstract partial class ContainerCore : ContainerInternal | |||
ChangeFeedObserverFactory observerFactory = new CheckpointerObserverFactory( | |||
new ChangeFeedObserverFactoryCore<T>(onChangesDelegate, this.ClientContext.SerializerCore), | |||
withManualCheckpointing: false); | |||
return this.GetChangeFeedProcessorBuilderPrivate(processorName, observerFactory); | |||
return this.GetChangeFeedProcessorBuilderPrivate(processorName, | |||
observerFactory, ChangeFeedMode.Incremental); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why we are defaulting this to Incremental?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, all "Incremental" should use "LatestVersion" instead as this is the new name
@@ -54,6 +54,7 @@ internal sealed class ChangeFeedEstimatorIterator : FeedIterator<ChangeFeedProce | |||
changeFeedEstimatorRequestOptions, | |||
(DocumentServiceLease lease, string continuationToken, bool startFromBeginning) => ChangeFeedPartitionKeyResultSetIteratorCore.Create( | |||
lease: lease, | |||
mode: ChangeFeedMode.LatestVersion, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we defaulting to LatestVersion here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wanted to extend my comment. Are we saying that ChangeFeedEstimatorIterator is only used in LastestVersion and never AllVersionsAndDeletes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, I used that as the default because that was how it was used before AVAD was added and I didn't want to break the API or the contract of a public API. Adding a new public API that adds a mode parameter makes sense to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change Feed Estimator uses the LSN from the response to calculate the estimated delta. Do we need AllVersionsAndDelete mode? Or does the LatestVersion mode provide the same output? The math is taking the responses' SessionToken minus the first item's LSN:
azure-cosmos-dotnet-v3/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs
Lines 286 to 290 in 7ed22ac
long parsedLSNFromSessionToken = ChangeFeedEstimatorIterator.TryConvertToNumber(ExtractLsnFromSessionToken(response.Headers.Session)); | |
IEnumerable<JObject> items = ChangeFeedEstimatorIterator.GetItemsFromResponse(response); | |
long lastQueryLSN = items.Any() | |
? ChangeFeedEstimatorIterator.TryConvertToNumber(ChangeFeedEstimatorIterator.GetFirstItemLSN(items)) - 1 | |
: parsedLSNFromSessionToken; |
If this requires AllVersionsAndDeletes to be set, then (and I would probably suggest doing it as a separate PR), maybe the solution is to store which mode the processor was run in on the .info
document and use that to define the mode of the Estimator (to avoid 2 Estimator APIs)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After syncing with @jcocchi, using LatestVersion for the Estimator is ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipthomas-MSFT change feed estimator looks at the lag based on the current leases, calculating the lag is independent of change feed mode. The existing estimator works for both modes by design
@@ -597,7 +598,34 @@ internal abstract partial class ContainerCore : ContainerInternal | |||
ChangeFeedObserverFactory observerFactory = new CheckpointerObserverFactory( | |||
new ChangeFeedObserverFactoryCore<T>(onChangesDelegate, this.ClientContext.SerializerCore), | |||
withManualCheckpointing: false); | |||
return this.GetChangeFeedProcessorBuilderPrivate(processorName, observerFactory); | |||
return this.GetChangeFeedProcessorBuilderPrivate(processorName, | |||
observerFactory, ChangeFeedMode.Incremental); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another decision to default this to Incremental? Why not LatestVersion?
@@ -617,7 +645,8 @@ internal abstract partial class ContainerCore : ContainerInternal | |||
ChangeFeedObserverFactory observerFactory = new CheckpointerObserverFactory( | |||
new ChangeFeedObserverFactoryCore<T>(onChangesDelegate, this.ClientContext.SerializerCore), | |||
withManualCheckpointing: true); | |||
return this.GetChangeFeedProcessorBuilderPrivate(processorName, observerFactory); | |||
return this.GetChangeFeedProcessorBuilderPrivate(processorName, | |||
observerFactory, ChangeFeedMode.Incremental); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Incremental and not LatestVersion?
@@ -637,7 +666,8 @@ internal abstract partial class ContainerCore : ContainerInternal | |||
ChangeFeedObserverFactory observerFactory = new CheckpointerObserverFactory( | |||
new ChangeFeedObserverFactoryCore(onChangesDelegate), | |||
withManualCheckpointing: false); | |||
return this.GetChangeFeedProcessorBuilderPrivate(processorName, observerFactory); | |||
return this.GetChangeFeedProcessorBuilderPrivate(processorName, | |||
observerFactory, ChangeFeedMode.Incremental); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Incremental and not LatestVersion?
return this.GetChangeFeedProcessorBuilderPrivate(processorName, observerFactory); | ||
return this.GetChangeFeedProcessorBuilderPrivate(processorName, | ||
observerFactory, | ||
ChangeFeedMode.Incremental); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Incremental and not LatestVersion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I wasn't sure which was the new version and which was the old. I'll switch them all to LatestVersion
@@ -80,6 +83,94 @@ public async Task TestWithRunningProcessor() | |||
Assert.AreEqual("0.1.2.3.4.5.6.7.8.9.", accumulator); | |||
} | |||
|
|||
[TestMethod] | |||
public async Task TestWithRunningFFCFProcessor() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a bit difficult to follow what this test is actually testing for? Maybe too much in one test? Maybe one test can create docs, then verify docs created. Maybe another test to created docs, then updated, then verify docs updated. Maybe another test and verify deletes. Maybe another tests that is past retention and should have no documents.
@@ -78,6 +78,7 @@ public async Task EtagPassesContinuation() | |||
|
|||
ChangeFeedPartitionKeyResultSetIteratorCore iterator = ChangeFeedPartitionKeyResultSetIteratorCore.Create( | |||
lease: documentServiceLeaseCore, | |||
mode: ChangeFeedMode.Incremental, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This goes for all of the tests for Incremental, LatestVersion is going to be replacing this so we need to figure out when we want to transition. We can't get rid of Incremental until a major release version change, but we are exposing LatestVersion and giving cx access.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the old names, i.e. Incremental
etc not be marked BrowserVisible.Never
? Having a noisy set of identifiers in intellisense is a horrible experience - even if obsoletion notes in the xmldoc could temper it a little.
Also I think marking them Obsolete
would be for the best
- it's not going to inconvenience a lot of people
- it would flush out all usages flagged in the PR reviews here (and the tests that need to cover them can inhibit the warnings)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Marking Obsolete(which I'd love to) might break a build, which for many customers, is a no-no and considered a breaking change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's why obsoletion is by default only a warning for C# and F# :P But, fair enough.
IMO hiding it via BrowserVisible is the right thing to do there - the Intellisense experience will be far less noisy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, but it is common for users to have TreatWarningsAsErrors and that breaks a build :(
I agree that hiding it would be great from Intellisense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[EditorBrowsable(EditorBrowsableState.Never)]
would be the right decorator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, but it is common for users to have TreatWarningsAsErrors and that breaks a build
I get that, and I've already accepted that it's something that's reasonable to make a subjective call on, for better or for worse.
But now we're still talking, some more devil's advocate points for Obsolete:
- people with TWAE are adults that have opted into something
- using a barely out of preview feature is absolutely the definition of a non-mainline scenario
- they will thank you as a pile of docs are about to make the old names look very stupid
- FCCF is not a mainline scenario
- I don't think it's actually written anywhere that obsoleting something is as cheeky as a breaking change?
- The fact that this repo is working on an evergreen basis without any obsoletions in site is another to consider bending the rule in this instance
Having said all of that, hiding it with BrowserVisible is a fine solution for me in terms of the actual impact on real humans.
@iainx can you please add a simple usage example to the description so it's clear how customers would use the new APIs? +1 |
Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs
Show resolved
Hide resolved
{ | ||
this.observerFactory = observerFactory ?? throw new ArgumentNullException(nameof(observerFactory)); | ||
this.mode = mode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not add the mode to the ChangeFeedProcessorOptions that is already passed down?
requestMessage.Headers.Add( | ||
HttpConstants.HttpHeaders.A_IM, | ||
HttpConstants.A_IMHeaderValues.IncrementalFeed); | ||
// Add the necessary mode headers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At some point during the CFP initialization, a .info
file is created. Should we store the mode there? Also, what happens if a customer starts a normal (Incremental) CFP and then changes to an AllVersions one with the same leases (same LSNs)? Should we allow that? If not, this .info
file can be used to check the mode and throw if it doesn't match? Thoughts @jcocchi ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding the mode on the .info
file is a good idea. In Java they added the mode as part of the continuation token for each lease.
I was under the impression that reusing leases across modes wasn't supported for technical reasons (though customers have already asked how they can use latest version until they "catch up" to all versions and deletes then switch over). Supporting lease compatibility is a good future goal but not current requirement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.info
or in the Leases, both solutions work, yeah. As long as we save it to use it, for example, in the Estimator or to validate if the leases were created with one mode and now attempted to be used with another.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to confirm if Java SDK errors out if you attempt to use leases from one mode and then switch the Processor to use another. For consistency, we should either error out or not. If we need to error out, we need to store the mode somewhere to compare the current mode vs. what was stored in .info
or the leases (wherever we want to store the mode)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confirming this with Java team.
Done. It was initially in the API discussion thread, but I've moved it to here |
@@ -592,6 +592,51 @@ internal sealed class EncryptionContainer : Container | |||
}); | |||
} | |||
|
|||
#if SDKPROJECTREF | |||
public override ChangeFeedProcessorBuilder GetAllVersionsChangeFeedProcessorBuilder<T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where are the unit or integration tests for GetAllVersionsChangeFeedProcessorBuilder<T>
?
@@ -592,6 +592,51 @@ internal sealed class EncryptionContainer : Container | |||
}); | |||
} | |||
|
|||
#if SDKPROJECTREF |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the significance of SDKPROJECTREF
for this block of code?
@@ -54,6 +54,7 @@ internal sealed class ChangeFeedEstimatorIterator : FeedIterator<ChangeFeedProce | |||
changeFeedEstimatorRequestOptions, | |||
(DocumentServiceLease lease, string continuationToken, bool startFromBeginning) => ChangeFeedPartitionKeyResultSetIteratorCore.Create( | |||
lease: lease, | |||
mode: ChangeFeedMode.LatestVersion, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -69,6 +69,17 @@ public ChangeFeedProcessorBuilder WithInstanceName(string instanceName) | |||
return this; | |||
} | |||
|
|||
/// <summary> | |||
/// Sets the mode for the change freed processor. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spelling correction "feed".
new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey), | ||
ContainerProperties properties = new ContainerProperties(id: Guid.NewGuid().ToString(), | ||
partitionKeyPath: PartitionKey); | ||
properties.ChangeFeedPolicy.FullFidelityRetention = TimeSpan.FromMinutes(5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to change. No need to set retention for non-AllVersionsAndDeletes tests.
replaced by another PR that is already closed. |
Description
WIP initial PR for comments
Waiting for a final API before writing full documentation and tests
API example