Skip to content

Services

This document provides detailed documentation for all service classes in the WebSocket Manager.


IWebSocketService / WebSocketService

Namespace: Virtufin.WebSocketManager.Services

Purpose: Core service for managing WebSocket connections. Provides high-level operations for connecting, disconnecting, listing, and sending messages through WebSocket connections.

Interface Definition

public interface IWebSocketService
{
    Task<(WebSocketConnection? Connection, string? Error)> ConnectAsync(
        string url, bool autoReconnect, CancellationToken cancellationToken = default);

    Task<IEnumerable<WebSocketConnection>> ListConnectionsAsync(
        CancellationToken cancellationToken = default);

    Task<(bool Success, string? Error)> DisconnectAsync(
        string id, CancellationToken cancellationToken = default);

    Task<(bool Success, string? Error)> StartPublishAsync(
        string id, string topic, CancellationToken cancellationToken = default);

    Task<(bool Success, string? Error)> StopPublishAsync(
        string id, CancellationToken cancellationToken = default);

    Task<(string? Response, string? Error)> SendAsync(
        string id, string message, int timeoutMs, CancellationToken cancellationToken = default);

    Task<(bool Success, string? Error)> SendRawAsync(
        string id, string message, CancellationToken cancellationToken = default);
}

ConnectAsync

Establishes a new WebSocket connection.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | url | string | The WebSocket server URL (must be non-empty) | | autoReconnect | bool | Whether to enable auto-reconnect on failure | | cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple containing the created WebSocketConnection if successful, or null with an error message if unsuccessful.

Behavior: 1. Validates that url is non-empty 2. Creates a new connection entry via IWebSocketConnectionStore 3. Connects via IWebSocketClientWrapper 4. Starts the receive loop to handle incoming messages 5. If publishing is configured, publishes messages to the topic via IDaprPublisher 6. Persists the connection to the store 7. On failure, removes the connection from the store and returns the error


ListConnectionsAsync

Retrieves all active WebSocket connections.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | cancellationToken | CancellationToken | Optional cancellation token |

Returns: An enumerable collection of all WebSocketConnection instances.


DisconnectAsync

Disconnects and removes a WebSocket connection.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | id | string | The connection ID to disconnect | | cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple indicating success and an error message if unsuccessful.

Behavior: 1. Retrieves the connection from the store 2. Validates ownership via CheckOwnership 3. Disconnects via IWebSocketClientWrapper (errors are logged but not thrown) 4. Removes the connection from the store


StartPublishAsync

Configures a connection to publish messages to a Dapr pub/sub topic.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | id | string | The connection ID | | topic | string | The Dapr pub/sub topic name (must be non-empty) | | cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple indicating success and an error message if unsuccessful.

Errors: - "Connection not found" - Connection ID does not exist - Ownership errors from CheckOwnership - "Websocket is not connected" - Connection status is not Connected - "Topic is required" - Topic parameter is empty


StopPublishAsync

Stops publishing messages for a connection.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | id | string | The connection ID | | cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple indicating success and an error message if unsuccessful.

Behavior: Sets connection.Topic to null and updates the store.


SendAsync

Sends a message and waits for a correlated response.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | id | string | The connection ID | | message | string | The message to send | | timeoutMs | int | Timeout in milliseconds to wait for response | | cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple containing the response message if successful, or null with an error message.

Errors: - "Connection not found" - Ownership errors - "Websocket is not connected" - "Request timeout" - Timeout exceeded

Implementation Detail: The message is wrapped with a correlation ID and sent via IWebSocketClientWrapper.SendAndWaitAsync. Responses are matched by correlation ID.


SendRawAsync

Sends a raw message without waiting for a response.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | id | string | The connection ID | | message | string | The message to send | | cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple indicating success and an error message if unsuccessful.


CheckOwnership (Private)

Validates that the current instance owns the connection.

private bool CheckOwnership(WebSocketConnection connection, out string? error)
{
    error = null;
    if (connection.InstanceId != _instanceIdProvider.GetInstanceId())
    {
        error = $"Connection owned by different instance: {connection.InstanceId}";
        return false;
    }
    return true;
}

Purpose: Prevents cross-instance modifications in distributed deployments.


IWebSocketConnectionStore / DistributedWebSocketConnectionStore

Namespace: Virtufin.WebSocketManager.Services

Purpose: Manages WebSocket connection storage with distributed persistence via Dapr state store.

Interface Definition

public interface IWebSocketConnectionStore
{
    Task<WebSocketConnection> CreateConnectionAsync(
        string url, bool autoReconnect, CancellationToken cancellationToken = default);

    Task<WebSocketConnection?> GetConnectionAsync(
        string id, CancellationToken cancellationToken = default);

    Task<IEnumerable<WebSocketConnection>> GetAllConnectionsAsync(
        CancellationToken cancellationToken = default);

    Task UpdateConnectionAsync(
        WebSocketConnection connection, CancellationToken cancellationToken = default);

    Task RemoveConnectionAsync(
        string id, CancellationToken cancellationToken = default);

    Task ClearAllConnectionsAsync(
        CancellationToken cancellationToken = default);
}

Implementation: DistributedWebSocketConnectionStore

Uses a two-tier storage approach: - Local cache: ConcurrentDictionary<string, WebSocketConnection> for fast in-memory access - Dapr state store: Via IDaprConnectionRepository for distributed persistence

CreateConnectionAsync

Creates a new connection with a generated ID and the current instance ID.

Returns: A new WebSocketConnection with: - Id = Guid.NewGuid().ToString() - Url = provided URL - AutoReconnect = provided value - Status = ConnectionStatus.Disconnected - CreatedAt = DateTime.UtcNow - InstanceId = from IInstanceIdProvider.GetInstanceId()

The connection is added to the local cache but NOT persisted until explicitly updated.


GetConnectionAsync

Retrieves a connection by ID.

Behavior: 1. First checks local cache (_localConnections) 2. If not found, retrieves from Dapr state store via IDaprConnectionRepository

Returns: The connection if found, otherwise null.


GetAllConnectionsAsync

Retrieves all connections from both local cache and Dapr state store.

Returns: Combined list of remote and local connections (local connections are appended if not already present in remote results).


UpdateConnectionAsync

Updates both local cache and Dapr state store.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | connection | WebSocketConnection | The connection to update |

Behavior: 1. Updates local cache 2. Converts to DaprStateStoreEntry and saves via repository


RemoveConnectionAsync

Removes a connection from both local cache and Dapr state store.


ClearAllConnectionsAsync

Removes all connections from both storage tiers.


IDaprConnectionRepository / DaprConnectionRepository

Namespace: Virtufin.WebSocketManager.Services

Purpose: Handles Dapr state store operations for WebSocket connections.

Interface Definition

public interface IDaprConnectionRepository
{
    Task<IEnumerable<DaprStateStoreEntry>> GetAllAsync(
        CancellationToken cancellationToken = default);

    Task<DaprStateStoreEntry?> GetAsync(
        string id, CancellationToken cancellationToken = default);

    Task SaveAsync(
        DaprStateStoreEntry entry, CancellationToken cancellationToken = default);

    Task DeleteAsync(
        string id, CancellationToken cancellationToken = default);
}

Implementation: DaprConnectionRepository

Uses Dapr state store with an index pattern for efficient lookups.

State Store Structure

  • Connection entries: Key = websocket-{id}, Value = DaprStateStoreEntry
  • Index: Key = websocket-index, Value = HashSet<string> containing all connection IDs

GetAllAsync

Retrieves all connections via the index: 1. Gets the index (HashSet<string>) from state store 2. If index is null or empty, returns empty enumeration 3. Constructs bulk state keys and retrieves all entries in one call 4. Returns only non-null entries


GetAsync

Retrieves a single connection by ID.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | id | string | The connection ID |

Returns: DaprStateStoreEntry if found, otherwise null.


SaveAsync

Saves a connection entry and updates the index.

Behavior: 1. Saves the entry with key websocket-{entry.Id} 2. Calls UpdateIndexAsync with add: true


DeleteAsync

Deletes a connection entry and updates the index.

Behavior: 1. Deletes the entry with key websocket-{id} 2. Calls UpdateIndexAsync with add: false


UpdateIndexAsync (Private)

Updates the connection index in state store.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | id | string | The connection ID | | add | bool | true to add to index, false to remove |

Behavior: 1. Gets current index (or creates new HashSet<string> if null) 2. Adds or removes the ID 3. Saves the updated index


IWebSocketClientWrapper / WebSocketClientWrapper

Namespace: Virtufin.WebSocketManager.Services

Purpose: Provides low-level WebSocket connection operations. Manages external WebSocket connections including connection lifecycle, message sending/receiving, and auto-reconnect logic.

Interface Definition

public interface IWebSocketClientWrapper
{
    Task ConnectAsync(WebSocketConnection connection);

    Task DisconnectAsync(WebSocketConnection connection);

    Task SendAsync(WebSocketConnection connection, string message);

    Task<string> SendAndWaitAsync(
        WebSocketConnection connection, string message, int timeoutMs);

    void StartReceiveLoop(
        WebSocketConnection connection, Func<string, Task> onMessageReceived);
}

Implementation: WebSocketClientWrapper

Manages System.Net.WebSockets.ClientWebSocket instances with correlation-based request-response matching.

Internal State

  • _pendingRequests: ConcurrentDictionary<string, TaskCompletionSource<string>> - Maps correlation IDs to pending response completers
  • WrappedMessage record: Internal format for correlation ({ "id": "...", "message": "..." })

ConnectAsync

Establishes a WebSocket connection.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | connection | WebSocketConnection | The connection to connect |

Behavior: 1. Creates a new CancellationTokenSource and assigns to connection.CancellationTokenSource 2. Creates a new ClientWebSocket and assigns to connection.WebSocket 3. Connects to connection.Url 4. Sets connection.Status to Connected 5. On failure, sets status to Disconnected and re-throws

Throws: Exception if connection fails.


DisconnectAsync

Gracefully closes a WebSocket connection.

Behavior: 1. Cancels the CancellationTokenSource 2. If socket is open, sends close frame with NormalClosure 3. Disposes the WebSocket 4. Sets status to Disconnected


SendAsync

Sends a text message without waiting for a response.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | connection | WebSocketConnection | The connection to send on | | message | string | The message to send |

Throws: InvalidOperationException if WebSocket is not connected.


SendAndWaitAsync

Sends a message with correlation and waits for the response.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | connection | WebSocketConnection | The connection to send on | | message | string | The message to send | | timeoutMs | int | Timeout in milliseconds |

Behavior: 1. Generates a new correlation ID (Guid.NewGuid().ToString()) 2. Creates a TaskCompletionSource<string> for the response 3. Wraps message as WrappedMessage(correlationId, message) 4. Sends via SendAsync 5. Registers a timeout cancellation 6. Returns when response is received or timeout

Implementation Detail: The message is JSON-serialized with the correlation ID. Responses are matched in StartReceiveLoop by extracting the id field from JSON.


StartReceiveLoop

Starts a background task to receive messages.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | connection | WebSocketConnection | The connection to receive from | | onMessageReceived | Func<string, Task> | Callback for received messages |

Behavior: 1. Cancels any existing receive operation 2. Starts a new Task running the receive loop 3. Assigns to connection.ReceiveTask

Receive Loop: - Uses 8192-byte buffer - Accumulates messages until EndOfMessage is true - Extracts correlation ID from JSON messages - If correlation ID matches a pending request, completes that request - Otherwise, invokes onMessageReceived callback

Error Handling: - OperationCanceledException: Silently exits (expected on disconnect) - Other exceptions: If connection.AutoReconnect is true, attempts reconnection; otherwise marks as Disconnected


TryReconnectAsync (Private)

Attempts to reconnect with exponential backoff.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | connection | WebSocketConnection | The connection to reconnect | | onMessageReceived | Func<string, Task> | Callback for received messages |

Behavior: 1. Retries up to WebSocketOptions.ReconnectMaxAttempts times 2. Delay doubles each attempt: baseDelay * 2^(attempt-1), capped at 30 seconds 3. On successful reconnect, restarts the receive loop 4. On all failures, sets status to Disconnected


IDaprPublisher / DaprPublisher

Namespace: Virtufin.WebSocketManager.Services

Purpose: Publishes WebSocket messages to Dapr pub/sub topics.

Interface Definition

public interface IDaprPublisher
{
    Task PublishAsync(
        string topic, string websocketId, string websocketUrl, string payload);
}

Implementation: DaprPublisher

PublishAsync

Publishes a message envelope to a Dapr pub/sub topic.

Parameters: | Parameter | Type | Description | |-----------|------|-------------| | topic | string | The topic name to publish to | | websocketId | string | The ID of the WebSocket connection | | websocketUrl | string | The URL of the WebSocket connection | | payload | string | The message payload to publish |

Behavior: 1. Creates a MessageEnvelope record with metadata 2. Calls DaprClient.PublishEventAsync with the configured pub/sub component

MessageEnvelope Structure:

record MessageEnvelope(string WebsocketId, string WebsocketUrl, DateTime Timestamp, string Payload);

Throws: Exception if publishing fails.


IInstanceIdProvider / DefaultInstanceIdProvider

Namespace: Virtufin.WebSocketManager.Services

Purpose: Provides a unique identifier for the current service instance.

Interface Definition

public interface IInstanceIdProvider
{
    string GetInstanceId();
}

Implementation: DefaultInstanceIdProvider

Returns Environment.MachineName as the instance identifier.

Use Case: Used by WebSocketService to validate connection ownership and prevent cross-instance modifications.