using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.Net;
using Microsoft.Extensions.Logging;

namespace MediaBrowser.Controller.Net
{
    /// <summary>
    /// Starts sending data over a web socket periodically when a message is received,
    /// and then stops when a corresponding stop message is received.
    /// </summary>
    /// <typeparam name="TReturnDataType">The type of the data sent to the client.</typeparam>
    /// <typeparam name="TStateType">The type of the per-connection state.</typeparam>
    public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IDisposable
        where TStateType : WebSocketListenerState, new()
        where TReturnDataType : class
    {
        // NOTE(review): Item1 is taken to be IWebSocketConnection (the code only requires
        // State, RemoteEndPoint and SendAsync on it) - confirm against the project's types.

        /// <summary>
        /// The active connections. Each entry pairs a connection with the token source used
        /// to stop its transmission and with its send-timing state. The list instance is
        /// also the lock object that guards every access to it.
        /// </summary>
        protected readonly List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>> ActiveConnections =
            new List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>>();

        /// <summary>
        /// Gets the name. It is used as the message type of outgoing messages and as the
        /// prefix of the "Start" / "Stop" message types this listener reacts to.
        /// </summary>
        /// <value>The name.</value>
        protected abstract string Name { get; }

        /// <summary>
        /// Gets the data to send.
        /// </summary>
        /// <returns>A task producing the payload; a <c>null</c> result means nothing is sent.</returns>
        protected abstract Task<TReturnDataType> GetDataToSend();

        /// <summary>
        /// The logger.
        /// </summary>
        protected ILogger Logger;

        /// <summary>
        /// Initializes a new instance of the listener.
        /// </summary>
        /// <param name="logger">The logger.</param>
        /// <exception cref="ArgumentNullException"><paramref name="logger"/> is <c>null</c>.</exception>
        protected BasePeriodicWebSocketListener(ILogger logger)
        {
            if (logger == null)
            {
                throw new ArgumentNullException(nameof(logger));
            }

            Logger = logger;
        }

        /// <summary>
        /// Processes the message: begins transmitting when the message type is
        /// <see cref="Name"/> + "Start" and stops when it is <see cref="Name"/> + "Stop"
        /// (compared case-insensitively). Any other message type is ignored.
        /// </summary>
        /// <param name="message">The message.</param>
        /// <returns>A completed task.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
        public Task ProcessMessageAsync(WebSocketMessageInfo<string> message)
        {
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            // The two message types are mutually exclusive, so at most one branch runs.
            if (string.Equals(message.MessageType, Name + "Start", StringComparison.OrdinalIgnoreCase))
            {
                Start(message);
            }
            else if (string.Equals(message.MessageType, Name + "Stop", StringComparison.OrdinalIgnoreCase))
            {
                Stop(message);
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Starts sending messages over a web socket.
        /// </summary>
        /// <param name="message">
        /// The message. Its data is parsed as "dueTimeMs,periodMs": two comma-separated
        /// integers in invariant-culture format.
        /// </param>
        private void Start(WebSocketMessageInfo<string> message)
        {
            var vals = message.Data.Split(',');

            var dueTimeMs = long.Parse(vals[0], CultureInfo.InvariantCulture);
            var periodMs = long.Parse(vals[1], CultureInfo.InvariantCulture);

            var cancellationTokenSource = new CancellationTokenSource();

            // Placeholders are bound to arguments by order of appearance, not by number,
            // so the arguments are listed in the order the template mentions them.
            Logger.LogDebug(
                "WS {ListenerType} begin transmitting to {RemoteEndPoint}",
                GetType().Name,
                message.Connection.RemoteEndPoint);

            var state = new TStateType
            {
                IntervalMs = periodMs,
                InitialDelayMs = dueTimeMs
            };

            lock (ActiveConnections)
            {
                ActiveConnections.Add(
                    new Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>(
                        message.Connection,
                        cancellationTokenSource,
                        state));
            }
        }

        /// <summary>
        /// Sends data to every active connection that is open, has not been cancelled and
        /// is due for an update.
        /// </summary>
        /// <param name="force">
        /// If <c>true</c>, sends regardless of how long ago the connection last received data;
        /// otherwise only connections whose interval has elapsed are served.
        /// </param>
        protected void SendData(bool force)
        {
            Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>[] tuples;

            // Snapshot the due connections under the lock; the sends happen outside it.
            lock (ActiveConnections)
            {
                tuples = ActiveConnections
                    .Where(c =>
                    {
                        if (c.Item1.State == WebSocketState.Open && !c.Item2.IsCancellationRequested)
                        {
                            var state = c.Item3;

                            if (force || (DateTime.UtcNow - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs)
                            {
                                return true;
                            }
                        }

                        return false;
                    })
                    .ToArray();
            }

            foreach (var tuple in tuples)
            {
                // Deliberately not awaited: each send runs on its own and handles its own failures.
                SendData(tuple);
            }
        }

        /// <summary>
        /// Fetches the current data and sends it to a single connection. When the send is
        /// cancelled or fails, the connection is removed from the active list.
        /// </summary>
        /// <param name="tuple">The connection entry to send to.</param>
        /// <returns>A task that completes when the send attempt has finished.</returns>
        private async Task SendData(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> tuple)
        {
            var connection = tuple.Item1;

            try
            {
                var state = tuple.Item3;

                var cancellationToken = tuple.Item2.Token;

                var data = await GetDataToSend().ConfigureAwait(false);

                if (data != null)
                {
                    await connection.SendAsync(
                        new WebSocketMessage<TReturnDataType>
                        {
                            MessageType = Name,
                            Data = data
                        },
                        cancellationToken).ConfigureAwait(false);

                    state.DateLastSendUtc = DateTime.UtcNow;
                }
            }
            catch (OperationCanceledException)
            {
                // Only tear down when the cancellation came from this entry's own token source.
                if (tuple.Item2.IsCancellationRequested)
                {
                    DisposeConnection(tuple);
                }
            }
            catch (Exception ex)
            {
                Logger.LogError(ex, "Error sending web socket message {Name}", Name);
                DisposeConnection(tuple);
            }
        }

        /// <summary>
        /// Stops sending messages over a web socket.
        /// </summary>
        /// <param name="message">The message whose connection should stop receiving data.</param>
        private void Stop(WebSocketMessageInfo<string> message)
        {
            lock (ActiveConnections)
            {
                var connection = ActiveConnections.FirstOrDefault(c => c.Item1 == message.Connection);

                if (connection != null)
                {
                    DisposeConnection(connection);
                }
            }
        }

        /// <summary>
        /// Disposes the connection entry: cancels and disposes its token source and removes
        /// the entry from <see cref="ActiveConnections"/>. The web socket itself is left untouched.
        /// </summary>
        /// <param name="connection">The connection.</param>
        private void DisposeConnection(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> connection)
        {
            Logger.LogDebug(
                "WS {ListenerType} stop transmitting to {RemoteEndPoint}",
                GetType().Name,
                connection.Item1.RemoteEndPoint);

            // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this function really...
            // connection.Item1.Dispose();

            try
            {
                connection.Item2.Cancel();
                connection.Item2.Dispose();
            }
            catch (ObjectDisposedException)
            {
                // The token source was already disposed, presumably by an earlier call for
                // the same entry (several code paths can reach this method).
                // TODO Investigate and properly fix.
            }
            finally
            {
                // Always drop the entry, even if cancellation callbacks threw.
                lock (ActiveConnections)
                {
                    ActiveConnections.Remove(connection);
                }
            }
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
        protected virtual void Dispose(bool dispose)
        {
            if (dispose)
            {
                lock (ActiveConnections)
                {
                    // Iterate over a copy because DisposeConnection removes entries from the list.
                    foreach (var connection in ActiveConnections.ToArray())
                    {
                        DisposeConnection(connection);
                    }
                }
            }
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    }

    /// <summary>
    /// Per-connection timing state kept by <c>BasePeriodicWebSocketListener</c>.
    /// </summary>
    public class WebSocketListenerState
    {
        /// <summary>
        /// Gets or sets the UTC time at which data was last sent to the connection.
        /// </summary>
        public DateTime DateLastSendUtc { get; set; }

        /// <summary>
        /// Gets or sets the initial delay, in milliseconds, taken from the start message.
        /// It is stored here but not read anywhere in this file.
        /// </summary>
        public long InitialDelayMs { get; set; }

        /// <summary>
        /// Gets or sets the minimum interval, in milliseconds, between two non-forced sends.
        /// </summary>
        public long IntervalMs { get; set; }
    }
}