#nullable enable using System; using System.Buffers; using System.IO.Pipelines; using System.Net; using System.Net.WebSockets; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using MediaBrowser.Common.Json; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Net; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; namespace Emby.Server.Implementations.HttpServer { /// /// Class WebSocketConnection. /// public class WebSocketConnection : IWebSocketConnection { /// /// The logger. /// private readonly ILogger _logger; /// /// The json serializer options. /// private readonly JsonSerializerOptions _jsonOptions; /// /// The socket. /// private readonly WebSocket _socket; private bool _disposed = false; /// /// Initializes a new instance of the class. /// /// The logger. /// The socket. /// The remote end point. /// The query. public WebSocketConnection( ILogger logger, WebSocket socket, IPAddress? remoteEndPoint, IQueryCollection query) { _logger = logger; _socket = socket; RemoteEndPoint = remoteEndPoint; QueryString = query; _jsonOptions = JsonDefaults.GetOptions(); LastActivityDate = DateTime.Now; } /// public event EventHandler? Closed; /// /// Gets or sets the remote end point. /// public IPAddress? RemoteEndPoint { get; } /// /// Gets or sets the receive action. /// /// The receive action. public Func? OnReceive { get; set; } /// /// Gets the last activity date. /// /// The last activity date. public DateTime LastActivityDate { get; private set; } /// /// Gets or sets the query string. /// /// The query string. public IQueryCollection QueryString { get; } /// /// Gets the state. /// /// The state. public WebSocketState State => _socket.State; /// /// Sends a message asynchronously. /// /// /// The message. /// The cancellation token. /// Task. public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken) { var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken); } /// public async Task ProcessAsync(CancellationToken cancellationToken = default) { var pipe = new Pipe(); var writer = pipe.Writer; ValueWebSocketReceiveResult receiveresult; do { // Allocate at least 512 bytes from the PipeWriter Memory memory = writer.GetMemory(512); try { receiveresult = await _socket.ReceiveAsync(memory, cancellationToken); } catch (WebSocketException ex) { _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message); break; } int bytesRead = receiveresult.Count; if (bytesRead == 0) { break; } // Tell the PipeWriter how much was read from the Socket writer.Advance(bytesRead); // Make the data available to the PipeReader FlushResult flushResult = await writer.FlushAsync(); if (flushResult.IsCompleted) { // The PipeReader stopped reading break; } LastActivityDate = DateTime.UtcNow; if (receiveresult.EndOfMessage) { await ProcessInternal(pipe.Reader).ConfigureAwait(false); } } while (_socket.State == WebSocketState.Open && receiveresult.MessageType != WebSocketMessageType.Close); Closed?.Invoke(this, EventArgs.Empty); await _socket.CloseAsync( WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); } private async Task ProcessInternal(PipeReader reader) { ReadResult result = await reader.ReadAsync().ConfigureAwait(false); ReadOnlySequence buffer = result.Buffer; if (OnReceive == null) { // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.End); return; } WebSocketMessage stub; try { if (buffer.IsSingleSegment) { stub = JsonSerializer.Deserialize>(buffer.FirstSpan, _jsonOptions); } else { var buf = ArrayPool.Shared.Rent(Convert.ToInt32(buffer.Length)); try { buffer.CopyTo(buf); stub = JsonSerializer.Deserialize>(buf, _jsonOptions); } finally { ArrayPool.Shared.Return(buf); } } } catch (JsonException ex) { // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.End); _logger.LogError(ex, "Error processing web socket message"); return; } // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.End); _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub); var info = new WebSocketMessageInfo { MessageType = stub.MessageType, Data = stub.Data?.ToString(), // Data can be null Connection = this }; _logger.LogDebug("WS {IP} message info: {@MessageInfo}", RemoteEndPoint, info); await OnReceive(info).ConfigureAwait(false); // Stop reading if there's no more data coming if (result.IsCompleted) { return; } } } }