using System.Text;
using MediaBrowser.Common.Events;
using MediaBrowser.Controller.Net;
using MediaBrowser.Model.Logging;
using MediaBrowser.Model.Net;
using MediaBrowser.Model.Serialization;
using System;
using System.Collections.Specialized;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Common.IO;
using UniversalDetector;

namespace MediaBrowser.Server.Implementations.ServerManager
{
    /// <summary>
    /// Wraps an <see cref="IWebSocket"/>: decodes incoming text/binary frames into
    /// <see cref="WebSocketMessageInfo"/> instances handed to <see cref="OnReceive"/>,
    /// and serializes outgoing sends so that only one is in flight at a time.
    /// </summary>
    public class WebSocketConnection : IWebSocketConnection
    {
        /// <summary>
        /// Raised when the underlying socket raises its own Closed event.
        /// </summary>
        public event EventHandler Closed;

        /// <summary>
        /// The underlying socket.
        /// </summary>
        private readonly IWebSocket _socket;

        /// <summary>
        /// Gets the remote end point supplied at construction.
        /// </summary>
        public string RemoteEndPoint { get; private set; }

        /// <summary>
        /// Cancellation token source owned by this connection; it is only disposed here.
        /// </summary>
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

        /// <summary>
        /// Allows a single concurrent send on the socket.
        /// </summary>
        private readonly SemaphoreSlim _sendSemaphore = new SemaphoreSlim(1, 1);

        /// <summary>
        /// The logger.
        /// </summary>
        private readonly ILogger _logger;

        /// <summary>
        /// The json serializer used for both incoming and outgoing messages.
        /// </summary>
        private readonly IJsonSerializer _jsonSerializer;

        /// <summary>
        /// Gets or sets the receive action, invoked once per successfully parsed incoming message.
        /// </summary>
        /// <value>The receive action.</value>
        // NOTE(review): this is declared as a non-generic Action in this view of the file, yet it is
        // invoked with a WebSocketMessageInfo argument in OnReceiveInternal(string). A generic type
        // argument appears to be missing here - confirm against IWebSocketConnection.
        public Action OnReceive { get; set; }

        /// <summary>
        /// Gets the last activity date (UTC). Updated whenever a frame is received.
        /// </summary>
        /// <value>The last activity date.</value>
        public DateTime LastActivityDate { get; private set; }

        /// <summary>
        /// Gets the id. A new <see cref="Guid"/> is generated per connection.
        /// </summary>
        /// <value>The id.</value>
        public Guid Id { get; private set; }

        /// <summary>
        /// Gets or sets the URL.
        /// </summary>
        /// <value>The URL.</value>
        public string Url { get; set; }

        /// <summary>
        /// Gets or sets the query string.
        /// </summary>
        /// <value>The query string.</value>
        public NameValueCollection QueryString { get; set; }

        /// <summary>
        /// Creates the streams fed to the charset detector for binary frames.
        /// </summary>
        private readonly IMemoryStreamProvider _memoryStreamProvider;

        /// <summary>
        /// Initializes a new instance of the <see cref="WebSocketConnection" /> class and
        /// hooks the socket's receive callbacks and Closed event.
        /// </summary>
        /// <param name="socket">The socket.</param>
        /// <param name="remoteEndPoint">The remote end point.</param>
        /// <param name="jsonSerializer">The json serializer.</param>
        /// <param name="logger">The logger.</param>
        /// <param name="memoryStreamProvider">
        /// The memory stream provider. It is not validated here; it is used when decoding binary frames.
        /// </param>
        /// <exception cref="System.ArgumentNullException">
        /// <paramref name="socket"/>, <paramref name="jsonSerializer"/> or <paramref name="logger"/> is null,
        /// or <paramref name="remoteEndPoint"/> is null or empty.
        /// </exception>
        public WebSocketConnection(IWebSocket socket, string remoteEndPoint, IJsonSerializer jsonSerializer, ILogger logger, IMemoryStreamProvider memoryStreamProvider)
        {
            if (socket == null)
            {
                throw new ArgumentNullException("socket");
            }
            if (string.IsNullOrEmpty(remoteEndPoint))
            {
                throw new ArgumentNullException("remoteEndPoint");
            }
            if (jsonSerializer == null)
            {
                throw new ArgumentNullException("jsonSerializer");
            }
            if (logger == null)
            {
                throw new ArgumentNullException("logger");
            }

            Id = Guid.NewGuid();
            _jsonSerializer = jsonSerializer;
            _socket = socket;
            _socket.OnReceiveBytes = OnReceiveInternal;
            _socket.OnReceive = OnReceiveInternal;
            RemoteEndPoint = remoteEndPoint;
            _logger = logger;
            _memoryStreamProvider = memoryStreamProvider;

            socket.Closed += socket_Closed;
        }

        /// <summary>
        /// Forwards the socket's Closed event to subscribers of <see cref="Closed"/>.
        /// </summary>
        void socket_Closed(object sender, EventArgs e)
        {
            EventHelper.FireEventIfNotNull(Closed, this, EventArgs.Empty, _logger);
        }

        /// <summary>
        /// Called when a binary frame is received. Decodes it to text (UTF-8 when the detector
        /// reports "utf-8", ASCII otherwise) and hands it to the text overload.
        /// </summary>
        /// <param name="bytes">The bytes.</param>
        private void OnReceiveInternal(byte[] bytes)
        {
            LastActivityDate = DateTime.UtcNow;

            // Nothing to decode, or nobody listening: skip the charset detection entirely.
            if (bytes == null || OnReceive == null)
            {
                return;
            }

            var charset = DetectCharset(bytes);
            var encoding = string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase)
                ? Encoding.UTF8
                : Encoding.ASCII;

            OnReceiveInternal(encoding.GetString(bytes));
        }

        /// <summary>
        /// Runs the charset detector over the supplied bytes.
        /// </summary>
        /// <param name="bytes">The raw frame content.</param>
        /// <returns>The detector's reported charset, or null when an <see cref="IOException"/> occurs.</returns>
        private string DetectCharset(byte[] bytes)
        {
            try
            {
                using (var ms = _memoryStreamProvider.CreateNew(bytes))
                {
                    var detector = new CharsetDetector();
                    detector.Feed(ms);
                    detector.DataEnd();

                    return detector.Charset;
                }
            }
            catch (IOException ex)
            {
                _logger.ErrorException("Error attempting to determine web socket message charset", ex);
            }

            return null;
        }

        /// <summary>
        /// Called when a text frame is received. Deserializes it and invokes <see cref="OnReceive"/>.
        /// Failures while processing are logged and not rethrown.
        /// </summary>
        /// <param name="message">The message text.</param>
        private void OnReceiveInternal(string message)
        {
            LastActivityDate = DateTime.UtcNow;

            // Only payloads that start with '{' are treated as json. Anything else (including a
            // null or empty payload) is dropped silently: logging these is sometimes useful but
            // clogs up the log.
            if (string.IsNullOrEmpty(message) || !message.StartsWith("{", StringComparison.OrdinalIgnoreCase))
            {
                return;
            }

            // Snapshot the handler so a concurrent reset of the property cannot null it out
            // between the check and the invocation.
            var handler = OnReceive;

            if (handler == null)
            {
                return;
            }

            try
            {
                var stub = (WebSocketMessage)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage));

                var info = new WebSocketMessageInfo
                {
                    MessageType = stub.MessageType,
                    Data = stub.Data == null ? null : stub.Data.ToString(),
                    Connection = this
                };

                handler(info);
            }
            catch (Exception ex)
            {
                _logger.ErrorException("Error processing web socket message", ex);
            }
        }

        /// <summary>
        /// Serializes the message to json and sends it asynchronously as text.
        /// </summary>
        /// <param name="message">The message.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns>Task.</returns>
        /// <exception cref="System.ArgumentNullException"><paramref name="message"/> is null.</exception>
        public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            var json = _jsonSerializer.SerializeToString(message);

            return SendAsync(json, cancellationToken);
        }

        /// <summary>
        /// Sends a binary message asynchronously. Failures are logged and rethrown.
        /// </summary>
        /// <param name="buffer">The buffer.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns>Task.</returns>
        /// <exception cref="System.ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        public async Task SendAsync(byte[] buffer, CancellationToken cancellationToken)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }

            cancellationToken.ThrowIfCancellationRequested();

            // Per msdn docs, attempting to send simultaneous messages will result in one failing.
            // This should help us workaround that and ensure all messages get sent
            await _sendSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

            try
            {
                // ConfigureAwait(false) here as well: the wait above may complete synchronously,
                // in which case the caller's context would otherwise still be captured.
                await _socket.SendAsync(buffer, true, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                _logger.Info("WebSocket message to {0} was cancelled", RemoteEndPoint);

                throw;
            }
            catch (Exception ex)
            {
                _logger.ErrorException("Error sending WebSocket message {0}", ex, RemoteEndPoint);

                throw;
            }
            finally
            {
                _sendSemaphore.Release();
            }
        }

        /// <summary>
        /// Sends a text message asynchronously. Failures are logged and rethrown.
        /// </summary>
        /// <param name="text">The text.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns>Task.</returns>
        /// <exception cref="System.ArgumentNullException"><paramref name="text"/> is null, empty or whitespace.</exception>
        public async Task SendAsync(string text, CancellationToken cancellationToken)
        {
            if (string.IsNullOrWhiteSpace(text))
            {
                throw new ArgumentNullException("text");
            }

            cancellationToken.ThrowIfCancellationRequested();

            // Per msdn docs, attempting to send simultaneous messages will result in one failing.
            // This should help us workaround that and ensure all messages get sent
            await _sendSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

            try
            {
                // ConfigureAwait(false) here as well: the wait above may complete synchronously,
                // in which case the caller's context would otherwise still be captured.
                await _socket.SendAsync(text, true, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                _logger.Info("WebSocket message to {0} was cancelled", RemoteEndPoint);

                throw;
            }
            catch (Exception ex)
            {
                _logger.ErrorException("Error sending WebSocket message {0}", ex, RemoteEndPoint);

                throw;
            }
            finally
            {
                _sendSemaphore.Release();
            }
        }

        /// <summary>
        /// Gets the state of the underlying socket.
        /// </summary>
        /// <value>The state.</value>
        public WebSocketState State
        {
            get { return _socket.State; }
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
        protected virtual void Dispose(bool dispose)
        {
            if (dispose)
            {
                _cancellationTokenSource.Dispose();
                _socket.Dispose();
            }
        }
    }
}