diff --git a/Parse.Tests/EncoderTests.cs b/Parse.Tests/EncoderTests.cs index ea390dc9..544150ae 100644 --- a/Parse.Tests/EncoderTests.cs +++ b/Parse.Tests/EncoderTests.cs @@ -28,6 +28,12 @@ class ParseEncoderTestClass : ParseDataEncoder protected override IDictionary EncodeObject(ParseObject value) => null; } + [TestInitialize] + public void SetUp() + { + Client.Publicize(); + } + [TestMethod] public void TestIsValidType() { diff --git a/Parse.Tests/RelationTests.cs b/Parse.Tests/RelationTests.cs index c550e582..7741198d 100644 --- a/Parse.Tests/RelationTests.cs +++ b/Parse.Tests/RelationTests.cs @@ -114,7 +114,7 @@ public async Task AddRelationToUserAsync_ThrowsException_WhenUserIsNull() public async Task AddRelationToUserAsync_ThrowsException_WhenRelationFieldIsNull() { var user = new ParseUser() { Username = "TestUser", Password = "TestPass", Services = Client.Services }; - await user.SignUpAsync(); + var relatedObjects = new List { new ParseObject("Friend", Client.Services) { ["name"] = "Friend1" } @@ -143,7 +143,6 @@ public async Task UpdateUserRelationAsync_ThrowsException_WhenUserIsNull() public async Task UpdateUserRelationAsync_ThrowsException_WhenRelationFieldIsNull() { var user = new ParseUser() { Username = "TestUser", Password = "TestPass", Services = Client.Services }; - await user.SignUpAsync(); var relatedObjectsToAdd = new List { @@ -168,8 +167,6 @@ public async Task DeleteUserRelationAsync_ThrowsException_WhenUserIsNull() public async Task DeleteUserRelationAsync_ThrowsException_WhenRelationFieldIsNull() { var user = new ParseUser() { Username = "TestUser", Password = "TestPass", Services = Client.Services }; - await user.SignUpAsync(); - await Assert.ThrowsExceptionAsync(() => UserManagement.DeleteUserRelationAsync(user, null)); } [TestMethod] @@ -183,7 +180,6 @@ public async Task GetUserRelationsAsync_ThrowsException_WhenUserIsNull() public async Task GetUserRelationsAsync_ThrowsException_WhenRelationFieldIsNull() { var user = new ParseUser() { Username = "TestUser", Password = "TestPass", Services = Client.Services }; - await user.SignUpAsync(); await Assert.ThrowsExceptionAsync(() => UserManagement.GetUserRelationsAsync(user, null)); } @@ -196,7 +192,6 @@ public async Task AddRelationToUserAsync_ThrowsException_WhenRelatedObjectIsUnsa { // Arrange: Create and sign up a test user. var user = new ParseUser() { Username = "TestUser", Password = "TestPass", Services = Client.Services }; - await user.SignUpAsync(); // Create an unsaved Friend object (do NOT call SaveAsync). var unsavedFriend = new ParseObject("Friend", Client.Services) { ["name"] = "UnsavedFriend" }; @@ -205,6 +200,7 @@ public async Task AddRelationToUserAsync_ThrowsException_WhenRelatedObjectIsUnsa // Act & Assert: Expect an exception when trying to add an unsaved object. await Assert.ThrowsExceptionAsync(() => UserManagement.AddRelationToUserAsync(user, "friends", relatedObjects)); + } diff --git a/Parse.Tests/UserTests.cs b/Parse.Tests/UserTests.cs index 21a8895b..7f7fe65a 100644 --- a/Parse.Tests/UserTests.cs +++ b/Parse.Tests/UserTests.cs @@ -165,7 +165,7 @@ public async Task TestLogOut() // Mock LogOutAsync to ensure it can execute its logic mockCurrentUserController .Setup(obj => obj.LogOutAsync(It.IsAny(), It.IsAny())) - .CallBase(); // Use the actual LogOutAsync implementation + .Returns(Task.CompletedTask); // Mock SessionController for session revocation var mockSessionController = new Mock(); @@ -182,6 +182,7 @@ public async Task TestLogOut() // Inject mocks into ParseClient var client = new ParseClient(new ServerConnectionData { Test = true }, hub); + user.Bind(client); // Act: Perform logout await client.LogOutAsync(CancellationToken.None); diff --git a/Parse/Abstractions/Infrastructure/CustomServiceHub.cs b/Parse/Abstractions/Infrastructure/CustomServiceHub.cs index 554c91bb..416dc807 100644 --- a/Parse/Abstractions/Infrastructure/CustomServiceHub.cs +++ b/Parse/Abstractions/Infrastructure/CustomServiceHub.cs @@ -5,6 +5,7 @@ using Parse.Abstractions.Platform.Configuration; using Parse.Abstractions.Platform.Files; using Parse.Abstractions.Platform.Installations; +using Parse.Abstractions.Platform.LiveQueries; using Parse.Abstractions.Platform.Objects; using Parse.Abstractions.Platform.Push; using Parse.Abstractions.Platform.Queries; @@ -31,6 +32,8 @@ public abstract class CustomServiceHub : ICustomServiceHub public virtual IParseCommandRunner CommandRunner => Services.CommandRunner; + public virtual IWebSocketClient WebSocketClient => Services.WebSocketClient; + public virtual IParseCloudCodeController CloudCodeController => Services.CloudCodeController; public virtual IParseConfigurationController ConfigurationController => Services.ConfigurationController; @@ -41,6 +44,8 @@ public abstract class CustomServiceHub : ICustomServiceHub public virtual IParseQueryController QueryController => Services.QueryController; + public virtual IParseLiveQueryController LiveQueryController => Services.LiveQueryController; + public virtual IParseSessionController SessionController => Services.SessionController; public virtual IParseUserController UserController => Services.UserController; @@ -59,6 +64,8 @@ public abstract class CustomServiceHub : ICustomServiceHub public virtual IServerConnectionData ServerConnectionData => Services.ServerConnectionData; + public virtual ILiveQueryServerConnectionData LiveQueryServerConnectionData => Services.LiveQueryServerConnectionData; + public virtual IParseDataDecoder Decoder => Services.Decoder; public virtual IParseInstallationDataFinalizer InstallationDataFinalizer => Services.InstallationDataFinalizer; diff --git a/Parse/Abstractions/Infrastructure/Execution/IWebSocketClient.cs b/Parse/Abstractions/Infrastructure/Execution/IWebSocketClient.cs new file mode 100644 index 00000000..c5343c59 --- /dev/null +++ b/Parse/Abstractions/Infrastructure/Execution/IWebSocketClient.cs @@ -0,0 +1,71 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Parse.Infrastructure.Execution; + +namespace Parse.Abstractions.Infrastructure.Execution; + +/// +/// Represents an interface for a WebSocket client to handle WebSocket connections and communications. +/// +public interface IWebSocketClient +{ + /// + /// An event that is triggered when a message is received via the WebSocket connection. + /// + /// + /// The event handler receives the message as a string parameter. This can be used to process incoming + /// WebSocket messages, such as notifications, commands, or data updates. + /// + public event EventHandler MessageReceived; + + /// + /// An event that is triggered when an error occurs during the WebSocket operation. + /// + /// + /// This event communicates WebSocket-specific errors along with additional details encapsulated in + /// the object. It can be used to handle and log errors during WebSocket + /// communication or connection lifecycle. + /// + public event EventHandler WebsocketError; + + /// + /// An event that is triggered when an unknown or unexpected error occurs during WebSocket communication. + /// + /// + /// This event can be used to handle errors that do not fall under typical WebSocket error events. The event + /// handler receives an parameter containing details about the error. + /// + public event EventHandler UnknownError; + + /// + /// Establishes a WebSocket connection to the specified server URI. + /// + /// The URI of the WebSocket server to connect to. + /// + /// A token to observe cancellation requests. The operation will stop if the token is canceled. + /// + /// A task that represents the asynchronous operation of opening the WebSocket connection. + public Task OpenAsync(string serverUri, CancellationToken cancellationToken = default); + + /// + /// Closes the active WebSocket connection asynchronously. + /// + /// + /// A token to observe cancellation requests. The operation will stop if the token is canceled. + /// + /// A task that represents the asynchronous operation of closing the WebSocket connection. + public Task CloseAsync(CancellationToken cancellationToken = default); + + /// + /// Sends a message over the established WebSocket connection asynchronously. + /// + /// The message to send through the WebSocket connection. + /// + /// A token to observe cancellation requests. The operation will stop if the token is canceled. + /// + /// A task that represents the asynchronous operation of sending the message. + /// Thrown when trying to send a message on a WebSocket connection that is not in the Open state. + public Task SendAsync(string message, CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/Parse/Abstractions/Infrastructure/ILiveQueryServerConnectionData.cs b/Parse/Abstractions/Infrastructure/ILiveQueryServerConnectionData.cs new file mode 100644 index 00000000..bb2fa6ba --- /dev/null +++ b/Parse/Abstractions/Infrastructure/ILiveQueryServerConnectionData.cs @@ -0,0 +1,25 @@ +namespace Parse.Abstractions.Infrastructure; + +public interface ILiveQueryServerConnectionData : IServerConnectionData +{ + /// + /// Represents the default timeout duration, in milliseconds. + /// + public const int DefaultTimeOut = 5000; // 5 seconds + + /// + /// The timeout duration, in milliseconds, used for various operations, such as + /// establishing a connection or completing a subscription. + /// + int TimeOut { get; set; } + + /// + /// The default buffer size, in bytes. + /// + public const int DefaultBufferSize = 4096; // 4KB + + /// + /// The buffer size, in bytes, used for the WebSocket operations to handle incoming messages. + /// + int MessageBufferSize { get; set; } +} diff --git a/Parse/Abstractions/Infrastructure/IMutableServiceHub.cs b/Parse/Abstractions/Infrastructure/IMutableServiceHub.cs index 99bd78b9..f5fa14cc 100644 --- a/Parse/Abstractions/Infrastructure/IMutableServiceHub.cs +++ b/Parse/Abstractions/Infrastructure/IMutableServiceHub.cs @@ -7,6 +7,7 @@ using Parse.Abstractions.Platform.Configuration; using Parse.Abstractions.Platform.Files; using Parse.Abstractions.Platform.Installations; +using Parse.Abstractions.Platform.LiveQueries; using Parse.Abstractions.Platform.Objects; using Parse.Abstractions.Platform.Push; using Parse.Abstractions.Platform.Queries; @@ -18,6 +19,7 @@ namespace Parse.Abstractions.Infrastructure; public interface IMutableServiceHub : IServiceHub { IServerConnectionData ServerConnectionData { set; } + ILiveQueryServerConnectionData LiveQueryServerConnectionData { set; } IMetadataController MetadataController { set; } IServiceHubCloner Cloner { set; } @@ -30,12 +32,14 @@ public interface IMutableServiceHub : IServiceHub IParseInstallationController InstallationController { set; } IParseCommandRunner CommandRunner { set; } + IWebSocketClient WebSocketClient { set; } IParseCloudCodeController CloudCodeController { set; } IParseConfigurationController ConfigurationController { set; } IParseFileController FileController { set; } IParseObjectController ObjectController { set; } IParseQueryController QueryController { set; } + IParseLiveQueryController LiveQueryController { set; } IParseSessionController SessionController { set; } IParseUserController UserController { set; } IParseCurrentUserController CurrentUserController { set; } diff --git a/Parse/Abstractions/Infrastructure/IServiceHub.cs b/Parse/Abstractions/Infrastructure/IServiceHub.cs index 9614bab3..7bd71aa1 100644 --- a/Parse/Abstractions/Infrastructure/IServiceHub.cs +++ b/Parse/Abstractions/Infrastructure/IServiceHub.cs @@ -7,6 +7,7 @@ using Parse.Abstractions.Platform.Configuration; using Parse.Abstractions.Platform.Files; using Parse.Abstractions.Platform.Installations; +using Parse.Abstractions.Platform.LiveQueries; using Parse.Abstractions.Platform.Objects; using Parse.Abstractions.Platform.Push; using Parse.Abstractions.Platform.Queries; @@ -23,9 +24,10 @@ namespace Parse.Abstractions.Infrastructure; public interface IServiceHub { /// - /// The current server connection data that the the Parse SDK has been initialized with. + /// The current server connection data that the Parse SDK has been initialized with. /// IServerConnectionData ServerConnectionData { get; } + ILiveQueryServerConnectionData LiveQueryServerConnectionData { get; } IMetadataController MetadataController { get; } IServiceHubCloner Cloner { get; } @@ -38,12 +40,14 @@ public interface IServiceHub IParseInstallationController InstallationController { get; } IParseCommandRunner CommandRunner { get; } + IWebSocketClient WebSocketClient { get; } IParseCloudCodeController CloudCodeController { get; } IParseConfigurationController ConfigurationController { get; } IParseFileController FileController { get; } IParseObjectController ObjectController { get; } IParseQueryController QueryController { get; } + IParseLiveQueryController LiveQueryController { get; } IParseSessionController SessionController { get; } IParseUserController UserController { get; } IParseCurrentUserController CurrentUserController { get; } diff --git a/Parse/Abstractions/Platform/LiveQueries/IParseLiveQueryController.cs b/Parse/Abstractions/Platform/LiveQueries/IParseLiveQueryController.cs new file mode 100644 index 00000000..0fbae050 --- /dev/null +++ b/Parse/Abstractions/Platform/LiveQueries/IParseLiveQueryController.cs @@ -0,0 +1,115 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Parse.Platform.LiveQueries; + +namespace Parse.Abstractions.Platform.LiveQueries; + +/// +/// Defines an interface for managing LiveQuery connections, subscriptions, and updates +/// in a Parse Server environment. +/// +public interface IParseLiveQueryController +{ + /// + /// Event triggered when an error occurs during the operation of the ParseLiveQueryController. + /// + /// + /// This event provides details about a live query operation failure, such as specific error messages, + /// error codes, and whether automatic reconnection is recommended. + /// It is raised in scenarios like: + /// - Receiving an error response from the LiveQuery server. + /// - Issues with subscriptions, unsubscriptions, or query updates. + /// Subscribers to this event can use the provided to + /// understand the error and implement appropriate handling mechanisms. + /// + public event EventHandler Error; + + /// + /// Establishes a connection to the live query server asynchronously. + /// + /// + /// A cancellation token that can be used to cancel the connection process. If the token is triggered, + /// the connection process will be terminated. + /// + /// + /// A task that represents the asynchronous connection operation. + /// + /// + /// Thrown when the connection request times out before receiving confirmation from the server. + /// + Task ConnectAsync(CancellationToken cancellationToken = default); + + /// + /// Subscribes to a live query, enabling real-time updates for the specified query object. + /// + /// + /// The type of the ParseObject associated with the live query. + /// + /// + /// The live query instance to subscribe to. It contains details about the query and its parameters. + /// + /// + /// A token to monitor for cancellation requests. It allows the operation to be canceled if requested. + /// + /// + /// An object representing the active subscription for the specified query, enabling interaction with the subscribed events and updates. + /// + /// + /// Thrown when attempting to subscribe while the live query connection is in a closed state. + /// + /// + /// Thrown when the subscription request times out before receiving confirmation from the server. + /// + Task SubscribeAsync(ParseLiveQuery liveQuery, CancellationToken cancellationToken = default) where T : ParseObject; + + /// + /// Updates an active subscription. This method modifies the parameters of an existing subscription for a specific query. + /// + /// + /// The live query object that holds the query parameters to be updated. + /// + /// + /// The unique identifier of the subscription to update. + /// + /// + /// A token to monitor for cancellation requests, allowing the operation to be cancelled before completion. + /// + /// + /// The type of the ParseObject that the query targets. + /// + /// + /// A task that represents the asynchronous operation of updating the subscription. + /// + Task UpdateSubscriptionAsync(ParseLiveQuery liveQuery, int requestId, CancellationToken cancellationToken = default) where T : ParseObject; + + /// + /// Unsubscribes from a live query subscription associated with the given request identifier. + /// + /// + /// The unique identifier of the subscription to unsubscribe from. + /// + /// + /// A cancellation token that can be used to cancel the unsubscription operation before completion. + /// + /// + /// A task that represents the asynchronous unsubscription operation. + /// + /// + /// Thrown if the unsubscription process does not complete within the specified timeout period. + /// + Task UnsubscribeAsync(int requestId, CancellationToken cancellationToken = default); + + /// + /// Closes the live query connection asynchronously. + /// + /// + /// A token to monitor for cancellation requests while closing the live query connection. + /// If the operation is canceled, the task will terminate early. + /// + /// + /// A task that represents the asynchronous operation of closing the live query connection. + /// The task completes when the connection is fully closed and resources are cleaned up. + /// + Task CloseAsync(CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/Parse/Abstractions/Platform/LiveQueries/IParseLiveQuerySubscription.cs b/Parse/Abstractions/Platform/LiveQueries/IParseLiveQuerySubscription.cs new file mode 100644 index 00000000..dfb65293 --- /dev/null +++ b/Parse/Abstractions/Platform/LiveQueries/IParseLiveQuerySubscription.cs @@ -0,0 +1,72 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Parse.Abstractions.Platform.Objects; +using Parse.Platform.LiveQueries; + +namespace Parse.Abstractions.Platform.LiveQueries; + +public interface IParseLiveQuerySubscription +{ + /// + /// Represents the Create event for a live query subscription. + /// This event is triggered when a new object matching the subscription's query is created. + /// + event EventHandler Create; + + /// + /// Represents the Enter event for a live query subscription. + /// This event is triggered when an object that did not previously match the query (and was thus not part of the subscription) + /// starts matching the query, typically due to an update. + /// + event EventHandler Enter; + + /// + /// Represents the Update event for a live query subscription. + /// This event is triggered when an existing object matching the subscription's query is updated. + /// + event EventHandler Update; + + /// + /// Represents the Leave event for a live query subscription. + /// This event is triggered when an object that previously matched the subscription's query + /// no longer matches the criteria and is removed. + /// + event EventHandler Leave; + + /// + /// Represents the Delete event for a live query subscription. + /// This event is triggered when an object matching the subscription's query is deleted. + /// + event EventHandler Delete; + + /// + /// Updates the current live query subscription with new query parameters, + /// effectively modifying the subscription to reflect the provided live query. + /// This allows adjustments to the filter or watched keys without unsubscribing + /// and re-subscribing. + /// + /// The type of the ParseObject associated with the subscription. + /// The updated live query containing new parameters that + /// will replace the existing ones for this subscription. + /// A token to monitor for cancellation requests. If triggered, + /// the update process will be halted. + /// A task that represents the asynchronous operation of updating + /// the subscription with the new query parameters. + Task UpdateAsync(ParseLiveQuery liveQuery, CancellationToken cancellationToken = default) where T : ParseObject; + + /// + /// Cancels the current live query subscription by unsubscribing from the Parse Live Query server. + /// This ensures that the client will no longer receive real-time updates or notifications + /// associated with this subscription. + /// + /// A token to monitor for cancellation requests. If triggered, the cancellation process will halt. + /// A task that represents the asynchronous operation of canceling the subscription. + Task CancelAsync(CancellationToken cancellationToken = default); + + internal void OnCreate(IObjectState objectState); + internal void OnEnter(IObjectState objectState, IObjectState originalState); + internal void OnUpdate(IObjectState objectState, IObjectState originalState); + internal void OnLeave(IObjectState objectState, IObjectState originalState); + internal void OnDelete(IObjectState objectState); +} \ No newline at end of file diff --git a/Parse/Infrastructure/Execution/MessageReceivedEventArgs.cs b/Parse/Infrastructure/Execution/MessageReceivedEventArgs.cs new file mode 100644 index 00000000..f4342cfc --- /dev/null +++ b/Parse/Infrastructure/Execution/MessageReceivedEventArgs.cs @@ -0,0 +1,14 @@ +using System; + +namespace Parse.Infrastructure.Execution; + +/// +/// Provides data for the event that is triggered when a message is received. +/// +public class MessageReceivedEventArgs(string message) : EventArgs +{ + /// + /// Gets the message content that was received. + /// + public string Message { get; } = message; +} \ No newline at end of file diff --git a/Parse/Infrastructure/Execution/TextWebSocketClient.cs b/Parse/Infrastructure/Execution/TextWebSocketClient.cs new file mode 100644 index 00000000..0b780f36 --- /dev/null +++ b/Parse/Infrastructure/Execution/TextWebSocketClient.cs @@ -0,0 +1,209 @@ +using System; +using System.Diagnostics; +using System.IO; +using System.Net.WebSockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Parse.Abstractions.Infrastructure.Execution; + +namespace Parse.Infrastructure.Execution; + +/// +/// Represents a WebSocket client that allows connecting to a WebSocket server, sending messages, and receiving messages. +/// Implements the IWebSocketClient interface for WebSocket operations. +/// +class TextWebSocketClient(int bufferSize) : IWebSocketClient +{ + /// + /// A private instance of the ClientWebSocket class used to manage the WebSocket connection. + /// This variable is responsible for handling the low-level WebSocket communication, including + /// connecting, sending, and receiving data from the WebSocket server. It is initialized + /// when establishing a connection and is used internally for operations such as sending messages + /// and listening for incoming data. + /// + private ClientWebSocket webSocket; + + /// + /// A private instance of the Task class representing the background operation + /// responsible for continuously listening for incoming WebSocket messages. + /// This task is used to manage the asynchronous listening process, ensuring that + /// messages are received from the WebSocket server without blocking the main thread. + /// It is initialized when the listening process starts and monitored to prevent + /// multiple concurrent listeners from being created. + /// + private Task listeningTask; + + /// + /// An event triggered whenever a message is received from the WebSocket server. + /// This event is used to notify subscribers with the content of the received message, + /// represented as a string. Handlers for this event can process or respond to the message + /// based on the application's requirements. + /// + public event EventHandler MessageReceived; + public event EventHandler WebsocketError; + public event EventHandler UnknownError; + + private readonly object connectionLock = new object(); + + private int BufferSize { get; } = bufferSize; + + /// + /// Opens a WebSocket connection to the specified server URI and starts listening for messages. + /// If the connection is already open or in a connecting state, this method does nothing. + /// + /// The URI of the WebSocket server to connect to. + /// A cancellation token that can be used to cancel the connect operation. + /// + /// A task representing the asynchronous operation of connecting to the WebSocket server. + /// + public async Task OpenAsync(string serverUri, CancellationToken cancellationToken = default) + { + ClientWebSocket webSocketToConnect = null; + lock (connectionLock) + { + webSocket ??= new ClientWebSocket(); + if (webSocket.State != WebSocketState.Open && webSocket.State != WebSocketState.Connecting) + { + webSocketToConnect = webSocket; + } + } + + if (webSocketToConnect is not null) + { + await webSocketToConnect.ConnectAsync(new Uri(serverUri), cancellationToken); + StartListening(cancellationToken); + } + } + + /// + /// Closes the WebSocket connection gracefully with a normal closure status. + /// Ensures that the WebSocket connection is properly terminated and resources are released. + /// + /// A cancellation token that can be used to cancel the close operation. + /// + /// A task representing the asynchronous operation of closing the WebSocket connection. + /// + public async Task CloseAsync(CancellationToken cancellationToken = default) + { + if (webSocket is not null) + { + await webSocket?.CloseAsync(WebSocketCloseStatus.NormalClosure, String.Empty, cancellationToken)!; + } + } + + private async Task ListenForMessages(CancellationToken cancellationToken) + { + byte[] buffer = new byte[BufferSize]; + + try + { + while (!cancellationToken.IsCancellationRequested && + webSocket.State == WebSocketState.Open) + { + WebSocketReceiveResult result = await webSocket.ReceiveAsync( + new ArraySegment(buffer), + cancellationToken); + + if (result.MessageType == WebSocketMessageType.Close) + { + await CloseAsync(cancellationToken); + break; + } + + if (result.EndOfMessage) + { + string message = Encoding.UTF8.GetString(buffer, 0, result.Count); + MessageReceived?.Invoke(this, new MessageReceivedEventArgs(message)); + } + else + { + // Handle partial messages by accumulating data until EndOfMessage is true + StringBuilder messageBuilder = new StringBuilder(); + messageBuilder.Append(Encoding.UTF8.GetString(buffer, 0, result.Count)); + while (!result.EndOfMessage) + { + result = await webSocket.ReceiveAsync( + new ArraySegment(buffer), + cancellationToken); + messageBuilder.Append(Encoding.UTF8.GetString(buffer, 0, result.Count)); + } + string fullMessage = messageBuilder.ToString(); + MessageReceived?.Invoke(this, new MessageReceivedEventArgs(fullMessage)); + } + } + } + catch (OperationCanceledException ex) + { + // Normal cancellation, no need to handle + Debug.WriteLine($"Websocket connection was closed: {ex.Message}"); + } + catch (WebSocketException ex) + { + // WebSocket error, notify the user + Debug.WriteLine($"Websocket error ({ex.ErrorCode}): {ex.Message}"); + WebsocketError?.Invoke(this, new ErrorEventArgs(ex)); + } + catch (Exception ex) + { + // Unexpected error, notify the user + Debug.WriteLine($"Unexpected error in Websocket listener: {ex.Message}"); + UnknownError?.Invoke(this, new ErrorEventArgs(ex)); + } + Debug.WriteLine("Websocket ListenForMessage stopped"); + } + + /// + /// Starts listening for incoming messages from the WebSocket connection. This method ensures that only one listener task is running at a time. + /// + /// A cancellation token to signal the listener task to stop. + private void StartListening(CancellationToken cancellationToken) + { + // Make sure we don't start multiple listeners + if (listeningTask is { IsCompleted: false }) + { + return; + } + + // Start the listener task + listeningTask = Task.Run(async () => + { + if (cancellationToken.IsCancellationRequested) + { + cancellationToken.ThrowIfCancellationRequested(); + } + + await ListenForMessages(cancellationToken); + Debug.WriteLine("Websocket listeningTask stopped"); + }, cancellationToken); + + _ = listeningTask.ContinueWith(task => + { + if (!task.IsFaulted) + return; + Debug.WriteLine($"Websocket listener task faulted: {task.Exception}"); + }, TaskContinuationOptions.OnlyOnFaulted); + } + + /// + /// Sends a text message to the connected WebSocket server asynchronously. + /// The message is encoded in UTF-8 format before being sent. + /// + /// The message to be sent to the WebSocket server. + /// A cancellation token that can be used to cancel the send operation. + /// + /// A task representing the asynchronous operation of sending the message to the WebSocket server. + /// + /// Thrown when the WebSocket instance is null. + /// Thrown when there is an error during the WebSocket communication. + /// Thrown when trying to send a message on a WebSocket connection that is not in the Open state. + public async Task SendAsync(string message, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(webSocket); + if (webSocket.State != WebSocketState.Open) + { + throw new InvalidOperationException($"WebSocket is not in Open state. Current state: {webSocket.State}"); + } + await webSocket.SendAsync(Encoding.UTF8.GetBytes(message), WebSocketMessageType.Text, true, cancellationToken); + } +} \ No newline at end of file diff --git a/Parse/Infrastructure/LateInitializedMutableServiceHub.cs b/Parse/Infrastructure/LateInitializedMutableServiceHub.cs index b5c671f4..aaa7c6f3 100644 --- a/Parse/Infrastructure/LateInitializedMutableServiceHub.cs +++ b/Parse/Infrastructure/LateInitializedMutableServiceHub.cs @@ -12,6 +12,7 @@ using Parse.Abstractions.Platform.Sessions; using Parse.Abstractions.Platform.Users; using Parse.Abstractions.Platform.Analytics; +using Parse.Abstractions.Platform.LiveQueries; using Parse.Infrastructure.Execution; using Parse.Platform.Objects; using Parse.Platform.Installations; @@ -25,6 +26,7 @@ using Parse.Platform.Push; using Parse.Infrastructure.Data; using Parse.Infrastructure.Utilities; +using Parse.Platform.LiveQueries; namespace Parse.Infrastructure; @@ -160,5 +162,19 @@ public IParseInstallationDataFinalizer InstallationDataFinalizer set => LateInitializer.SetValue(value); } + + public IWebSocketClient WebSocketClient + { + get => LateInitializer.GetValue(() => LiveQueryServerConnectionData is null ? null : new TextWebSocketClient(LiveQueryServerConnectionData.MessageBufferSize)); + set => LateInitializer.SetValue(value); + } + + public IParseLiveQueryController LiveQueryController + { + get => LateInitializer.GetValue(() => LiveQueryServerConnectionData is null ? null : new ParseLiveQueryController(LiveQueryServerConnectionData.TimeOut, WebSocketClient, Decoder)); + set => LateInitializer.SetValue(value); + } + public IServerConnectionData ServerConnectionData { get; set; } + public ILiveQueryServerConnectionData LiveQueryServerConnectionData { get; set; } } diff --git a/Parse/Infrastructure/LiveQueryServerConnectionData.cs b/Parse/Infrastructure/LiveQueryServerConnectionData.cs new file mode 100644 index 00000000..9c8dfa52 --- /dev/null +++ b/Parse/Infrastructure/LiveQueryServerConnectionData.cs @@ -0,0 +1,50 @@ +using System.Collections.Generic; +using Parse.Abstractions.Infrastructure; + +namespace Parse.Infrastructure; + +/// +/// Represents the configuration of the Parse Live Query server. +/// +public struct LiveQueryServerConnectionData : ILiveQueryServerConnectionData +{ + public LiveQueryServerConnectionData() { } + + internal bool Test { get; set; } + + /// + /// The timeout duration, in milliseconds, used for various operations, such as + /// establishing a connection or completing a subscription. + /// + public int TimeOut { get; set; } = ILiveQueryServerConnectionData.DefaultTimeOut; + + /// + /// The buffer size, in bytes, used by the WebSocket client for communication operations. + /// + public int MessageBufferSize { get; set; } = ILiveQueryServerConnectionData.DefaultBufferSize; + + /// + /// The App ID of your app. + /// + public string ApplicationID { get; set; } + + /// + /// A URI pointing to the target Parse Server instance hosting the app targeted by . + /// + public string ServerURI { get; set; } + + /// + /// The .NET Key for the Parse app targeted by . + /// + public string Key { get; set; } + + /// + /// The Master Key for the Parse app targeted by . + /// + public string MasterKey { get; set; } + + /// + /// Additional HTTP headers to be sent with network requests from the SDK. + /// + public IDictionary Headers { get; set; } +} diff --git a/Parse/Infrastructure/MutableServiceHub.cs b/Parse/Infrastructure/MutableServiceHub.cs index 3cf50a0d..4585f813 100644 --- a/Parse/Infrastructure/MutableServiceHub.cs +++ b/Parse/Infrastructure/MutableServiceHub.cs @@ -7,6 +7,7 @@ using Parse.Abstractions.Platform.Configuration; using Parse.Abstractions.Platform.Files; using Parse.Abstractions.Platform.Installations; +using Parse.Abstractions.Platform.LiveQueries; using Parse.Abstractions.Platform.Objects; using Parse.Abstractions.Platform.Push; using Parse.Abstractions.Platform.Queries; @@ -19,6 +20,7 @@ using Parse.Platform.Configuration; using Parse.Platform.Files; using Parse.Platform.Installations; +using Parse.Platform.LiveQueries; using Parse.Platform.Objects; using Parse.Platform.Push; using Parse.Platform.Queries; @@ -34,6 +36,7 @@ namespace Parse.Infrastructure; public class MutableServiceHub : IMutableServiceHub { public IServerConnectionData ServerConnectionData { get; set; } + public ILiveQueryServerConnectionData LiveQueryServerConnectionData { get; set; } public IMetadataController MetadataController { get; set; } public IServiceHubCloner Cloner { get; set; } @@ -46,12 +49,14 @@ public class MutableServiceHub : IMutableServiceHub public IParseInstallationController InstallationController { get; set; } public IParseCommandRunner CommandRunner { get; set; } + public IWebSocketClient WebSocketClient { get; set; } public IParseCloudCodeController CloudCodeController { get; set; } public IParseConfigurationController ConfigurationController { get; set; } public IParseFileController FileController { get; set; } public IParseObjectController ObjectController { get; set; } public IParseQueryController QueryController { get; set; } + public IParseLiveQueryController LiveQueryController { get; set; } public IParseSessionController SessionController { get; set; } public IParseUserController UserController { get; set; } public IParseCurrentUserController CurrentUserController { get; set; } @@ -65,9 +70,10 @@ public class MutableServiceHub : IMutableServiceHub public IParseCurrentInstallationController CurrentInstallationController { get; set; } public IParseInstallationDataFinalizer InstallationDataFinalizer { get; set; } - public MutableServiceHub SetDefaults(IServerConnectionData connectionData = default) + public MutableServiceHub SetDefaults(IServerConnectionData connectionData = default, ILiveQueryServerConnectionData liveQueryConnectionData = default) { ServerConnectionData ??= connectionData; + LiveQueryServerConnectionData ??= liveQueryConnectionData; MetadataController ??= new MetadataController { EnvironmentData = EnvironmentData.Inferred, @@ -103,6 +109,9 @@ public MutableServiceHub SetDefaults(IServerConnectionData connectionData = defa PushChannelsController ??= new ParsePushChannelsController(CurrentInstallationController); InstallationDataFinalizer ??= new ParseInstallationDataFinalizer { }; + WebSocketClient ??= LiveQueryServerConnectionData is null ? null : new TextWebSocketClient(LiveQueryServerConnectionData.MessageBufferSize); + LiveQueryController ??= LiveQueryServerConnectionData is null ? null : new ParseLiveQueryController(LiveQueryServerConnectionData.TimeOut, WebSocketClient, Decoder); + return this; } } diff --git a/Parse/Infrastructure/OrchestrationServiceHub.cs b/Parse/Infrastructure/OrchestrationServiceHub.cs index d8079425..2517ecdf 100644 --- a/Parse/Infrastructure/OrchestrationServiceHub.cs +++ b/Parse/Infrastructure/OrchestrationServiceHub.cs @@ -6,6 +6,7 @@ using Parse.Abstractions.Platform.Configuration; using Parse.Abstractions.Platform.Files; using Parse.Abstractions.Platform.Installations; +using Parse.Abstractions.Platform.LiveQueries; using Parse.Abstractions.Platform.Objects; using Parse.Abstractions.Platform.Push; using Parse.Abstractions.Platform.Queries; @@ -33,6 +34,7 @@ public class OrchestrationServiceHub : IServiceHub public IParseInstallationController InstallationController => Custom.InstallationController ?? Default.InstallationController; public IParseCommandRunner CommandRunner => Custom.CommandRunner ?? Default.CommandRunner; + public IWebSocketClient WebSocketClient => Custom.WebSocketClient ?? Default.WebSocketClient; public IParseCloudCodeController CloudCodeController => Custom.CloudCodeController ?? Default.CloudCodeController; @@ -44,6 +46,8 @@ public class OrchestrationServiceHub : IServiceHub public IParseQueryController QueryController => Custom.QueryController ?? Default.QueryController; + public IParseLiveQueryController LiveQueryController => Custom.LiveQueryController ?? Default.LiveQueryController; + public IParseSessionController SessionController => Custom.SessionController ?? Default.SessionController; public IParseUserController UserController => Custom.UserController ?? Default.UserController; @@ -62,6 +66,8 @@ public class OrchestrationServiceHub : IServiceHub public IServerConnectionData ServerConnectionData => Custom.ServerConnectionData ?? Default.ServerConnectionData; + public ILiveQueryServerConnectionData LiveQueryServerConnectionData => Custom.LiveQueryServerConnectionData ?? Default.LiveQueryServerConnectionData; + public IParseDataDecoder Decoder => Custom.Decoder ?? Default.Decoder; public IParseInstallationDataFinalizer InstallationDataFinalizer => Custom.InstallationDataFinalizer ?? Default.InstallationDataFinalizer; diff --git a/Parse/Infrastructure/ServiceHub.cs b/Parse/Infrastructure/ServiceHub.cs index dbff4b24..6beb0591 100644 --- a/Parse/Infrastructure/ServiceHub.cs +++ b/Parse/Infrastructure/ServiceHub.cs @@ -7,6 +7,7 @@ using Parse.Abstractions.Platform.Configuration; using Parse.Abstractions.Platform.Files; using Parse.Abstractions.Platform.Installations; +using Parse.Abstractions.Platform.LiveQueries; using Parse.Abstractions.Platform.Objects; using Parse.Abstractions.Platform.Push; using Parse.Abstractions.Platform.Queries; @@ -20,6 +21,7 @@ using Parse.Platform.Configuration; using Parse.Platform.Files; using Parse.Platform.Installations; +using Parse.Platform.LiveQueries; using Parse.Platform.Objects; using Parse.Platform.Push; using Parse.Platform.Queries; @@ -37,6 +39,7 @@ public class ServiceHub : IServiceHub LateInitializer LateInitializer { get; } = new LateInitializer { }; public IServerConnectionData ServerConnectionData { get; set; } + public ILiveQueryServerConnectionData LiveQueryServerConnectionData { get; set; } public IMetadataController MetadataController => LateInitializer.GetValue(() => new MetadataController { HostManifestData = HostManifestData.Inferred, EnvironmentData = EnvironmentData.Inferred }); public IServiceHubCloner Cloner => LateInitializer.GetValue(() => new { } as object as IServiceHubCloner); @@ -68,6 +71,9 @@ public class ServiceHub : IServiceHub public IParseCurrentInstallationController CurrentInstallationController => LateInitializer.GetValue(() => new ParseCurrentInstallationController(InstallationController, CacheController, InstallationCoder, ClassController)); public IParseInstallationDataFinalizer InstallationDataFinalizer => LateInitializer.GetValue(() => new ParseInstallationDataFinalizer { }); + public IWebSocketClient WebSocketClient => LateInitializer.GetValue(() => LiveQueryServerConnectionData is null ? null : new TextWebSocketClient(LiveQueryServerConnectionData.MessageBufferSize)); + public IParseLiveQueryController LiveQueryController => LateInitializer.GetValue(() => LiveQueryServerConnectionData is null ? null : new ParseLiveQueryController(LiveQueryServerConnectionData.TimeOut, WebSocketClient, Decoder)); + public bool Reset() { return LateInitializer.Used && LateInitializer.Reset(); diff --git a/Parse/Platform/LiveQueries/ParseLiveQuery.cs b/Parse/Platform/LiveQueries/ParseLiveQuery.cs new file mode 100644 index 00000000..14785342 --- /dev/null +++ b/Parse/Platform/LiveQueries/ParseLiveQuery.cs @@ -0,0 +1,112 @@ +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Parse.Abstractions.Infrastructure; +using Parse.Abstractions.Platform.LiveQueries; + +namespace Parse; + +/// +/// The ParseLiveQuery class provides functionality to create and manage real-time queries on the Parse Server. +/// It allows tracking changes on objects of a specified class that match query constraints, such as filters +/// and watched fields, delivering updates in real-time as changes occur. +/// +/// Represents the type of ParseObject that this query operates on. T must inherit from ParseObject. +public class ParseLiveQuery where T : ParseObject +{ + /// + /// Serialized clauses. + /// + IDictionary Filters { get; } + + /// + /// Serialized key selections. + /// + ReadOnlyCollection KeySelections { get; } + + /// + /// Serialized keys watched. + /// + ReadOnlyCollection KeyWatchers { get; } + + internal string ClassName { get; } + + internal IServiceHub Services { get; } + + internal ParseLiveQuery(IServiceHub serviceHub, string className, IDictionary filters, IEnumerable selectedKeys = null, IEnumerable watchedKeys = null) + { + ArgumentNullException.ThrowIfNull(serviceHub); + ArgumentException.ThrowIfNullOrWhiteSpace(className); + ArgumentNullException.ThrowIfNull(filters); + + Services = serviceHub; + ClassName = className; + Filters = filters; + + if (selectedKeys is not null) + { + KeySelections = new ReadOnlyCollection(selectedKeys.ToList()); + } + + if (watchedKeys is not null) + { + KeyWatchers = new ReadOnlyCollection(watchedKeys.ToList()); + } + } + + /// + /// Private constructor for composition of queries. A source query is required, + /// but the remaining values can be null if they aren't changed in this + /// composition. + /// + private ParseLiveQuery(ParseLiveQuery source, IEnumerable watchedKeys = null) + { + ArgumentNullException.ThrowIfNull(source); + + Services = source.Services; + ClassName = source.ClassName; + Filters = source.Filters; + KeySelections = source.KeySelections; + KeyWatchers = source.KeyWatchers; + + if (watchedKeys is not null) + { + KeyWatchers = new ReadOnlyCollection(MergeWatchers(watchedKeys).ToList()); + } + } + + private HashSet MergeWatchers(IEnumerable keys) => [..(KeyWatchers ?? Enumerable.Empty()).Concat(keys)]; + + /// + /// Add the provided key to the watched fields of returned ParseObjects. + /// If this is called multiple times, then all the keys specified in each of + /// the calls will be watched. + /// + /// The key that should be watched. + /// A new query with the additional constraint. + public ParseLiveQuery Watch(string watch) => new(this, new List { watch }); + + internal IDictionary BuildParameters() + { + Dictionary result = new Dictionary { ["className"] = ClassName, ["where"] = Filters }; + if (KeySelections != null) + result["keys"] = KeySelections.ToArray(); + if (KeyWatchers != null) + result["watch"] = KeyWatchers.ToArray(); + return result; + } + + /// + /// Subscribes to the live query, allowing the client to receive real-time updates + /// for the query's results. This establishes a subscription with the Live Query service. + /// + /// + /// A task representing the asynchronous subscription operation. Upon completion + /// of the task, the subscription is successfully registered. + /// + public async Task SubscribeAsync(CancellationToken cancellationToken = default) => + await Services.LiveQueryController.SubscribeAsync(this, cancellationToken); +} \ No newline at end of file diff --git a/Parse/Platform/LiveQueries/ParseLiveQueryController.cs b/Parse/Platform/LiveQueries/ParseLiveQueryController.cs new file mode 100644 index 00000000..cc9e2817 --- /dev/null +++ b/Parse/Platform/LiveQueries/ParseLiveQueryController.cs @@ -0,0 +1,652 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; +using Parse.Abstractions.Infrastructure.Data; +using Parse.Abstractions.Infrastructure.Execution; +using Parse.Abstractions.Platform.LiveQueries; +using Parse.Infrastructure.Data; +using Parse.Infrastructure.Execution; +using Parse.Infrastructure.Utilities; + +namespace Parse.Platform.LiveQueries; + +/// +/// The ParseLiveQueryController is responsible for managing live query subscriptions, maintaining a connection +/// to the Parse LiveQuery server, and handling real-time updates from the server. +/// +public class ParseLiveQueryController : IParseLiveQueryController, IDisposable, IAsyncDisposable +{ + private IParseDataDecoder Decoder { get; } + private IWebSocketClient WebSocketClient { get; } + + private int LastRequestId; + + private string ClientId { get; set; } + + private bool disposed; + + /// + /// Gets or sets the timeout duration, in milliseconds, used by the ParseLiveQueryController + /// for various operations, such as establishing a connection or completing a subscription. + /// + /// + /// This property determines the maximum amount of time the controller will wait for an operation + /// to complete before throwing a . It is used in operations such as + /// - Connecting to the LiveQuery server. + /// - Subscribing to a query. + /// - Unsubscribing from a query. + /// Ensure that the value is configured appropriately to avoid premature timeout errors in network-dependent processes. + /// + private int TimeOut { get; } + + /// + /// Event triggered when an error occurs during the operation of the ParseLiveQueryController. + /// + /// + /// This event provides details about a live query operation failure, such as specific error messages, + /// error codes, and whether automatic reconnection is recommended. + /// It is raised in scenarios like: + /// - Receiving an error response from the LiveQuery server. + /// - Issues with subscriptions, unsubscriptions, or query updates. + /// Subscribers to this event can use the provided to + /// understand the error and implement appropriate handling mechanisms. + /// + public event EventHandler Error; + + /// + /// Represents the state of a connection to the Parse LiveQuery server, indicating whether the connection is closed, + /// in the process of connecting, or fully established. + /// + public enum ParseLiveQueryState + { + /// + /// Represents the state where the live query connection is closed. + /// This indicates that any active connection to the live query server + /// has been terminated, and no data updates are being received. + /// + Closed, + + /// + /// Represents the state where the live query connection is in the process of being established. + /// This indicates that the client is actively attempting to connect to the live query server, + /// but the connection has not yet been fully established. + /// + Connecting, + + /// + /// Represents the state where the live query connection has been successfully established. + /// This state indicates that the client is actively connected to the Parse LiveQuery server + /// and is receiving real-time data updates. + /// + Connected + } + + /// + /// Gets the current state of the ParseLiveQueryController. This property indicates + /// whether the controller is in a Closed, Connecting, or Connected state. + /// + /// + /// - `Closed`: Indicates that the controller is not connected. + /// - `Connecting`: Indicates that a connection attempt is in progress. + /// - `Connected`: Indicates that the controller is actively connected. + /// This property is updated based on the controller's connection lifecycle events, + /// such as when a connection is established or closed, or when an error occurs. + /// + public ParseLiveQueryState State => _state; + private volatile ParseLiveQueryState _state; + + TaskCompletionSource ConnectionSignal { get; set; } + private ConcurrentDictionary SubscriptionSignals { get; } = new ConcurrentDictionary(); + private ConcurrentDictionary UnsubscriptionSignals { get; } = new ConcurrentDictionary(); + private ConcurrentDictionary Subscriptions { get; set; } = new ConcurrentDictionary(); + + + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// The implementation to use for the live query connection. + /// + /// + /// + /// This constructor is used to initialize a new instance of the class + /// + public ParseLiveQueryController(int timeOut, IWebSocketClient webSocketClient, IParseDataDecoder decoder) + { + WebSocketClient = webSocketClient ?? throw new ArgumentNullException(nameof(webSocketClient)); + Decoder = decoder ?? throw new ArgumentNullException(nameof(decoder)); + TimeOut = timeOut; + _state = ParseLiveQueryState.Closed; + } + + private void ProcessMessage(IDictionary message) + { + if (!message.TryGetValue("op", out object opValue) || opValue is not string op) + { + Debug.WriteLine("Missing or invalid operation in message"); + return; + } + + switch (op) + { + // CONNECTION + case "connected": + ProcessConnectionMessage(message); + break; + case "subscribed": // Response from subscription and subscription update + ProcessSubscriptionMessage(message); + break; + case "unsubscribed": + ProcessUnsubscriptionMessage(message); + break; + case "error": + ProcessErrorMessage(message); + break; + case "create": + ProcessCreateEventMessage(message); + break; + case "enter": + ProcessEnterEventMessage(message); + break; + case "update": + ProcessUpdateEventMessage(message); + break; + case "leave": + ProcessLeaveEventMessage(message); + break; + case "delete": + ProcessDeleteEventMessage(message); + break; + default: + Debug.WriteLine($"Unknown operation: {message["op"]}"); + break; + } + } + + private bool ValidateClientMessage(IDictionary message, out int requestId) + { + requestId = 0; + + if (!(message.TryGetValue("clientId", out object clientIdObj) && + clientIdObj is string clientId && clientId == ClientId)) + return false; + + return message.TryGetValue("requestId", out object requestIdObj) && + Int32.TryParse(requestIdObj?.ToString(), out requestId); + } + + private bool GetDictEntry(IDictionary message, string key, out IDictionary objDict) + { + if (message.TryGetValue(key, out object obj) && + obj is IDictionary dict) + { + objDict = dict; + return true; + } + + objDict = null; + return false; + } + + void ProcessDeleteEventMessage(IDictionary message) + { + if (!ValidateClientMessage(message, out int requestId)) + return; + + if (!GetDictEntry(message, "object", out IDictionary objectDict)) + return; + + if (!Subscriptions.TryGetValue(requestId, out IParseLiveQuerySubscription subscription)) + return; + + subscription.OnDelete(ParseObjectCoder.Instance.Decode(objectDict, Decoder, ParseClient.Instance.Services)); + } + + void ProcessLeaveEventMessage(IDictionary message) + { + if (!ValidateClientMessage(message, out int requestId)) + return; + + if (!GetDictEntry(message, "object", out IDictionary objectDict)) + return; + + if (!GetDictEntry(message, "original", out IDictionary originalDict)) + return; + + if (!Subscriptions.TryGetValue(requestId, out IParseLiveQuerySubscription subscription)) + return; + + subscription.OnLeave( + ParseObjectCoder.Instance.Decode(objectDict, Decoder, ParseClient.Instance.Services), + ParseObjectCoder.Instance.Decode(originalDict, Decoder, ParseClient.Instance.Services)); + } + + void ProcessUpdateEventMessage(IDictionary message) + { + if (!ValidateClientMessage(message, out int requestId)) + return; + + if (!GetDictEntry(message, "object", out IDictionary objectDict)) + return; + + if (!GetDictEntry(message, "original", out IDictionary originalDict)) + return; + + if (!Subscriptions.TryGetValue(requestId, out IParseLiveQuerySubscription subscription)) + return; + + subscription.OnUpdate( + ParseObjectCoder.Instance.Decode(objectDict, Decoder, ParseClient.Instance.Services), + ParseObjectCoder.Instance.Decode(originalDict, Decoder, ParseClient.Instance.Services)); + } + + void ProcessEnterEventMessage(IDictionary message) + { + if (!ValidateClientMessage(message, out int requestId)) + return; + + if (!GetDictEntry(message, "object", out IDictionary objectDict)) + return; + + if (!GetDictEntry(message, "original", out IDictionary originalDict)) + return; + + if (!Subscriptions.TryGetValue(requestId, out IParseLiveQuerySubscription subscription)) + return; + + subscription.OnEnter( + ParseObjectCoder.Instance.Decode(objectDict, Decoder, ParseClient.Instance.Services), + ParseObjectCoder.Instance.Decode(originalDict, Decoder, ParseClient.Instance.Services)); + } + + void ProcessCreateEventMessage(IDictionary message) + { + if (!ValidateClientMessage(message, out int requestId)) + return; + + if (!GetDictEntry(message, "object", out IDictionary objectDict)) + return; + + if (!Subscriptions.TryGetValue(requestId, out IParseLiveQuerySubscription subscription)) + return; + + subscription.OnCreate(ParseObjectCoder.Instance.Decode(objectDict, Decoder, ParseClient.Instance.Services)); + } + + void ProcessErrorMessage(IDictionary message) + { + if (!(message.TryGetValue("code", out object codeObj) && + Int32.TryParse(codeObj?.ToString(), out int code))) + return; + + if (!(message.TryGetValue("error", out object errorObj) && + errorObj is string error)) + return; + + if (!(message.TryGetValue("reconnect", out object reconnectObj) && + Boolean.TryParse(reconnectObj?.ToString(), out bool reconnect))) + return; + + Error?.Invoke(this, new ParseLiveQueryErrorEventArgs(code, error, reconnect)); + } + + void ProcessUnsubscriptionMessage(IDictionary message) + { + if (!ValidateClientMessage(message, out int requestId)) + return; + + if (UnsubscriptionSignals.TryGetValue(requestId, out TaskCompletionSource unsubscriptionSign)) + { + unsubscriptionSign?.TrySetResult(); + } + } + + void ProcessSubscriptionMessage(IDictionary message) + { + if (!ValidateClientMessage(message, out int requestId)) + return; + + if (SubscriptionSignals.TryGetValue(requestId, out TaskCompletionSource subscriptionSignal)) + { + subscriptionSignal?.TrySetResult(); + } + } + + void ProcessConnectionMessage(IDictionary message) + { + if (!(message.TryGetValue("clientId", out object clientIdObj) && + clientIdObj is string clientId)) + return; + + ClientId = clientId; + _state = ParseLiveQueryState.Connected; + ConnectionSignal?.TrySetResult(); + } + + private async Task> AppendSessionToken(IDictionary message) + { + string sessionToken = await ParseClient.Instance.Services.GetCurrentSessionToken(); + return sessionToken is null + ? message + : message.Concat(new Dictionary { + { "sessionToken", sessionToken } + }).ToDictionary(); + } + + private async Task SendMessage(IDictionary message, CancellationToken cancellationToken) => + await WebSocketClient.SendAsync(JsonUtilities.Encode(message), cancellationToken); + + private async Task OpenAsync(CancellationToken cancellationToken = default) + { + if (ParseClient.Instance.Services == null) + { + throw new InvalidOperationException("ParseClient.Services must be initialized before connecting to the LiveQuery server."); + } + + if (ParseClient.Instance.Services.LiveQueryServerConnectionData == null) + { + throw new InvalidOperationException("ParseClient.Services.LiveQueryServerConnectionData must be initialized before connecting to the LiveQuery server."); + } + + await WebSocketClient.OpenAsync(ParseClient.Instance.Services.LiveQueryServerConnectionData.ServerURI, cancellationToken); + } + + private void WebSocketClientOnMessageReceived(object sender, MessageReceivedEventArgs args) + { + object parsed = JsonUtilities.Parse(args.Message); + if (parsed is IDictionary message) + { + ProcessMessage(message); + } + else + { + Debug.WriteLine($"Invalid message format received: {args.Message}"); + } + } + + /// + /// Establishes a connection to the live query server asynchronously. + /// + /// + /// A cancellation token used to propagate notification that the operation should be canceled. + /// + /// + /// A task that represents the asynchronous operation. + /// + /// + /// Thrown if the live query server connection request exceeds the defined timeout. + /// + public async Task ConnectAsync(CancellationToken cancellationToken = default) + { + if (_state == ParseLiveQueryState.Closed) + { + _state = ParseLiveQueryState.Connecting; + await OpenAsync(cancellationToken); + WebSocketClient.MessageReceived += WebSocketClientOnMessageReceived; + WebSocketClient.WebsocketError += WebSocketClientOnWebsocketError; + WebSocketClient.UnknownError += WebSocketClientOnUnknownError; + Dictionary message = new Dictionary + { + { "op", "connect" }, + { "applicationId", ParseClient.Instance.Services.LiveQueryServerConnectionData.ApplicationID }, + { "windowsKey", ParseClient.Instance.Services.LiveQueryServerConnectionData.Key } + }; + + ConnectionSignal = new TaskCompletionSource(); + try + { + await SendMessage(await AppendSessionToken(message), cancellationToken); + + using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(TimeOut); + + await ConnectionSignal.Task.WaitAsync(cts.Token); + _state = ParseLiveQueryState.Connected; + } + catch (OperationCanceledException) + { + throw new TimeoutException("Live query server connection request has reached timeout"); + } + finally + { + ConnectionSignal = null; + } + } + else if (_state == ParseLiveQueryState.Connecting) + { + TaskCompletionSource signal = ConnectionSignal; + if (signal is not null) + { + await signal.Task.WaitAsync(cancellationToken); + } + } + } + + void WebSocketClientOnWebsocketError(object sender, ErrorEventArgs args) + { + if (args.GetException() is WebSocketException ex) + { + Error?.Invoke(this, new ParseLiveQueryErrorEventArgs(ex.ErrorCode, ex.Message, false, ex)); + } + } + + void WebSocketClientOnUnknownError(object sender, ErrorEventArgs args) + { + if (args.GetException() is { } ex) + { + Error?.Invoke(this, new ParseLiveQueryErrorEventArgs(-1, ex.Message, false, ex)); + } + } + + private async Task SendAndWaitForSignalAsync(IDictionary message, + ConcurrentDictionary signalDictionary, + int requestId, + CancellationToken cancellationToken) + { + TaskCompletionSource tcs = new TaskCompletionSource(); + signalDictionary.TryAdd(requestId, tcs); + + try + { + await SendMessage(message, cancellationToken); + + using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(TimeOut); + + await tcs.Task.WaitAsync(cts.Token); + } + catch (OperationCanceledException) + { + throw new TimeoutException($"Operation timeout for request {requestId}"); + } + finally + { + signalDictionary.TryRemove(requestId, out _); + } + } + + /// + /// Subscribes to a live query, enabling real-time updates for the specified query object. + /// This method sends a subscription request to the live query server and manages the lifecycle of the subscription. + /// + /// + /// The type of the ParseObject associated with the live query. + /// + /// + /// The live query instance to subscribe to. It contains details about the query and its parameters. + /// + /// + /// A token to monitor for cancellation requests. It allows the operation to be canceled if requested. + /// + /// + /// An object representing the active subscription for the specified query, enabling interaction with the subscribed events and updates. + /// + /// + /// Thrown when attempting to subscribe while the live query connection is in a closed state. + /// + /// + /// Thrown when the subscription request times out before receiving confirmation from the server. + /// + public async Task SubscribeAsync(ParseLiveQuery liveQuery, CancellationToken cancellationToken = default) where T : ParseObject + { + if (_state == ParseLiveQueryState.Closed) + { + throw new InvalidOperationException("Cannot subscribe to a live query when the connection is closed."); + } + + int requestId = Interlocked.Increment(ref LastRequestId); + Dictionary message = new Dictionary + { + { "op", "subscribe" }, + { "requestId", requestId }, + { "query", liveQuery.BuildParameters() } + }; + await SendAndWaitForSignalAsync(await AppendSessionToken(message), SubscriptionSignals, requestId, cancellationToken); + ParseLiveQuerySubscription subscription = new ParseLiveQuerySubscription(liveQuery.Services, liveQuery.ClassName, requestId); + Subscriptions.TryAdd(requestId, subscription); + return subscription; + } + + /// + /// Updates an active subscription by sending an "update" operation to the live query server. + /// This method modifies the parameters of an existing subscription for a specific query. + /// + /// + /// The live query object that holds the query parameters to be updated. + /// + /// + /// The unique identifier of the subscription to update. + /// + /// + /// A token to monitor for cancellation requests, allowing the operation to be cancelled before completion. + /// + /// + /// The type of the ParseObject that the query targets. + /// + /// + /// A task that represents the asynchronous operation of updating the subscription. + /// + public async Task UpdateSubscriptionAsync(ParseLiveQuery liveQuery, int requestId, CancellationToken cancellationToken = default) where T : ParseObject + { + Dictionary message = new Dictionary + { + { "op", "update" }, + { "requestId", requestId }, + { "query", liveQuery.BuildParameters() } + }; + await SendAndWaitForSignalAsync(await AppendSessionToken(message), SubscriptionSignals, requestId, cancellationToken); + } + + /// + /// Unsubscribes from a live query subscription associated with the given request identifier. + /// + /// + /// The unique identifier of the subscription to unsubscribe from. + /// + /// + /// A cancellation token that can be used to cancel the unsubscription operation before completion. + /// + /// + /// A task that represents the asynchronous unsubscription operation. + /// + /// + /// Thrown if the unsubscription process does not complete within the specified timeout period. + /// + public async Task UnsubscribeAsync(int requestId, CancellationToken cancellationToken = default) + { + Dictionary message = new Dictionary + { + { "op", "unsubscribe" }, + { "requestId", requestId } + }; + await SendAndWaitForSignalAsync(message, UnsubscriptionSignals, requestId, cancellationToken); + Subscriptions.TryRemove(requestId, out _); + } + + /// + /// Closes the live query connection, resets the state to close, and clears all active subscriptions and signals. + /// + /// + /// A token to monitor for cancellation requests while closing the connection. + /// + /// + /// A task that represents the asynchronous operation for closing the live query connection. + /// + public async Task CloseAsync(CancellationToken cancellationToken = default) + { + WebSocketClient.MessageReceived -= WebSocketClientOnMessageReceived; + WebSocketClient.WebsocketError -= WebSocketClientOnWebsocketError; + WebSocketClient.UnknownError -= WebSocketClientOnUnknownError; + await WebSocketClient.CloseAsync(cancellationToken); + _state = ParseLiveQueryState.Closed; + SubscriptionSignals.Clear(); + UnsubscriptionSignals.Clear(); + Subscriptions.Clear(); + } + + /// + /// Releases all resources used by the instance. + /// + /// + /// This method is used to clean up resources, such as closing open connections or unsubscribing from events, + /// and should be called when the instance is no longer needed. After calling this method, the instance + /// cannot be used unless re-initialized. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Asynchronously releases the resources used by the instance. + /// + /// + /// A representing the asynchronous dispose operation. + /// + /// + /// This method is called to perform an asynchronous disposal of the resources held by the current + /// instance. It suppresses finalization of the object to optimize resource cleanup. + /// + public async ValueTask DisposeAsync() + { + await CloseAsync(); + GC.SuppressFinalize(this); + } + + /// + /// Releases the resources used by the instance. + /// + /// + /// This method implements the interface and is used to clean up any managed or unmanaged + /// resources used by the instance. + /// + private void Dispose(bool disposing) + { + if (disposed) + return; + if (disposing) + { + // For sync disposal, the best effort cleanup without waiting + _ = Task.Run(async () => + { + try + { + await CloseAsync(); + } + catch (Exception ex) + { + Debug.WriteLine($"Error during disposal: {ex}"); + } + }); + } + disposed = true; + } +} \ No newline at end of file diff --git a/Parse/Platform/LiveQueries/ParseLiveQueryDualEventArgs.cs b/Parse/Platform/LiveQueries/ParseLiveQueryDualEventArgs.cs new file mode 100644 index 00000000..78b820c8 --- /dev/null +++ b/Parse/Platform/LiveQueries/ParseLiveQueryDualEventArgs.cs @@ -0,0 +1,22 @@ +using System; + +namespace Parse.Platform.LiveQueries; + +/// +/// Provides event arguments for events triggered by Parse's Live Query service. +/// This class encapsulates details about a particular event, such as the operation type, +/// client ID, request ID, and the associated Parse object data. +/// +public class ParseLiveQueryDualEventArgs : ParseLiveQueryEventArgs +{ + /// + /// Gets the state of the Parse object before the live query event was triggered. + /// This property represents the original data of the Parse object prior to any updates, + /// providing a snapshot of its previous state for comparison purposes during events + /// such as updates or deletes. + /// + public ParseObject Original { get; } + + internal ParseLiveQueryDualEventArgs(ParseObject current, ParseObject original) : base(current) => + Original = original ?? throw new ArgumentNullException(nameof(original)); +} diff --git a/Parse/Platform/LiveQueries/ParseLiveQueryErrorEventArgs.cs b/Parse/Platform/LiveQueries/ParseLiveQueryErrorEventArgs.cs new file mode 100644 index 00000000..90d225e2 --- /dev/null +++ b/Parse/Platform/LiveQueries/ParseLiveQueryErrorEventArgs.cs @@ -0,0 +1,54 @@ +using System; + +namespace Parse.Platform.LiveQueries; + +/// +/// Represents the arguments for an error event that occurs during a live query in the Parse platform. +/// +public class ParseLiveQueryErrorEventArgs : EventArgs +{ + /// + /// Gets or sets the error message associated with a live query operation. + /// + /// + /// The property contains a description of the error that occurred during + /// a live query operation. It can provide detailed information about the nature of the issue, + /// which can be helpful for debugging or logging purposes. + /// + public string Error { get; } + + /// + /// Gets or sets the error code associated with a live query operation. + /// + /// + /// The property contains a numerical identifier that represents + /// the type or category of the error that occurred during a live query operation. + /// This is used alongside the error message to provide detailed diagnostics or logging. + /// + public int Code { get; } + + /// + /// Gets or sets a value indicating whether the client should attempt to reconnect + /// after an error occurs during a live query operation. + /// + /// + /// The property specifies whether a reconnection to the + /// live query server is recommended or required following certain error events. + /// This can be used to determine the client's behavior in maintaining a continuous + /// connection with the server. + /// + public bool Reconnect { get; } + + public Exception LocalException { get; } + + /// + /// Represents the arguments for an error event that occurs during a live query in the Parse platform. + /// + internal ParseLiveQueryErrorEventArgs(int code, string error, bool reconnect, Exception localException = null) + { + Error = error; + Code = code; + Reconnect = reconnect; + LocalException = localException; + } +} \ No newline at end of file diff --git a/Parse/Platform/LiveQueries/ParseLiveQueryEventArgs.cs b/Parse/Platform/LiveQueries/ParseLiveQueryEventArgs.cs new file mode 100644 index 00000000..aed7c9f6 --- /dev/null +++ b/Parse/Platform/LiveQueries/ParseLiveQueryEventArgs.cs @@ -0,0 +1,21 @@ +using System; + +namespace Parse.Platform.LiveQueries; + +/// +/// Provides event arguments for events triggered by Parse's Live Query service. +/// This class encapsulates details about a particular event, such as the operation type, +/// client ID, request ID, and the associated Parse object data. +/// +public class ParseLiveQueryEventArgs : EventArgs +{ + /// + /// Gets the current state of the Parse object associated with the live query event. + /// This property provides the details of the Parse object as it existed at the time + /// the event was triggered, reflecting any changes made during operations such as + /// an update or creation. + /// + public ParseObject Object { get; } + + internal ParseLiveQueryEventArgs(ParseObject current) => Object = current ?? throw new ArgumentNullException(nameof(current)); +} diff --git a/Parse/Platform/LiveQueries/ParseLiveQuerySubscription.cs b/Parse/Platform/LiveQueries/ParseLiveQuerySubscription.cs new file mode 100644 index 00000000..dfd46300 --- /dev/null +++ b/Parse/Platform/LiveQueries/ParseLiveQuerySubscription.cs @@ -0,0 +1,143 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Parse.Abstractions.Infrastructure; +using Parse.Abstractions.Platform.LiveQueries; +using Parse.Abstractions.Platform.Objects; + +namespace Parse.Platform.LiveQueries; + +/// +/// Represents a subscription to updates for a LiveQuery in a Parse Server. Provides hooks for handling +/// various events such as creation, update, deletion, entering, and leaving of objects that match the query. +/// +public class ParseLiveQuerySubscription : IParseLiveQuerySubscription where T : ParseObject +{ + string ClassName { get; } + IServiceHub Services { get; } + + private int RequestId { get; set; } + + /// + /// Represents the Create event for a live query subscription. + /// This event is triggered when a new object matching the subscription's query is created. + /// + public event EventHandler Create; + + /// + /// Represents the Enter event for a live query subscription. + /// This event is triggered when an object that did not previously match the query (and was thus not part of the subscription) + /// starts matching the query, typically due to an update. + /// + public event EventHandler Enter; + + /// + /// Represents the Update event for a live query subscription. + /// This event is triggered when an existing object matching the subscription's query is updated. + /// + public event EventHandler Update; + + /// + /// Represents the Leave event for a live query subscription. + /// This event is triggered when an object that previously matched the subscription's query + /// no longer matches the criteria and is removed. + /// + public event EventHandler Leave; + + /// + /// Represents the Delete event for a live query subscription. + /// This event is triggered when an object matching the subscription's query is deleted. + /// + public event EventHandler Delete; + + /// + /// Represents a subscription to a live query, allowing the client to receive real-time event notifications + /// from the Parse Live Query server for a specified query. This class is responsible for handling events + /// such as object creation, updates, deletions, and entering or leaving a query's result set. + /// + public ParseLiveQuerySubscription(IServiceHub serviceHub, string className, int requestId) + { + Services = serviceHub; + ClassName = className; + RequestId = requestId; + } + + /// + /// Updates the current live query subscription with new query parameters, + /// effectively modifying the subscription to reflect the provided live query. + /// This allows adjustments to the filter or watched keys without unsubscribing + /// and re-subscribing. + /// + /// The type of the ParseObject associated with the subscription. + /// The updated live query containing new parameters that + /// will replace the existing ones for this subscription. + /// A token to monitor for cancellation requests. If triggered, + /// the update process will be halted. + /// A task that represents the asynchronous operation of updating + /// the subscription with the new query parameters. + public async Task UpdateAsync(ParseLiveQuery liveQuery, CancellationToken cancellationToken = default) where T1 : ParseObject => + await Services.LiveQueryController.UpdateSubscriptionAsync(liveQuery, RequestId, cancellationToken); + + /// + /// Cancels the current live query subscription by unsubscribing from the Parse Live Query server. + /// This ensures that the client will no longer receive real-time updates or notifications + /// associated with this subscription. + /// + /// A token to monitor for cancellation requests. If triggered, the cancellation process will halt. + /// A task that represents the asynchronous operation of canceling the subscription. + public async Task CancelAsync(CancellationToken cancellationToken = default) => + await Services.LiveQueryController.UnsubscribeAsync(RequestId, cancellationToken); + + /// + /// Handles the creation event for an object that matches the subscription's query. + /// Invokes the Create event with the parsed object details contained within the provided object state. + /// + /// + /// The state of the object that triggered the creation event, containing its data and metadata. + /// + public void OnCreate(IObjectState objectState) => + Create?.Invoke(this, new ParseLiveQueryEventArgs(Services.GenerateObjectFromState(objectState, ClassName))); + + /// + /// Handles the event when an object enters the result set of a live query subscription. This occurs when an + /// object begins to satisfy the query conditions. + /// + /// The current state of the object that has entered the query result set. + /// The original state of the object before entering the query result set. + public void OnEnter(IObjectState objectState, IObjectState originalState) => + Enter?.Invoke(this, new ParseLiveQueryDualEventArgs( + Services.GenerateObjectFromState(objectState, ClassName), + Services.GenerateObjectFromState(originalState, ClassName))); + + /// + /// Handles the update event for objects subscribed to the Live Query. This method triggers the Update + /// event, providing the updated object and its original state. + /// + /// The new state of the object after the update. + /// The original state of the object before the update. + public void OnUpdate(IObjectState objectState, IObjectState originalState) => + Update?.Invoke(this, new ParseLiveQueryDualEventArgs( + Services.GenerateObjectFromState(objectState, ClassName), + Services.GenerateObjectFromState(originalState, ClassName))); + + /// + /// Handles the event when an object leaves the result set of the live query subscription. + /// This method triggers the event to notify that an object has + /// transitioned out of the query's result set. + /// + /// The state of the object that left the result set. + /// The original state of the object before it left the result set. + public void OnLeave(IObjectState objectState, IObjectState originalState) => + Leave?.Invoke(this, new ParseLiveQueryDualEventArgs( + Services.GenerateObjectFromState(objectState, ClassName), + Services.GenerateObjectFromState(originalState, ClassName))); + + /// + /// Handles the "delete" event for a live query subscription, triggered when an object is removed + /// from the query's result set. This method processes the event by invoking the associated + /// delete event handler, if subscribed, with the relevant object data. + /// + /// The state information of the object that was deleted. + public void OnDelete(IObjectState objectState) => + Delete?.Invoke(this, new ParseLiveQueryEventArgs(Services.GenerateObjectFromState(objectState, ClassName))); +} \ No newline at end of file diff --git a/Parse/Platform/ParseClient.cs b/Parse/Platform/ParseClient.cs index 4e3b4a8b..70aac7d1 100644 --- a/Parse/Platform/ParseClient.cs +++ b/Parse/Platform/ParseClient.cs @@ -97,6 +97,67 @@ public ParseClient(IServerConnectionData configuration, IServiceHub serviceHub = Services.ClassController.AddIntrinsic(); } + /// + /// Creates a new and authenticates it as belonging to your application. This class is a hub for interacting with the SDK. The recommended way to use this class on client applications is to instantiate it, then call on it in your application entry point. This allows you to access . + /// + /// The configuration to initialize Parse with. + /// The configuration to initialize the Parse live query client with. + /// A service hub to override internal services and thereby make the Parse SDK operate in a custom manner. + /// A set of implementation instances to tweak the behaviour of the SDK. + public ParseClient(IServerConnectionData configuration, ILiveQueryServerConnectionData liveQueryConfiguration, IServiceHub serviceHub = default, params IServiceHubMutator[] configurators) + { + Services = serviceHub is { } + ? new OrchestrationServiceHub { Custom = serviceHub, Default = new ServiceHub { ServerConnectionData = GenerateServerConnectionData(), LiveQueryServerConnectionData = GenerateLiveQueryServerConnectionData() } } + : new ServiceHub { ServerConnectionData = GenerateServerConnectionData(), LiveQueryServerConnectionData = GenerateLiveQueryServerConnectionData() } as IServiceHub; + + IServerConnectionData GenerateServerConnectionData() => configuration switch + { + null => throw new ArgumentNullException(nameof(configuration)), + ServerConnectionData { Test: true, ServerURI: { } } data => data, + ServerConnectionData { Test: true } data => new ServerConnectionData + { + ApplicationID = data.ApplicationID, + Headers = data.Headers, + MasterKey = data.MasterKey, + Test = data.Test, + Key = data.Key, + ServerURI = "https://api.parse.com/1/" + }, + { ServerURI: "https://api.parse.com/1/" } => throw new InvalidOperationException("Since the official parse server has shut down, you must specify a URI that points to a hosted instance."), + { ApplicationID: { }, ServerURI: { }, Key: { } } data => data, + _ => throw new InvalidOperationException("The IServerConnectionData implementation instance provided to the ParseClient constructor must be populated with the information needed to connect to a Parse server instance.") + }; + + ILiveQueryServerConnectionData GenerateLiveQueryServerConnectionData() => liveQueryConfiguration switch + { + null => throw new ArgumentNullException(nameof(liveQueryConfiguration)), + LiveQueryServerConnectionData { Test: true, ServerURI: { } } data => data, + LiveQueryServerConnectionData { Test: true } data => new LiveQueryServerConnectionData + { + ApplicationID = data.ApplicationID, + Headers = data.Headers, + MasterKey = data.MasterKey, + Test = data.Test, + Key = data.Key, + ServerURI = "wss://api.parse.com/1/" + }, + { ServerURI: "wss://api.parse.com/1/" } => throw new InvalidOperationException("Since the official parse server has shut down, you must specify a URI that points to a hosted instance."), + { ApplicationID: { }, ServerURI: { }, Key: { } } data => data, + _ => throw new InvalidOperationException("The IServerConnectionData implementation instance provided to the ParseClient constructor must be populated with the information needed to connect to a Parse server instance.") + }; + + if (configurators is { Length: int length } && length > 0) + { + Services = serviceHub switch + { + IMutableServiceHub { } mutableServiceHub => BuildHub((Hub: mutableServiceHub, mutableServiceHub.ServerConnectionData = serviceHub.ServerConnectionData ?? Services.ServerConnectionData, mutableServiceHub.LiveQueryServerConnectionData = serviceHub.LiveQueryServerConnectionData ?? Services.LiveQueryServerConnectionData).Hub, Services, configurators), + { } => BuildHub(default, Services, configurators) + }; + } + + Services.ClassController.AddIntrinsic(); + } + /// /// Initializes a instance using the set on the 's implementation instance. /// diff --git a/Parse/Platform/Queries/ParseQuery.cs b/Parse/Platform/Queries/ParseQuery.cs index 4587c85b..69fcaf87 100644 --- a/Parse/Platform/Queries/ParseQuery.cs +++ b/Parse/Platform/Queries/ParseQuery.cs @@ -911,4 +911,16 @@ public override int GetHashCode() // TODO (richardross): Implement this. return 0; } + + /// + /// Creates a live query from this query that can be used to receive real-time updates + /// when objects matching the query are created, updated, or deleted. + /// + /// A new ParseLiveQuery instace configured with this query's parameters. + public ParseLiveQuery GetLive() + { + ArgumentNullException.ThrowIfNull(Filters); + IDictionary filters = BuildParameters().TryGetValue("where", out object where) ? where as IDictionary : null; + return new ParseLiveQuery(Services, ClassName, filters, KeySelections); + } } diff --git a/Parse/Utilities/ObjectServiceExtensions.cs b/Parse/Utilities/ObjectServiceExtensions.cs index 4f7a8320..5271edb5 100644 --- a/Parse/Utilities/ObjectServiceExtensions.cs +++ b/Parse/Utilities/ObjectServiceExtensions.cs @@ -10,6 +10,9 @@ using Parse.Infrastructure.Utilities; using Parse.Infrastructure.Data; using System.Diagnostics; +using Parse.Abstractions.Infrastructure.Execution; +using Parse.Abstractions.Platform.LiveQueries; +using Parse.Platform.LiveQueries; namespace Parse; @@ -91,7 +94,7 @@ public static T CreateObject(this IServiceHub serviceHub) where T : ParseObje /// A new ParseObject for the given class name. public static T CreateObject(this IParseObjectClassController classController, IServiceHub serviceHub) where T : ParseObject { - + return (T) classController.Instantiate(classController.GetClassName(typeof(T)), serviceHub); } @@ -287,6 +290,32 @@ public static ParseQuery GetQuery(this IServiceHub serviceHub, stri return new ParseQuery(serviceHub, className); } + /// + /// Establishes a connection to the Live Query Server, enabling real-time updates and operations for subscribed queries. + /// This method configures error handling for the connection. + /// + /// The current instance managing the Parse services. + /// Optional event handler to manage errors occurring during the live query operations. + /// A task that represents the asynchronous operation of connecting to the Live Query Server. The task completes when the connection is established. + public static async Task ConnectLiveQueryServerAsync(this IServiceHub serviceHub, EventHandler onError = null) + { + if (onError is not null) + { + serviceHub.LiveQueryController.Error += onError; + } + await serviceHub.LiveQueryController.ConnectAsync(); + } + + /// + /// Disconnects from the live query server by closing the connection established through the LiveQueryController. + /// + /// The instance managing the service resources. + /// A task representing the asynchronous operation of disconnecting from the live query server. + public static async Task DisconnectLiveQueryServerAsync(this IServiceHub serviceHub) + { + await serviceHub.LiveQueryController.CloseAsync(); + } + /// /// Saves each object in the provided list. /// @@ -337,21 +366,21 @@ internal static T GenerateObjectFromState( { throw new ArgumentNullException(nameof(state), "The state cannot be null."); } - + // Ensure the class name is determined or throw an exception string className = state.ClassName ?? defaultClassName; if (string.IsNullOrEmpty(className)) { - + throw new InvalidOperationException("Both state.ClassName and defaultClassName are null or empty. Unable to determine class name."); } - + // Create the object using the class controller T obj = classController.Instantiate(className, serviceHub) as T; - + if (obj == null) { - + throw new InvalidOperationException($"Failed to instantiate object of type {typeof(T).Name} for class {className}."); } @@ -438,7 +467,7 @@ static void CollectDirtyChildren(this IServiceHub serviceHub, object node, IList { CollectDirtyChildren(serviceHub, node, dirtyChildren, new HashSet(new IdentityEqualityComparer()), new HashSet(new IdentityEqualityComparer())); } - + internal static async Task DeepSaveAsync(this IServiceHub serviceHub, object target, string sessionToken, CancellationToken cancellationToken) { // Collect dirty objects