using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.Logging;
using MediaBrowser.Model.Net;
using MediaBrowser.Model.Threading;

namespace MediaBrowser.Controller.Net
{
    /// <summary>
    /// Starts sending data over a web socket periodically when a message is received, and then stops when a corresponding stop message is received
    /// </summary>
    /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam>
    /// <typeparam name="TStateType">The type of the T state type.</typeparam>
    // NOTE(review): the generic type arguments in this file were reconstructed from usage
    // (the constraints below, the Item1..Item4 accesses and the GetDataToSend/SendAsync calls).
    // The third tuple element is assumed to be MediaBrowser.Model.Threading.ITimer, i.e. the
    // return type of ITimerFactory.Create - confirm against the project.
    public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IDisposable
        where TStateType : WebSocketListenerState, new()
        where TReturnDataType : class
    {
        /// <summary>
        /// The _active connections
        /// </summary>
        /// <remarks>
        /// Each entry holds the connection, the cancellation source for its sends, the optional
        /// timer (null when <see cref="SendOnTimer"/> is false) and the per-connection state.
        /// Every access in this class is made while holding a lock on the list itself.
        /// </remarks>
        protected readonly List<Tuple<IWebSocketConnection, CancellationTokenSource, ITimer, TStateType>> ActiveConnections =
            new List<Tuple<IWebSocketConnection, CancellationTokenSource, ITimer, TStateType>>();

        /// <summary>
        /// Gets the name.
        /// </summary>
        /// <remarks>
        /// Incoming messages typed Name + "Start" / Name + "Stop" are handled by this listener,
        /// and outgoing messages carry this value as their message type.
        /// </remarks>
        /// <value>The name.</value>
        protected abstract string Name { get; }

        /// <summary>
        /// Gets the data to send.
        /// </summary>
        /// <param name="state">The state.</param>
        /// <param name="cancellationToken">Token of the connection's cancellation source; it is cancelled when the connection is disposed.</param>
        /// <returns>Task{`1}. A null result means nothing is sent for this tick.</returns>
        protected abstract Task<TReturnDataType> GetDataToSend(TStateType state, CancellationToken cancellationToken);

        /// <summary>
        /// The logger
        /// </summary>
        protected ILogger Logger;

        /// <summary>
        /// Gets the factory used to create the per-connection timer when <see cref="SendOnTimer"/> is true.
        /// It is only set by the constructor overload that accepts a factory.
        /// </summary>
        protected ITimerFactory TimerFactory { get; private set; }

        /// <summary>
        /// Initializes a listener without a timer factory. Suitable for subclasses that leave
        /// <see cref="SendOnTimer"/> false and push data through <see cref="SendData(bool)"/>.
        /// </summary>
        /// <param name="logger">The logger.</param>
        /// <exception cref="ArgumentNullException">logger is null.</exception>
        protected BasePeriodicWebSocketListener(ILogger logger)
        {
            if (logger == null)
            {
                throw new ArgumentNullException("logger");
            }

            Logger = logger;
        }

        /// <summary>
        /// Initializes a listener that can create timers, as required when <see cref="SendOnTimer"/> is true.
        /// </summary>
        /// <param name="logger">The logger.</param>
        /// <param name="timerFactory">The timer factory.</param>
        /// <exception cref="ArgumentNullException">logger or timerFactory is null.</exception>
        protected BasePeriodicWebSocketListener(ILogger logger, ITimerFactory timerFactory)
            : this(logger)
        {
            if (timerFactory == null)
            {
                throw new ArgumentNullException("timerFactory");
            }

            TimerFactory = timerFactory;
        }

        /// <summary>
        /// Processes the message.
        /// </summary>
        /// <param name="message">The message.</param>
        /// <returns>Task.</returns>
        /// <exception cref="ArgumentNullException">message is null.</exception>
        public Task ProcessMessage(WebSocketMessageInfo message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            if (string.Equals(message.MessageType, Name + "Start", StringComparison.OrdinalIgnoreCase))
            {
                Start(message);
            }
            else if (string.Equals(message.MessageType, Name + "Stop", StringComparison.OrdinalIgnoreCase))
            {
                Stop(message);
            }

            return Task.FromResult(true);
        }

        /// <summary>
        /// Culture used to parse the numeric fields of a start message.
        /// </summary>
        protected readonly CultureInfo UsCulture = new CultureInfo("en-US");

        /// <summary>
        /// Gets a value indicating whether a timer is created for each connection to push data.
        /// When false, data is only sent when <see cref="SendData(bool)"/> is called.
        /// </summary>
        protected virtual bool SendOnTimer
        {
            get
            {
                return false;
            }
        }

        /// <summary>
        /// Receives the comma-separated fields of a start message that follow the first two (delay and interval).
        /// The base implementation ignores them.
        /// </summary>
        /// <param name="values">The extra fields.</param>
        protected virtual void ParseMessageParams(string[] values)
        {
        }

        /// <summary>
        /// Starts sending messages over a web socket
        /// </summary>
        /// <param name="message">The message. Its data is expected as "dueTimeMs,periodMs[,extra...]".</param>
        /// <exception cref="FormatException">The message data is missing, has fewer than two fields, or the fields are not numbers.</exception>
        /// <exception cref="InvalidOperationException"><see cref="SendOnTimer"/> is true but no timer factory was supplied.</exception>
        private void Start(WebSocketMessageInfo message)
        {
            // The payload comes from the remote client, so reject malformed data with the same
            // exception type long.Parse uses instead of a NullReference/IndexOutOfRange.
            var vals = (message.Data ?? string.Empty).Split(',');

            if (vals.Length < 2)
            {
                throw new FormatException(Name + "Start message data must contain at least two comma-separated values: dueTimeMs,periodMs");
            }

            var dueTimeMs = long.Parse(vals[0], UsCulture);
            var periodMs = long.Parse(vals[1], UsCulture);

            if (vals.Length > 2)
            {
                ParseMessageParams(vals.Skip(2).ToArray());
            }

            var sendOnTimer = SendOnTimer;

            if (sendOnTimer && TimerFactory == null)
            {
                throw new InvalidOperationException(GetType().Name + " sends on a timer but was constructed without an ITimerFactory.");
            }

            Logger.Debug("{1} Begin transmitting over websocket to {0}", message.Connection.RemoteEndPoint, GetType().Name);

            // Created stopped; it is only armed once the connection has been registered so the
            // first callback can find its entry in ActiveConnections.
            var timer = sendOnTimer ?
                TimerFactory.Create(TimerCallback, message.Connection, Timeout.Infinite, Timeout.Infinite) :
                null;

            // Allocated after the timer so nothing disposable is leaked if timer creation throws.
            var cancellationTokenSource = new CancellationTokenSource();

            var state = new TStateType
            {
                IntervalMs = periodMs,
                InitialDelayMs = dueTimeMs
            };

            lock (ActiveConnections)
            {
                ActiveConnections.Add(new Tuple<IWebSocketConnection, CancellationTokenSource, ITimer, TStateType>(message.Connection, cancellationTokenSource, timer, state));
            }

            if (timer != null)
            {
                timer.Change(TimeSpan.FromMilliseconds(dueTimeMs), TimeSpan.FromMilliseconds(periodMs));
            }
        }

        /// <summary>
        /// Timers the callback.
        /// </summary>
        /// <param name="state">The state. This is the connection the timer was created for.</param>
        private void TimerCallback(object state)
        {
            var connection = (IWebSocketConnection)state;

            Tuple<IWebSocketConnection, CancellationTokenSource, ITimer, TStateType> tuple;

            lock (ActiveConnections)
            {
                tuple = ActiveConnections.FirstOrDefault(c => c.Item1 == connection);
            }

            if (tuple == null)
            {
                return;
            }

            if (connection.State != WebSocketState.Open || tuple.Item2.IsCancellationRequested)
            {
                DisposeConnection(tuple);
                return;
            }

            // Fire and forget: SendDataAsync handles its own failures.
            SendDataAsync(tuple);
        }

        /// <summary>
        /// Sends data to every open, non-cancelled connection.
        /// </summary>
        /// <param name="force">
        /// When true every eligible connection is sent to; when false only those whose last send
        /// is at least their requested interval in the past.
        /// </param>
        protected void SendData(bool force)
        {
            Tuple<IWebSocketConnection, CancellationTokenSource, ITimer, TStateType>[] tuples;

            lock (ActiveConnections)
            {
                tuples = ActiveConnections
                    .Where(c => c.Item1.State == WebSocketState.Open
                        && !c.Item2.IsCancellationRequested
                        && (force || (DateTime.UtcNow - c.Item4.DateLastSendUtc).TotalMilliseconds >= c.Item4.IntervalMs))
                    .ToArray();
            }

            // Sends are started outside the lock.
            foreach (var tuple in tuples)
            {
                SendDataAsync(tuple);
            }
        }

        /// <summary>
        /// Fetches the data for one connection and sends it, recording the send time on success.
        /// A cancelled send disposes the connection only if its own token was cancelled; any
        /// other failure is logged and the connection is disposed.
        /// </summary>
        /// <param name="tuple">The connection entry.</param>
        /// <returns>Task. Returned (rather than async void) so a fault inside a handler cannot take the process down.</returns>
        private async Task SendDataAsync(Tuple<IWebSocketConnection, CancellationTokenSource, ITimer, TStateType> tuple)
        {
            var connection = tuple.Item1;

            try
            {
                var state = tuple.Item4;

                var cancellationToken = tuple.Item2.Token;

                var data = await GetDataToSend(state, cancellationToken).ConfigureAwait(false);

                if (data != null)
                {
                    await connection.SendAsync(new WebSocketMessage<TReturnDataType>
                    {
                        MessageType = Name,
                        Data = data

                    }, cancellationToken).ConfigureAwait(false);

                    state.DateLastSendUtc = DateTime.UtcNow;
                }
            }
            catch (OperationCanceledException)
            {
                if (tuple.Item2.IsCancellationRequested)
                {
                    DisposeConnection(tuple);
                }
            }
            catch (Exception ex)
            {
                Logger.ErrorException("Error sending web socket message {0}", ex, Name);
                DisposeConnection(tuple);
            }
        }

        /// <summary>
        /// Stops sending messages over a web socket
        /// </summary>
        /// <param name="message">The message.</param>
        private void Stop(WebSocketMessageInfo message)
        {
            lock (ActiveConnections)
            {
                var connection = ActiveConnections.FirstOrDefault(c => c.Item1 == message.Connection);

                if (connection != null)
                {
                    DisposeConnection(connection);
                }
            }
        }

        /// <summary>
        /// Disposes the connection.
        /// </summary>
        /// <remarks>
        /// Safe to call from any thread and more than once for the same entry: the entry is
        /// removed under the list lock first, and only the caller that actually removed it
        /// goes on to release the timer and cancellation source.
        /// </remarks>
        /// <param name="connection">The connection.</param>
        private void DisposeConnection(Tuple<IWebSocketConnection, CancellationTokenSource, ITimer, TStateType> connection)
        {
            // Some callers (Stop, Dispose) already hold this lock; C# locks are re-entrant on
            // the owning thread, so taking it again here is fine.
            lock (ActiveConnections)
            {
                if (!ActiveConnections.Remove(connection))
                {
                    // Already disposed by another caller.
                    return;
                }
            }

            Logger.Debug("{1} stop transmitting over websocket to {0}", connection.Item1.RemoteEndPoint, GetType().Name);

            var timer = connection.Item3;

            if (timer != null)
            {
                try
                {
                    timer.Dispose();
                }
                catch (ObjectDisposedException)
                {

                }
            }

            try
            {
                connection.Item2.Cancel();
                connection.Item2.Dispose();
            }
            catch (ObjectDisposedException)
            {

            }
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
        protected virtual void Dispose(bool dispose)
        {
            if (dispose)
            {
                lock (ActiveConnections)
                {
                    // Iterate over a snapshot because DisposeConnection removes from the list.
                    foreach (var connection in ActiveConnections.ToArray())
                    {
                        DisposeConnection(connection);
                    }
                }
            }
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    }

    /// <summary>
    /// Per-connection state tracked by a periodic web socket listener.
    /// </summary>
    public class WebSocketListenerState
    {
        /// <summary>
        /// Gets or sets the UTC time of the last send; the listener updates it after each message it sends.
        /// </summary>
        public DateTime DateLastSendUtc { get; set; }

        /// <summary>
        /// Gets or sets the initial delay in milliseconds (first field of the start message).
        /// </summary>
        public long InitialDelayMs { get; set; }

        /// <summary>
        /// Gets or sets the send interval in milliseconds (second field of the start message).
        /// </summary>
        public long IntervalMs { get; set; }
    }
}