From 811ec32f29ed35d529d76338a1f3ccecbf4de4c1 Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 12 Dec 2024 12:10:35 -0500 Subject: [PATCH 01/13] DOCSP-34008: Split large change events --- .../crud/read-operations/change-streams.txt | 76 ++++++++++-- .../change-streams/change-streams.cs | 115 ++++++++++++++++++ 2 files changed, 184 insertions(+), 7 deletions(-) diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt index f346ea26..73359c0c 100644 --- a/source/fundamentals/crud/read-operations/change-streams.txt +++ b/source/fundamentals/crud/read-operations/change-streams.txt @@ -111,6 +111,7 @@ for only specified change events. Create the pipeline by using the You can specify the following aggregation stages in the ``pipeline`` parameter: - ``$addFields`` +- ``$changeStreamSplitEvent`` - ``$match`` - ``$project`` - ``$replaceRoot`` @@ -119,9 +120,19 @@ You can specify the following aggregation stages in the ``pipeline`` parameter: - ``$set`` - ``$unset`` -To learn how to build an aggregation pipeline by using the -``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in -the Operations with Builders guide. +.. tip:: + + To learn how to build an aggregation pipeline by using the + ``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in + the Operations with Builders guide. + + To learn more about modifying your change stream output, see the + :manual:`Modify Change Stream Output + ` section in the {+mdb-server+} + manual. + +Monitor Update Events Example +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The following example uses the ``pipeline`` parameter to open a change stream that records only update operations. Select the :guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the @@ -145,10 +156,61 @@ corresponding code. :end-before: end-change-stream-pipeline :language: csharp -To learn more about modifying your change stream output, see the -:manual:`Modify Change Stream Output -` section in the {+mdb-server+} -manual. +Split Large Change Events Example +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If your application generates change events that exceed 16 MB, the +{+driver-short+} returns an error. To avoid this error, you can use +the ``$changeStreamSplitEvent`` pipeline stage to split the events +into smaller fragments. + +After you receive the change stream event fragments, you can use the +following helper methods to reassemble the fragments into a single +change stream document: + +.. tabs:: + + .. tab:: Asynchronous + :tabid: split-event-helpers-async + + .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs + :start-after: start-change-stream-pipeline-async + :end-before: end-change-stream-pipeline-async + :language: csharp + + .. tab:: Synchronous + :tabid: split-event-helpers-sync + + .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs + :start-after: start-split-event-helpers-sync + :end-before: end-split-event-helpers-sync + :language: csharp + +This example instructs the driver to watch for changes and split +large change events, calling the preceding helper methods to +reassemble the event fragments: + +.. tabs:: + + .. tab:: Asynchronous + :tabid: change-stream-split-async + + .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs + :start-after: start-split-change-event-async + :end-before: end-split-change-event-async + :language: csharp + + .. tab:: Synchronous + :tabid: change-stream-split-sync + + .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs + :start-after: start-split-change-event-sync + :end-before: end-split-change-event-sync + :language: csharp + +To learn more about splitting large change events, see the +:manual:`$changeStreamSplitLargeEvent ` +page in the {+mdb-server+} manual. Modify ``Watch()`` Behavior --------------------------- diff --git a/source/includes/code-examples/change-streams/change-streams.cs b/source/includes/code-examples/change-streams/change-streams.cs index edeaf709..d0dcb97d 100644 --- a/source/includes/code-examples/change-streams/change-streams.cs +++ b/source/includes/code-examples/change-streams/change-streams.cs @@ -65,6 +65,121 @@ await cursor.ForEachAsync(change => } // end-change-stream-pipeline +// start-split-event-helpers-sync +// Fetches the next complete change stream event +private static ChangeStreamDocument GetNextChangeStreamEvent( + IEnumerator> changeStreamEnumerator) +{ + changeStreamEnumerator.MoveNext(); + var changeStreamEvent = changeStreamEnumerator.Current; + + // Reassembles change event fragments if the event is split + if (changeStreamEvent.SplitEvent != null) + { + var fragment = changeStreamEvent; + while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) + { + changeStreamEnumerator.MoveNext(); + fragment = changeStreamEnumerator.Current; + MergeFragment(changeStreamEvent, fragment); + } + } + return changeStreamEvent; +} + +// Merges a fragment into the base event +private static void MergeFragment( + ChangeStreamDocument changeStreamEvent, + ChangeStreamDocument fragment) +{ + foreach (var element in fragment.BackingDocument) + { + if (element.Name != "_id" && element.Name != "splitEvent") + { + changeStreamEvent.BackingDocument[element.Name] = element.Value; + } + } +} +// end-split-event-helpers-sync + +// start-split-event-helpers-async +// Fetches the next complete change stream event +private static async Task> GetNextChangeStreamEventAsync( + IAsyncCursor> changeStreamCursor) +{ + if (!await changeStreamCursor.MoveNextAsync()) + { + throw new InvalidOperationException("No more change stream events available."); + } + + var changeStreamEvent = changeStreamCursor.Current.First(); + + // Reassembles change event fragments if the event is split + if (changeStreamEvent.SplitEvent != null) + { + var fragment = changeStreamEvent; + while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) + { + if (!await changeStreamCursor.MoveNextAsync()) + { + throw new InvalidOperationException("Incomplete split event fragments."); + } + fragment = changeStreamCursor.Current.First(); + MergeFragment(changeStreamEvent, fragment); + } + } + return changeStreamEvent; +} + +// Merges a fragment into the base event +private static void MergeFragment( + ChangeStreamDocument changeStreamEvent, + ChangeStreamDocument fragment) +{ + foreach (var element in fragment.BackingDocument) + { + if (element.Name != "_id" && element.Name != "splitEvent") + { + changeStreamEvent.BackingDocument[element.Name] = element.Value; + } + } +} +// end-split-event-helpers-async + +// start-split-change-event-async +var pipeline = new EmptyPipelineDefinition>() + .ChangeStreamSplitLargeEvent(); + +using (var cursor = await _restaurantsCollection.WatchAsync(pipeline)) +{ + while (await cursor.MoveNextAsync()) + { + foreach (var changeStreamEvent in cursor.Current) + { + var completeEvent = await GetNextChangeStreamEventAsync(cursor); + Console.WriteLine("Reassembled change event: " + completeEvent.FullDocument); + } + } +} +// end-split-change-event-async + +// start-split-change-event-sync +var pipeline = new EmptyPipelineDefinition>() + .ChangeStreamSplitLargeEvent(); + +using (var cursor = _restaurantsCollection.Watch(pipeline)) +{ + using (var enumerator = cursor.ToEnumerable().GetEnumerator()) + { + while (enumerator.MoveNext()) + { + var completeEvent = GetNextChangeStreamEvent(enumerator); + Console.WriteLine("Reassembled change event: " + completeEvent.FullDocument); + } + } +} +// end-split-change-event-sync + // start-change-stream-post-image var pipeline = new EmptyPipelineDefinition>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); From a9c6e48f44f592fe340b25262893d22ba015a2c7 Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 12 Dec 2024 13:28:07 -0500 Subject: [PATCH 02/13] fixes --- source/fundamentals/crud/read-operations/change-streams.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt index 73359c0c..49ea01a7 100644 --- a/source/fundamentals/crud/read-operations/change-streams.txt +++ b/source/fundamentals/crud/read-operations/change-streams.txt @@ -174,8 +174,8 @@ change stream document: :tabid: split-event-helpers-async .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs - :start-after: start-change-stream-pipeline-async - :end-before: end-change-stream-pipeline-async + :start-after: start-split-event-helpers-async + :end-before: end-split-event-helpers-async :language: csharp .. tab:: Synchronous @@ -209,7 +209,7 @@ reassemble the event fragments: :language: csharp To learn more about splitting large change events, see the -:manual:`$changeStreamSplitLargeEvent ` +:manual:`$changeStreamSplitLargeEvent ` page in the {+mdb-server+} manual. Modify ``Watch()`` Behavior From 99042259858563c57068d1d8abd398f8dcfcab90 Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 12 Dec 2024 14:58:15 -0500 Subject: [PATCH 03/13] edits --- source/fundamentals/crud/read-operations/change-streams.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt index 49ea01a7..35f56cf6 100644 --- a/source/fundamentals/crud/read-operations/change-streams.txt +++ b/source/fundamentals/crud/read-operations/change-streams.txt @@ -111,7 +111,7 @@ for only specified change events. Create the pipeline by using the You can specify the following aggregation stages in the ``pipeline`` parameter: - ``$addFields`` -- ``$changeStreamSplitEvent`` +- ``$changeStreamSplitLargeEvent`` - ``$match`` - ``$project`` - ``$replaceRoot`` @@ -161,7 +161,7 @@ Split Large Change Events Example If your application generates change events that exceed 16 MB, the {+driver-short+} returns an error. To avoid this error, you can use -the ``$changeStreamSplitEvent`` pipeline stage to split the events +the ``$changeStreamSplitLargeEvent`` pipeline stage to split the events into smaller fragments. After you receive the change stream event fragments, you can use the From b8402090350dcb7716deea9df335b31f17ba95d1 Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 12 Dec 2024 15:12:35 -0500 Subject: [PATCH 04/13] fixes to async code --- .../code-examples/change-streams/change-streams.cs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/source/includes/code-examples/change-streams/change-streams.cs b/source/includes/code-examples/change-streams/change-streams.cs index d0dcb97d..e0865941 100644 --- a/source/includes/code-examples/change-streams/change-streams.cs +++ b/source/includes/code-examples/change-streams/change-streams.cs @@ -107,11 +107,6 @@ private static void MergeFragment( private static async Task> GetNextChangeStreamEventAsync( IAsyncCursor> changeStreamCursor) { - if (!await changeStreamCursor.MoveNextAsync()) - { - throw new InvalidOperationException("No more change stream events available."); - } - var changeStreamEvent = changeStreamCursor.Current.First(); // Reassembles change event fragments if the event is split @@ -150,14 +145,14 @@ private static void MergeFragment( var pipeline = new EmptyPipelineDefinition>() .ChangeStreamSplitLargeEvent(); -using (var cursor = await _restaurantsCollection.WatchAsync(pipeline)) +using (var cursor = await restaurantsCollection.WatchAsync(pipeline)) { while (await cursor.MoveNextAsync()) { foreach (var changeStreamEvent in cursor.Current) { var completeEvent = await GetNextChangeStreamEventAsync(cursor); - Console.WriteLine("Reassembled change event: " + completeEvent.FullDocument); + Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); } } } From 88dcab38f388e36df5f100f0733322d48f587243 Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 12 Dec 2024 15:25:02 -0500 Subject: [PATCH 05/13] fixes to sync code --- source/includes/code-examples/change-streams/change-streams.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/includes/code-examples/change-streams/change-streams.cs b/source/includes/code-examples/change-streams/change-streams.cs index e0865941..64ad4c30 100644 --- a/source/includes/code-examples/change-streams/change-streams.cs +++ b/source/includes/code-examples/change-streams/change-streams.cs @@ -70,7 +70,6 @@ await cursor.ForEachAsync(change => private static ChangeStreamDocument GetNextChangeStreamEvent( IEnumerator> changeStreamEnumerator) { - changeStreamEnumerator.MoveNext(); var changeStreamEvent = changeStreamEnumerator.Current; // Reassembles change event fragments if the event is split @@ -169,7 +168,7 @@ private static void MergeFragment( while (enumerator.MoveNext()) { var completeEvent = GetNextChangeStreamEvent(enumerator); - Console.WriteLine("Reassembled change event: " + completeEvent.FullDocument); + Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); } } } From 2cdf355f82eea24910ef787e486866d87074a9e5 Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 12 Dec 2024 15:28:42 -0500 Subject: [PATCH 06/13] wording --- source/fundamentals/crud/read-operations/change-streams.txt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt index 35f56cf6..fef1d9c3 100644 --- a/source/fundamentals/crud/read-operations/change-streams.txt +++ b/source/fundamentals/crud/read-operations/change-streams.txt @@ -187,8 +187,9 @@ change stream document: :language: csharp This example instructs the driver to watch for changes and split -large change events, calling the preceding helper methods to -reassemble the event fragments: +change events that exceed the 16 MB limit. The code prints the +change document for each event and calls the preceding helper methods to +reassemble any event fragments: .. tabs:: From 78209e30a523586b263e455cff4d053041af01d9 Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 12 Dec 2024 15:29:35 -0500 Subject: [PATCH 07/13] admonition --- .../crud/read-operations/change-streams.txt | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt index fef1d9c3..d069954e 100644 --- a/source/fundamentals/crud/read-operations/change-streams.txt +++ b/source/fundamentals/crud/read-operations/change-streams.txt @@ -159,7 +159,7 @@ corresponding code. Split Large Change Events Example ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -If your application generates change events that exceed 16 MB, the +If your application generates change events that exceed 16 MB in size, the {+driver-short+} returns an error. To avoid this error, you can use the ``$changeStreamSplitLargeEvent`` pipeline stage to split the events into smaller fragments. @@ -209,9 +209,11 @@ reassemble any event fragments: :end-before: end-split-change-event-sync :language: csharp -To learn more about splitting large change events, see the -:manual:`$changeStreamSplitLargeEvent ` -page in the {+mdb-server+} manual. +.. tip:: + + To learn more about splitting large change events, see the + :manual:`$changeStreamSplitLargeEvent ` + page in the {+mdb-server+} manual. Modify ``Watch()`` Behavior --------------------------- From 7b6e0c7c45b3373ab8ab9ee4b9c2ef371328a807 Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 12 Dec 2024 15:32:26 -0500 Subject: [PATCH 08/13] small fix --- source/includes/code-examples/change-streams/change-streams.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/includes/code-examples/change-streams/change-streams.cs b/source/includes/code-examples/change-streams/change-streams.cs index 64ad4c30..06924d7c 100644 --- a/source/includes/code-examples/change-streams/change-streams.cs +++ b/source/includes/code-examples/change-streams/change-streams.cs @@ -144,7 +144,7 @@ private static void MergeFragment( var pipeline = new EmptyPipelineDefinition>() .ChangeStreamSplitLargeEvent(); -using (var cursor = await restaurantsCollection.WatchAsync(pipeline)) +using (var cursor = await _restaurantsCollection.WatchAsync(pipeline)) { while (await cursor.MoveNextAsync()) { From db64098923168fd270928101b08f30166df1adbf Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 12 Dec 2024 16:51:34 -0500 Subject: [PATCH 09/13] MM feedback --- .../crud/read-operations/change-streams.txt | 6 +++--- .../code-examples/change-streams/change-streams.cs | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt index d069954e..272df093 100644 --- a/source/fundamentals/crud/read-operations/change-streams.txt +++ b/source/fundamentals/crud/read-operations/change-streams.txt @@ -160,7 +160,7 @@ Split Large Change Events Example ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ If your application generates change events that exceed 16 MB in size, the -{+driver-short+} returns an error. To avoid this error, you can use +server returns a ``BSONObjectTooLarge`` error. To avoid this error, you can use the ``$changeStreamSplitLargeEvent`` pipeline stage to split the events into smaller fragments. @@ -211,9 +211,9 @@ reassemble any event fragments: .. tip:: - To learn more about splitting large change events, see the + To learn more about splitting large change events, see :manual:`$changeStreamSplitLargeEvent ` - page in the {+mdb-server+} manual. + in the {+mdb-server+} manual. Modify ``Watch()`` Behavior --------------------------- diff --git a/source/includes/code-examples/change-streams/change-streams.cs b/source/includes/code-examples/change-streams/change-streams.cs index 06924d7c..c8b9fdf2 100644 --- a/source/includes/code-examples/change-streams/change-streams.cs +++ b/source/includes/code-examples/change-streams/change-streams.cs @@ -42,7 +42,7 @@ await cursor.ForEachAsync(change => .Match(change => change.OperationType == ChangeStreamOperationType.Update); // Opens a change stream and prints the changes as they're received -using (var cursor = await _restaurantsCollection.WatchAsync(pipeline)) +using (var cursor = await collection.WatchAsync(pipeline)) { await cursor.ForEachAsync(change => { @@ -56,7 +56,7 @@ await cursor.ForEachAsync(change => .Match(change => change.OperationType == ChangeStreamOperationType.Update); // Opens a change streams and print the changes as they're received -using (var cursor = _restaurantsCollection.Watch(pipeline)) +using (var cursor = collection.Watch(pipeline)) { foreach (var change in cursor.ToEnumerable()) { @@ -144,7 +144,7 @@ private static void MergeFragment( var pipeline = new EmptyPipelineDefinition>() .ChangeStreamSplitLargeEvent(); -using (var cursor = await _restaurantsCollection.WatchAsync(pipeline)) +using (var cursor = await collection.WatchAsync(pipeline)) { while (await cursor.MoveNextAsync()) { @@ -161,7 +161,7 @@ private static void MergeFragment( var pipeline = new EmptyPipelineDefinition>() .ChangeStreamSplitLargeEvent(); -using (var cursor = _restaurantsCollection.Watch(pipeline)) +using (var cursor = collection.Watch(pipeline)) { using (var enumerator = cursor.ToEnumerable().GetEnumerator()) { @@ -183,7 +183,7 @@ private static void MergeFragment( FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, }; -using (var cursor = _restaurantsCollection.Watch(pipeline, options)) +using (var cursor = collection.Watch(pipeline, options)) { foreach (var change in cursor.ToEnumerable()) { @@ -201,7 +201,7 @@ private static void MergeFragment( FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, }; -using var cursor = await _restaurantsCollection.WatchAsync(pipeline, options); +using var cursor = await collection.WatchAsync(pipeline, options); await cursor.ForEachAsync(change => { Console.WriteLine(change.FullDocument.ToBsonDocument()); From 5764b75d377222bac2eda923328ea788c274ca5a Mon Sep 17 00:00:00 2001 From: norareidy Date: Tue, 17 Dec 2024 10:49:37 -0500 Subject: [PATCH 10/13] FP feedback --- .../crud/read-operations/change-streams.txt | 44 ++++++++++--------- .../change-streams/change-streams.cs | 4 +- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt index 272df093..9835ed50 100644 --- a/source/fundamentals/crud/read-operations/change-streams.txt +++ b/source/fundamentals/crud/read-operations/change-streams.txt @@ -162,53 +162,55 @@ Split Large Change Events Example If your application generates change events that exceed 16 MB in size, the server returns a ``BSONObjectTooLarge`` error. To avoid this error, you can use the ``$changeStreamSplitLargeEvent`` pipeline stage to split the events -into smaller fragments. +into smaller fragments. The {+driver-short+} aggregation API includes the +``ChangeStreamSplitLargeEvent()`` method, which you can use to add the +``$changeStreamSplitLargeEvent`` stage to the change stream pipeline. -After you receive the change stream event fragments, you can use the -following helper methods to reassemble the fragments into a single -change stream document: +This example instructs the driver to watch for changes and split +change events that exceed the 16 MB limit. The code prints the +change document for each event and calls helper methods to +reassemble any event fragments: .. tabs:: .. tab:: Asynchronous - :tabid: split-event-helpers-async + :tabid: change-stream-split-async .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs - :start-after: start-split-event-helpers-async - :end-before: end-split-event-helpers-async + :start-after: start-split-change-event-async + :end-before: end-split-change-event-async :language: csharp .. tab:: Synchronous - :tabid: split-event-helpers-sync + :tabid: change-stream-split-sync .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs - :start-after: start-split-event-helpers-sync - :end-before: end-split-event-helpers-sync + :start-after: start-split-change-event-sync + :end-before: end-split-change-event-sync :language: csharp -This example instructs the driver to watch for changes and split -change events that exceed the 16 MB limit. The code prints the -change document for each event and calls the preceding helper methods to -reassemble any event fragments: +The preceding example uses the ``GetNextChangeStreamEvent()`` and ``MergeFragment()`` +methods to reassemble change event fragments into a single change stream document. +The following code defines these methods: .. tabs:: .. tab:: Asynchronous - :tabid: change-stream-split-async + :tabid: split-event-helpers-async .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs - :start-after: start-split-change-event-async - :end-before: end-split-change-event-async + :start-after: start-split-event-helpers-async + :end-before: end-split-event-helpers-async :language: csharp .. tab:: Synchronous - :tabid: change-stream-split-sync + :tabid: split-event-helpers-sync .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs - :start-after: start-split-change-event-sync - :end-before: end-split-change-event-sync + :start-after: start-split-event-helpers-sync + :end-before: end-split-event-helpers-sync :language: csharp - + .. tip:: To learn more about splitting large change events, see diff --git a/source/includes/code-examples/change-streams/change-streams.cs b/source/includes/code-examples/change-streams/change-streams.cs index c8b9fdf2..3d0656a6 100644 --- a/source/includes/code-examples/change-streams/change-streams.cs +++ b/source/includes/code-examples/change-streams/change-streams.cs @@ -103,7 +103,7 @@ private static void MergeFragment( // start-split-event-helpers-async // Fetches the next complete change stream event -private static async Task> GetNextChangeStreamEventAsync( +private static async Task> GetNextChangeStreamEvent( IAsyncCursor> changeStreamCursor) { var changeStreamEvent = changeStreamCursor.Current.First(); @@ -150,7 +150,7 @@ private static void MergeFragment( { foreach (var changeStreamEvent in cursor.Current) { - var completeEvent = await GetNextChangeStreamEventAsync(cursor); + var completeEvent = await GetNextChangeStreamEvent(cursor); Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); } } From 949557d50ad0f7003b2aa6d75ae579e82928c663 Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 26 Dec 2024 13:58:32 -0500 Subject: [PATCH 11/13] tech feedback --- .../crud/read-operations/change-streams.txt | 9 +- .../change-streams/change-streams.cs | 110 +++++++++--------- 2 files changed, 63 insertions(+), 56 deletions(-) diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt index 9835ed50..b7820293 100644 --- a/source/fundamentals/crud/read-operations/change-streams.txt +++ b/source/fundamentals/crud/read-operations/change-streams.txt @@ -189,7 +189,14 @@ reassemble any event fragments: :end-before: end-split-change-event-sync :language: csharp -The preceding example uses the ``GetNextChangeStreamEvent()`` and ``MergeFragment()`` +.. note:: + + We recommend reassembling change event fragments, as shown in the + preceding example, but this step is optional. You can use the same + logic to watch split events and complete change events. + +The preceding example uses the ``GetNextChangeStreamEvent()``, +``GetNextChangeStreamEventAsync()``, and ``MergeFragment()`` methods to reassemble change event fragments into a single change stream document. The following code defines these methods: diff --git a/source/includes/code-examples/change-streams/change-streams.cs b/source/includes/code-examples/change-streams/change-streams.cs index 3d0656a6..4fc7f1f7 100644 --- a/source/includes/code-examples/change-streams/change-streams.cs +++ b/source/includes/code-examples/change-streams/change-streams.cs @@ -67,24 +67,25 @@ await cursor.ForEachAsync(change => // start-split-event-helpers-sync // Fetches the next complete change stream event -private static ChangeStreamDocument GetNextChangeStreamEvent( + private static IEnumerable> GetNextChangeStreamEvent( IEnumerator> changeStreamEnumerator) -{ - var changeStreamEvent = changeStreamEnumerator.Current; - - // Reassembles change event fragments if the event is split - if (changeStreamEvent.SplitEvent != null) - { - var fragment = changeStreamEvent; - while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) - { - changeStreamEnumerator.MoveNext(); - fragment = changeStreamEnumerator.Current; - MergeFragment(changeStreamEvent, fragment); - } - } - return changeStreamEvent; -} + { + while (changeStreamEnumerator.MoveNext()) + { + var changeStreamEvent = changeStreamEnumerator.Current; + if (changeStreamEvent.SplitEvent != null) + { + var fragment = changeStreamEvent; + while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) + { + changeStreamEnumerator.MoveNext(); + fragment = changeStreamEnumerator.Current; + MergeFragment(changeStreamEvent, fragment); + } + } + yield return changeStreamEvent; + } + } // Merges a fragment into the base event private static void MergeFragment( @@ -103,26 +104,37 @@ private static void MergeFragment( // start-split-event-helpers-async // Fetches the next complete change stream event -private static async Task> GetNextChangeStreamEvent( +private static async IAsyncEnumerable> GetNextChangeStreamEventAsync( IAsyncCursor> changeStreamCursor) { - var changeStreamEvent = changeStreamCursor.Current.First(); - - // Reassembles change event fragments if the event is split - if (changeStreamEvent.SplitEvent != null) + var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator(); + while (await changeStreamEnumerator.MoveNextAsync()) { - var fragment = changeStreamEvent; - while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) + var changeStreamEvent = changeStreamEnumerator.Current; + if (changeStreamEvent.SplitEvent != null) { - if (!await changeStreamCursor.MoveNextAsync()) + var fragment = changeStreamEvent; + while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) { - throw new InvalidOperationException("Incomplete split event fragments."); + await changeStreamEnumerator.MoveNextAsync(); + fragment = changeStreamEnumerator.Current; + MergeFragment(changeStreamEvent, fragment); } - fragment = changeStreamCursor.Current.First(); - MergeFragment(changeStreamEvent, fragment); + } + yield return changeStreamEvent; + } +} + +private static async IAsyncEnumerable> GetNextChangeStreamEventFragmentAsync( + IAsyncCursor> changeStreamCursor) +{ + while (await changeStreamCursor.MoveNextAsync()) + { + foreach (var changeStreamEvent in changeStreamCursor.Current) + { + yield return changeStreamEvent; } } - return changeStreamEvent; } // Merges a fragment into the base event @@ -140,39 +152,27 @@ private static void MergeFragment( } // end-split-event-helpers-async -// start-split-change-event-async +// start-split-change-event-sync var pipeline = new EmptyPipelineDefinition>() .ChangeStreamSplitLargeEvent(); -using (var cursor = await collection.WatchAsync(pipeline)) -{ - while (await cursor.MoveNextAsync()) - { - foreach (var changeStreamEvent in cursor.Current) - { - var completeEvent = await GetNextChangeStreamEvent(cursor); - Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); - } - } -} -// end-split-change-event-async +using var cursor = collection.Watch(pipeline); + foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator())) + { + Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); + } +// end-split-change-event-sync -// start-split-change-event-sync +// start-split-change-event-async var pipeline = new EmptyPipelineDefinition>() .ChangeStreamSplitLargeEvent(); -using (var cursor = collection.Watch(pipeline)) -{ - using (var enumerator = cursor.ToEnumerable().GetEnumerator()) - { - while (enumerator.MoveNext()) - { - var completeEvent = GetNextChangeStreamEvent(enumerator); - Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); - } - } -} -// end-split-change-event-sync +using var cursor = await collection.WatchAsync(pipeline); + await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor)) + { + Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); + } +// end-split-change-event-async // start-change-stream-post-image var pipeline = new EmptyPipelineDefinition>() From a1666a01dbfd4e9ba76e70b065a01d32559fbfd0 Mon Sep 17 00:00:00 2001 From: norareidy Date: Thu, 26 Dec 2024 14:47:14 -0500 Subject: [PATCH 12/13] edits --- .../crud/read-operations/change-streams.txt | 2 +- .../change-streams/change-streams.cs | 38 +++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt index b7820293..d30f55a2 100644 --- a/source/fundamentals/crud/read-operations/change-streams.txt +++ b/source/fundamentals/crud/read-operations/change-streams.txt @@ -193,7 +193,7 @@ reassemble any event fragments: We recommend reassembling change event fragments, as shown in the preceding example, but this step is optional. You can use the same - logic to watch split events and complete change events. + logic to watch both split and complete change events. The preceding example uses the ``GetNextChangeStreamEvent()``, ``GetNextChangeStreamEventAsync()``, and ``MergeFragment()`` diff --git a/source/includes/code-examples/change-streams/change-streams.cs b/source/includes/code-examples/change-streams/change-streams.cs index 4fc7f1f7..ff0bb4a0 100644 --- a/source/includes/code-examples/change-streams/change-streams.cs +++ b/source/includes/code-examples/change-streams/change-streams.cs @@ -67,25 +67,25 @@ await cursor.ForEachAsync(change => // start-split-event-helpers-sync // Fetches the next complete change stream event - private static IEnumerable> GetNextChangeStreamEvent( - IEnumerator> changeStreamEnumerator) - { - while (changeStreamEnumerator.MoveNext()) - { - var changeStreamEvent = changeStreamEnumerator.Current; - if (changeStreamEvent.SplitEvent != null) - { - var fragment = changeStreamEvent; - while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) - { - changeStreamEnumerator.MoveNext(); - fragment = changeStreamEnumerator.Current; - MergeFragment(changeStreamEvent, fragment); - } - } - yield return changeStreamEvent; - } - } +private static IEnumerable> GetNextChangeStreamEvent( +IEnumerator> changeStreamEnumerator) +{ + while (changeStreamEnumerator.MoveNext()) + { + var changeStreamEvent = changeStreamEnumerator.Current; + if (changeStreamEvent.SplitEvent != null) + { + var fragment = changeStreamEvent; + while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) + { + changeStreamEnumerator.MoveNext(); + fragment = changeStreamEnumerator.Current; + MergeFragment(changeStreamEvent, fragment); + } + } + yield return changeStreamEvent; + } +} // Merges a fragment into the base event private static void MergeFragment( From 9a03ab7e8f1adfec2ff8475e07d45a61f15974bf Mon Sep 17 00:00:00 2001 From: norareidy Date: Fri, 27 Dec 2024 11:07:05 -0500 Subject: [PATCH 13/13] final tech feedback --- .../change-streams/change-streams.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/source/includes/code-examples/change-streams/change-streams.cs b/source/includes/code-examples/change-streams/change-streams.cs index ff0bb4a0..b71be65a 100644 --- a/source/includes/code-examples/change-streams/change-streams.cs +++ b/source/includes/code-examples/change-streams/change-streams.cs @@ -157,10 +157,10 @@ private static void MergeFragment( .ChangeStreamSplitLargeEvent(); using var cursor = collection.Watch(pipeline); - foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator())) - { - Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); - } +foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator())) +{ + Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); +} // end-split-change-event-sync // start-split-change-event-async @@ -168,10 +168,10 @@ private static void MergeFragment( .ChangeStreamSplitLargeEvent(); using var cursor = await collection.WatchAsync(pipeline); - await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor)) - { - Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); - } +await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor)) +{ + Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); +} // end-split-change-event-async // start-change-stream-post-image