Services
This document describes each service class in the WebSocket Manager.
IWebSocketService / WebSocketService
Namespace: Virtufin.WebSocketManager.Services
Purpose: Core service for managing WebSocket connections. Provides high-level operations for connecting, disconnecting, and listing connections, for sending messages over them, and for starting or stopping publication of their incoming messages.
Interface Definition
public interface IWebSocketService
{
    Task<(WebSocketConnection? Connection, string? Error)> ConnectAsync(
        string url, bool autoReconnect, CancellationToken cancellationToken = default);
    Task<IEnumerable<WebSocketConnection>> ListConnectionsAsync(
        CancellationToken cancellationToken = default);
    Task<(bool Success, string? Error)> DisconnectAsync(
        string id, CancellationToken cancellationToken = default);
    Task<(bool Success, string? Error)> StartPublishAsync(
        string id, string topic, CancellationToken cancellationToken = default);
    Task<(bool Success, string? Error)> StopPublishAsync(
        string id, CancellationToken cancellationToken = default);
    Task<(string? Response, string? Error)> SendAsync(
        string id, string message, int timeoutMs, CancellationToken cancellationToken = default);
    Task<(bool Success, string? Error)> SendRawAsync(
        string id, string message, CancellationToken cancellationToken = default);
}
ConnectAsync
Establishes a new WebSocket connection.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| url | string | The WebSocket server URL (must be non-empty) |
| autoReconnect | bool | Whether to enable auto-reconnect on failure |
| cancellationToken | CancellationToken | Optional cancellation token |
Returns: A tuple containing the created WebSocketConnection if successful, or null with an error message if unsuccessful.
Behavior:
1. Validates that url is non-empty
2. Creates a new connection entry via IWebSocketConnectionStore
3. Connects via IWebSocketClientWrapper
4. Starts the receive loop to handle incoming messages
5. For each incoming message, publishes it to the connection's topic via IDaprPublisher if publishing is configured
6. Persists the connection to the store
7. On failure, removes the connection from the store and returns the error
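A caller typically deconstructs the returned tuple and checks the connection for null before using it. The following is a minimal sketch of that pattern rather than the project's own code: IConnector, FakeConnector, the trimmed WebSocketConnection record, and the "URL is required" error text are hypothetical stand-ins so the example compiles on its own. Only the ConnectAsync signature is taken from the interface definition.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, trimmed stand-in for the real WebSocketConnection model.
public record WebSocketConnection(string Id, string Url);

// Hypothetical slice of IWebSocketService: only ConnectAsync, same signature.
public interface IConnector
{
    Task<(WebSocketConnection? Connection, string? Error)> ConnectAsync(
        string url, bool autoReconnect, CancellationToken cancellationToken = default);
}

// Hypothetical fake that mimics only the documented URL validation step.
public sealed class FakeConnector : IConnector
{
    public Task<(WebSocketConnection? Connection, string? Error)> ConnectAsync(
        string url, bool autoReconnect, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrEmpty(url))
        {
            // The error text is an assumption; the source does not quote it.
            return Task.FromResult<(WebSocketConnection?, string?)>((null, "URL is required"));
        }
        var connection = new WebSocketConnection(Guid.NewGuid().ToString(), url);
        return Task.FromResult<(WebSocketConnection?, string?)>((connection, null));
    }
}

public static class ConnectExample
{
    // Returns the new connection ID, or throws with the service's error text.
    public static async Task<string> ConnectOrThrowAsync(IConnector service, string url)
    {
        var (connection, error) = await service.ConnectAsync(url, autoReconnect: true);
        if (connection is null)
        {
            throw new InvalidOperationException(error ?? "Unknown connection error");
        }
        return connection.Id;
    }
}
```

Because failures come back as an error string instead of an exception, the caller decides whether to throw, log, or map the error to an HTTP response.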
ListConnectionsAsync
Retrieves all active WebSocket connections.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| cancellationToken | CancellationToken | Optional cancellation token |
Returns: An enumerable collection of all WebSocketConnection instances.
DisconnectAsync
Disconnects and removes a WebSocket connection.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID to disconnect |
| cancellationToken | CancellationToken | Optional cancellation token |
Returns: A tuple indicating success and an error message if unsuccessful.
Behavior:
1. Retrieves the connection from the store
2. Validates ownership via CheckOwnership
3. Disconnects via IWebSocketClientWrapper (errors are logged but not thrown)
4. Removes the connection from the store
StartPublishAsync
Configures a connection to publish messages to a Dapr pub/sub topic.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
| topic | string | The Dapr pub/sub topic name (must be non-empty) |
| cancellationToken | CancellationToken | Optional cancellation token |
Returns: A tuple indicating success and an error message if unsuccessful.
Errors:
- "Connection not found" - Connection ID does not exist
- Ownership errors from CheckOwnership
- "Websocket is not connected" - Connection status is not Connected
- "Topic is required" - Topic parameter is empty
StopPublishAsync
Stops publishing messages for a connection.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
| cancellationToken | CancellationToken | Optional cancellation token |
Returns: A tuple indicating success and an error message if unsuccessful.
Behavior: Sets connection.Topic to null and updates the store.
SendAsync
Sends a message and waits for a correlated response.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
| message | string | The message to send |
| timeoutMs | int | Timeout in milliseconds to wait for response |
| cancellationToken | CancellationToken | Optional cancellation token |
Returns: A tuple containing the response message if successful, or null with an error message.
Errors:
- "Connection not found"
- Ownership errors
- "Websocket is not connected"
- "Request timeout" - Timeout exceeded
Implementation Detail: The message is wrapped with a correlation ID and sent via IWebSocketClientWrapper.SendAndWaitAsync. Responses are matched by correlation ID.
SendRawAsync
Sends a raw message without waiting for a response.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
| message | string | The message to send |
| cancellationToken | CancellationToken | Optional cancellation token |
Returns: A tuple indicating success and an error message if unsuccessful.
CheckOwnership (Private)
Validates that the current instance owns the connection.
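private bool CheckOwnership(WebSocketConnection connection, out string? error)
{
    error = null;
    // A connection may only be modified by the instance that created it.
    if (connection.InstanceId != _instanceIdProvider.GetInstanceId())
    {
        // Report the owning instance so the caller can route the request there.
        error = $"Connection owned by different instance: {connection.InstanceId}";
        return false;
    }
    return true;
}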
Purpose: Prevents cross-instance modifications in distributed deployments.
IWebSocketConnectionStore / DistributedWebSocketConnectionStore
Namespace: Virtufin.WebSocketManager.Services
Purpose: Manages WebSocket connection storage with distributed persistence via Dapr state store.
Interface Definition
public interface IWebSocketConnectionStore
{
    Task<WebSocketConnection> CreateConnectionAsync(
        string url, bool autoReconnect, CancellationToken cancellationToken = default);
    Task<WebSocketConnection?> GetConnectionAsync(
        string id, CancellationToken cancellationToken = default);
    Task<IEnumerable<WebSocketConnection>> GetAllConnectionsAsync(
        CancellationToken cancellationToken = default);
    Task UpdateConnectionAsync(
        WebSocketConnection connection, CancellationToken cancellationToken = default);
    Task RemoveConnectionAsync(
        string id, CancellationToken cancellationToken = default);
    Task ClearAllConnectionsAsync(
        CancellationToken cancellationToken = default);
}
Implementation: DistributedWebSocketConnectionStore
Uses a two-tier storage approach:
- Local cache: ConcurrentDictionary<string, WebSocketConnection> for fast in-memory access
- Dapr state store: Via IDaprConnectionRepository for distributed persistence
CreateConnectionAsync
Creates a new connection with a generated ID and the current instance ID.
Returns: A new WebSocketConnection with:
- Id = Guid.NewGuid().ToString()
- Url = provided URL
- AutoReconnect = provided value
- Status = ConnectionStatus.Disconnected
- CreatedAt = DateTime.UtcNow
- InstanceId = from IInstanceIdProvider.GetInstanceId()
The connection is added to the local cache but is NOT persisted to the Dapr state store until UpdateConnectionAsync is called.
GetConnectionAsync
Retrieves a connection by ID.
Behavior:
1. First checks local cache (_localConnections)
2. If not found, retrieves from Dapr state store via IDaprConnectionRepository
Returns: The connection if found, otherwise null.
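The cache-first lookup can be sketched as follows. This is an illustration under stated assumptions, not the actual DistributedWebSocketConnectionStore: CachedConnection and TwoTierLookup are hypothetical names, and a delegate stands in for IDaprConnectionRepository.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, trimmed stand-in for the real connection model.
public record CachedConnection(string Id, string Url);

public sealed class TwoTierLookup
{
    private readonly ConcurrentDictionary<string, CachedConnection> _localConnections = new();

    // Stand-in for the repository call that reads from the Dapr state store.
    private readonly Func<string, Task<CachedConnection?>> _loadFromStateStore;

    public TwoTierLookup(Func<string, Task<CachedConnection?>> loadFromStateStore)
    {
        _loadFromStateStore = loadFromStateStore;
    }

    public void AddLocal(CachedConnection connection) =>
        _localConnections[connection.Id] = connection;

    public async Task<CachedConnection?> GetAsync(string id)
    {
        // Tier 1: in-memory cache, no network round trip.
        if (_localConnections.TryGetValue(id, out var cached))
        {
            return cached;
        }
        // Tier 2: shared state store. Whether the real store copies this result
        // into the local cache is not stated in the source, so this sketch does not.
        return await _loadFromStateStore(id);
    }
}
```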
GetAllConnectionsAsync
Retrieves all connections from both local cache and Dapr state store.
Returns: Combined list of remote and local connections (local connections are appended if not already present in remote results).
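The merge rule described above (remote results first, then any local connection whose ID is not already present) might look like the sketch below. ConnectionSummary and ConnectionMerge are hypothetical names used only for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical, trimmed stand-in for the real connection model.
public record ConnectionSummary(string Id, string Url);

public static class ConnectionMerge
{
    public static List<ConnectionSummary> Merge(
        IEnumerable<ConnectionSummary> remote, IEnumerable<ConnectionSummary> local)
    {
        // Remote entries come first and win when an ID exists in both tiers.
        var merged = remote.ToList();
        var seenIds = new HashSet<string>(merged.Select(c => c.Id));
        foreach (var connection in local)
        {
            // HashSet.Add returns false when the ID is already present.
            if (seenIds.Add(connection.Id))
            {
                merged.Add(connection);
            }
        }
        return merged;
    }
}
```

Local-only entries appear in the result when a connection has been created but not yet persisted.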
UpdateConnectionAsync
Updates both local cache and Dapr state store.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to update |
Behavior:
1. Updates local cache
2. Converts to DaprStateStoreEntry and saves via repository
RemoveConnectionAsync
Removes a connection from both local cache and Dapr state store.
ClearAllConnectionsAsync
Removes all connections from both storage tiers.
IDaprConnectionRepository / DaprConnectionRepository
Namespace: Virtufin.WebSocketManager.Services
Purpose: Handles Dapr state store operations for WebSocket connections.
Interface Definition
public interface IDaprConnectionRepository
{
    Task<IEnumerable<DaprStateStoreEntry>> GetAllAsync(
        CancellationToken cancellationToken = default);
    Task<DaprStateStoreEntry?> GetAsync(
        string id, CancellationToken cancellationToken = default);
    Task SaveAsync(
        DaprStateStoreEntry entry, CancellationToken cancellationToken = default);
    Task DeleteAsync(
        string id, CancellationToken cancellationToken = default);
}
Implementation: DaprConnectionRepository
Uses Dapr state store with an index pattern for efficient lookups.
State Store Structure
- Connection entries: Key = websocket-{id}, Value = DaprStateStoreEntry
- Index: Key = websocket-index, Value = HashSet<string> containing all connection IDs
GetAllAsync
Retrieves all connections via the index:
1. Gets the index (HashSet<string>) from state store
2. If index is null or empty, returns empty enumeration
3. Constructs bulk state keys and retrieves all entries in one call
4. Returns only non-null entries
GetAsync
Retrieves a single connection by ID.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
Returns: DaprStateStoreEntry if found, otherwise null.
SaveAsync
Saves a connection entry and updates the index.
Behavior:
1. Saves the entry with key websocket-{entry.Id}
2. Calls UpdateIndexAsync with add: true
DeleteAsync
Deletes a connection entry and updates the index.
Behavior:
1. Deletes the entry with key websocket-{id}
2. Calls UpdateIndexAsync with add: false
UpdateIndexAsync (Private)
Updates the connection index in state store.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
| add | bool | true to add to index, false to remove |
Behavior:
1. Gets current index (or creates new HashSet<string> if null)
2. Adds or removes the ID
3. Saves the updated index
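To make the index pattern concrete, here is a small in-memory sketch. It is not the DaprConnectionRepository itself: InMemoryIndexedStore is a hypothetical class, a Dictionary stands in for the Dapr state store, and entries are plain strings instead of DaprStateStoreEntry. Only the key names websocket-{id} and websocket-index come from the source.

```csharp
using System.Collections.Generic;
using System.Linq;

public sealed class InMemoryIndexedStore
{
    private const string IndexKey = "websocket-index";

    // Stand-in for the state store: one flat key/value space.
    private readonly Dictionary<string, object> _state = new();

    public static string EntryKey(string id) => $"websocket-{id}";

    public void Save(string id, string entry)
    {
        _state[EntryKey(id)] = entry;
        UpdateIndex(id, add: true);
    }

    public void Delete(string id)
    {
        _state.Remove(EntryKey(id));
        UpdateIndex(id, add: false);
    }

    public IReadOnlyList<string> GetAll()
    {
        // Build every entry key from the index, then keep only entries that exist.
        return ReadIndex()
            .Select(id => EntryKey(id))
            .Where(key => _state.ContainsKey(key))
            .Select(key => (string)_state[key])
            .ToList();
    }

    private HashSet<string> ReadIndex() =>
        _state.TryGetValue(IndexKey, out var value)
            ? (HashSet<string>)value
            : new HashSet<string>();

    private void UpdateIndex(string id, bool add)
    {
        // Read-modify-write of the whole index. The source does not say how
        // concurrent writers are handled, so no guard is shown here.
        var index = ReadIndex();
        if (add) index.Add(id); else index.Remove(id);
        _state[IndexKey] = index;
    }
}
```

The index exists because a key/value store cannot enumerate keys by prefix cheaply; keeping the ID set under one well-known key lets GetAllAsync fetch everything in a single bulk read.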
IWebSocketClientWrapper / WebSocketClientWrapper
Namespace: Virtufin.WebSocketManager.Services
Purpose: Provides low-level WebSocket connection operations. Manages external WebSocket connections including connection lifecycle, message sending/receiving, and auto-reconnect logic.
Interface Definition
public interface IWebSocketClientWrapper
{
    Task ConnectAsync(WebSocketConnection connection);
    Task DisconnectAsync(WebSocketConnection connection);
    Task SendAsync(WebSocketConnection connection, string message);
    Task<string> SendAndWaitAsync(
        WebSocketConnection connection, string message, int timeoutMs);
    void StartReceiveLoop(
        WebSocketConnection connection, Func<string, Task> onMessageReceived);
}
Implementation: WebSocketClientWrapper
Manages System.Net.WebSockets.ClientWebSocket instances with correlation-based request-response matching.
Internal State
- _pendingRequests: ConcurrentDictionary<string, TaskCompletionSource<string>> - Maps correlation IDs to pending response completers
- WrappedMessage record: Internal format for correlation ({ "id": "...", "message": "..." })
ConnectAsync
Establishes a WebSocket connection.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to connect |
Behavior:
1. Creates a new CancellationTokenSource and assigns to connection.CancellationTokenSource
2. Creates a new ClientWebSocket and assigns to connection.WebSocket
3. Connects to connection.Url
4. Sets connection.Status to Connected
5. On failure, sets status to Disconnected and re-throws
Throws: Re-throws the original exception if the connection attempt fails.
DisconnectAsync
Gracefully closes a WebSocket connection.
Behavior:
1. Cancels the CancellationTokenSource
2. If socket is open, sends close frame with NormalClosure
3. Disposes the WebSocket
4. Sets status to Disconnected
SendAsync
Sends a text message without waiting for a response.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to send on |
| message | string | The message to send |
Throws: InvalidOperationException if WebSocket is not connected.
SendAndWaitAsync
Sends a message with correlation and waits for the response.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to send on |
| message | string | The message to send |
| timeoutMs | int | Timeout in milliseconds |
Behavior:
1. Generates a new correlation ID (Guid.NewGuid().ToString())
2. Creates a TaskCompletionSource<string> for the response
3. Wraps message as WrappedMessage(correlationId, message)
4. Sends via SendAsync
5. Registers a timeout cancellation
6. Returns the response once it is received, or fails when the timeout elapses
Implementation Detail: The message is JSON-serialized with the correlation ID. Responses are matched in StartReceiveLoop by extracting the id field from JSON.
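The correlation mechanism can be sketched with a ConcurrentDictionary of TaskCompletionSource instances. The code below is an illustration, not the actual WebSocketClientWrapper: CorrelationTable is a hypothetical class, the send step is passed in as a delegate, the timeout uses Task.WaitAsync (available in .NET 6 and later) in place of the registered timeout cancellation the source describes, and returning the raw response JSON is an assumption.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Tasks;

public sealed class CorrelationTable
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pendingRequests = new();

    // Wraps the message with a fresh correlation ID, sends it, and awaits the reply.
    public async Task<string> SendAndWaitAsync(string message, int timeoutMs, Func<string, Task> send)
    {
        var id = Guid.NewGuid().ToString();
        var completer = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pendingRequests[id] = completer;
        try
        {
            // Produces {"id":"...","message":"..."}, the documented wrapped format.
            await send(JsonSerializer.Serialize(new { id, message }));
            // Throws TimeoutException if no reply arrives in time.
            return await completer.Task.WaitAsync(TimeSpan.FromMilliseconds(timeoutMs));
        }
        finally
        {
            // Always drop the entry so timed-out requests do not accumulate.
            _pendingRequests.TryRemove(id, out _);
        }
    }

    // Called by the receive loop. Returns true when the message answered a
    // pending request; false means the caller should invoke its own callback.
    public bool TryComplete(string incomingJson)
    {
        string? id = null;
        try
        {
            using var doc = JsonDocument.Parse(incomingJson);
            if (doc.RootElement.ValueKind == JsonValueKind.Object &&
                doc.RootElement.TryGetProperty("id", out var idProp) &&
                idProp.ValueKind == JsonValueKind.String)
            {
                id = idProp.GetString();
            }
        }
        catch (JsonException)
        {
            return false; // Not JSON, so it cannot carry a correlation ID.
        }
        return id is not null
            && _pendingRequests.TryRemove(id, out var completer)
            && completer.TrySetResult(incomingJson);
    }
}
```

Removing the entry in the finally block covers all three exits: a reply, a timeout, and a failed send.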
StartReceiveLoop
Starts a background task to receive messages.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to receive from |
| onMessageReceived | Func<string, Task> | Callback for received messages |
Behavior:
1. Cancels any existing receive operation
2. Starts a new Task running the receive loop
3. Assigns to connection.ReceiveTask
Receive Loop:
- Uses 8192-byte buffer
- Accumulates messages until EndOfMessage is true
- Extracts correlation ID from JSON messages
- If correlation ID matches a pending request, completes that request
- Otherwise, invokes onMessageReceived callback
Error Handling:
- OperationCanceledException: Silently exits (expected on disconnect)
- Other exceptions: If connection.AutoReconnect is true, attempts reconnection; otherwise marks as Disconnected
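A single WebSocket message can arrive as several frames, which is why the loop accumulates data until EndOfMessage is true. The sketch below isolates that reassembly step. FrameAssembler is a hypothetical helper, and the frames are supplied as tuples; in the real loop each frame would come from a ClientWebSocket.ReceiveAsync call into the 8192-byte buffer.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class FrameAssembler
{
    // Joins frame payloads into complete UTF-8 text messages.
    public static IEnumerable<string> Assemble(
        IEnumerable<(byte[] Data, bool EndOfMessage)> frames)
    {
        using var buffer = new MemoryStream();
        foreach (var (data, endOfMessage) in frames)
        {
            buffer.Write(data, 0, data.Length);
            if (!endOfMessage)
            {
                continue; // More frames belong to this message.
            }
            // Decode only the complete message: a multi-byte character
            // may be split across two frames.
            yield return Encoding.UTF8.GetString(buffer.ToArray());
            buffer.SetLength(0);
        }
    }
}
```

Decoding each frame separately would corrupt any character whose bytes straddle a frame boundary, so the bytes are buffered first and decoded once per message.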
TryReconnectAsync (Private)
Attempts to reconnect with exponential backoff.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to reconnect |
| onMessageReceived | Func<string, Task> | Callback for received messages |
Behavior:
1. Retries up to WebSocketOptions.ReconnectMaxAttempts times
2. Delay doubles each attempt: baseDelay * 2^(attempt-1), capped at 30 seconds
3. On successful reconnect, restarts the receive loop
4. If every attempt fails, sets status to Disconnected
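The delay schedule follows directly from the formula above. A minimal sketch, assuming attempts are numbered from 1 and that baseDelay is supplied by the caller (the source does not name the option that holds it); ReconnectBackoff is a hypothetical helper:

```csharp
using System;

public static class ReconnectBackoff
{
    private static readonly TimeSpan MaxDelay = TimeSpan.FromSeconds(30);

    // Computes baseDelay * 2^(attempt-1), capped at 30 seconds.
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay)
    {
        if (attempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(attempt), "Attempts are 1-based.");
        }
        // Work in doubles so that very large attempt numbers overflow to
        // infinity and hit the cap instead of wrapping around.
        double milliseconds = baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1);
        return milliseconds >= MaxDelay.TotalMilliseconds
            ? MaxDelay
            : TimeSpan.FromMilliseconds(milliseconds);
    }
}
```

With a base delay of 1 second the waits are 1, 2, 4, 8, and 16 seconds, and every attempt from the sixth onward waits the capped 30 seconds.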
IDaprPublisher / DaprPublisher
Namespace: Virtufin.WebSocketManager.Services
Purpose: Publishes WebSocket messages to Dapr pub/sub topics.
Interface Definition
public interface IDaprPublisher
{
    Task PublishAsync(
        string topic, string websocketId, string websocketUrl, string payload);
}
Implementation: DaprPublisher
PublishAsync
Publishes a message envelope to a Dapr pub/sub topic.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| topic | string | The topic name to publish to |
| websocketId | string | The ID of the WebSocket connection |
| websocketUrl | string | The URL of the WebSocket connection |
| payload | string | The message payload to publish |
Behavior:
1. Creates a MessageEnvelope record with metadata
2. Calls DaprClient.PublishEventAsync with the configured pub/sub component
MessageEnvelope Structure:
record MessageEnvelope(string WebsocketId, string WebsocketUrl, DateTime Timestamp, string Payload);
Throws: Exception if publishing fails.
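To show what a subscriber might receive, the sketch below builds an envelope and serializes it. Several points are assumptions: the timestamp is taken from DateTime.UtcNow, the JSON uses System.Text.Json web defaults (camelCase property names), and EnvelopeExample is a hypothetical helper. The actual wire format depends on how DaprClient is configured in the service.

```csharp
using System;
using System.Text.Json;

public record MessageEnvelope(
    string WebsocketId, string WebsocketUrl, DateTime Timestamp, string Payload);

public static class EnvelopeExample
{
    // Assumed serializer settings: camelCase names, as in web defaults.
    private static readonly JsonSerializerOptions WebOptions =
        new(JsonSerializerDefaults.Web);

    // Wraps a raw WebSocket payload with connection metadata.
    public static MessageEnvelope Wrap(string websocketId, string websocketUrl, string payload) =>
        new(websocketId, websocketUrl, DateTime.UtcNow, payload);

    public static string ToJson(MessageEnvelope envelope) =>
        JsonSerializer.Serialize(envelope, WebOptions);
}
```

Note that Payload is a string, so a JSON message received over the WebSocket is carried as an escaped string inside the envelope and must be parsed a second time by the subscriber.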
IInstanceIdProvider / DefaultInstanceIdProvider
Namespace: Virtufin.WebSocketManager.Services
Purpose: Provides a unique identifier for the current service instance.
Interface Definition
public interface IInstanceIdProvider
{
    string GetInstanceId();
}
Implementation: DefaultInstanceIdProvider
Returns Environment.MachineName as the instance identifier.
Use Case: Used by WebSocketService to validate connection ownership and prevent cross-instance modifications.
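Because the instance ID sits behind an interface, it can be swapped out, for example in unit tests of the ownership check. The sketch below is illustrative only: MachineNameInstanceIdProvider mirrors the documented default behavior under a hypothetical name, and FixedInstanceIdProvider and OwnershipExample are hypothetical additions.

```csharp
using System;

public interface IInstanceIdProvider
{
    string GetInstanceId();
}

// Mirrors the documented default: the machine name identifies the instance.
public sealed class MachineNameInstanceIdProvider : IInstanceIdProvider
{
    public string GetInstanceId() => Environment.MachineName;
}

// Hypothetical provider that returns a caller-supplied ID, useful in tests.
public sealed class FixedInstanceIdProvider : IInstanceIdProvider
{
    private readonly string _id;

    public FixedInstanceIdProvider(string id) => _id = id;

    public string GetInstanceId() => _id;
}

public static class OwnershipExample
{
    // Same comparison that CheckOwnership performs on connection.InstanceId.
    public static bool IsOwnedBy(string connectionInstanceId, IInstanceIdProvider provider) =>
        connectionInstanceId == provider.GetInstanceId();
}
```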