using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Controller.Net;
using MediaBrowser.Controller.Session;
using MediaBrowser.Model.Events;
using MediaBrowser.Model.Net;
using MediaBrowser.Model.Serialization;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;

namespace Emby.Server.Implementations.Session
{
    /// <summary>
    /// Listens for newly connected WebSockets, attaches them to the session resolved
    /// from their query string, and monitors them with a periodic keep-alive check.
    /// </summary>
    public class SessionWebSocketListener : IWebSocketListener, IDisposable
    {
        /// <summary>
        /// The timeout in seconds after which a WebSocket is considered to be lost.
        /// </summary>
        public readonly int WebSocketLostTimeout = 60;

        /// <summary>
        /// The keep-alive timer factor; controls how often the timer will check on the status of the WebSockets.
        /// The timer period is <see cref="WebSocketLostTimeout"/> multiplied by this factor.
        /// </summary>
        public readonly double TimerFactor = 0.2;

        /// <summary>
        /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent.
        /// A socket silent for longer than <see cref="WebSocketLostTimeout"/> multiplied by this factor
        /// (but not yet lost) is sent a ForceKeepAlive message.
        /// </summary>
        public readonly double ForceKeepAliveFactor = 0.75;

        /// <summary>
        /// The session manager.
        /// </summary>
        private readonly ISessionManager _sessionManager;

        /// <summary>
        /// The logger.
        /// </summary>
        private readonly ILogger _logger;

        /// <summary>
        /// The JSON serializer.
        /// </summary>
        private readonly IJsonSerializer _json;

        /// <summary>
        /// The HTTP server whose WebSocketConnected event this listener subscribes to.
        /// </summary>
        private readonly IHttpServer _httpServer;

        /// <summary>
        /// Guards creation and disposal of <see cref="_keepAliveTimer"/>, which are triggered
        /// from connection events and from timer callbacks on different threads.
        /// </summary>
        private readonly object _keepAliveLock = new object();

        /// <summary>
        /// The WebSocket watchlist. Only the keys are meaningful; the value is an unused placeholder.
        /// </summary>
        // NOTE(review): the generic arguments were missing from the source as received;
        // the key type follows from usage, the value type (byte) is a reconstruction - confirm.
        private readonly ConcurrentDictionary<IWebSocketConnection, byte> _webSockets =
            new ConcurrentDictionary<IWebSocketConnection, byte>();

        /// <summary>
        /// The KeepAlive timer; null while no timer is running.
        /// </summary>
        private Timer _keepAliveTimer;

        /// <summary>
        /// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class.
        /// </summary>
        /// <param name="sessionManager">The session manager.</param>
        /// <param name="loggerFactory">The logger factory.</param>
        /// <param name="json">The json.</param>
        /// <param name="httpServer">The HTTP server.</param>
        public SessionWebSocketListener(
            ISessionManager sessionManager,
            ILoggerFactory loggerFactory,
            IJsonSerializer json,
            IHttpServer httpServer)
        {
            _sessionManager = sessionManager;
            _logger = loggerFactory.CreateLogger(GetType().Name);
            _json = json;
            _httpServer = httpServer;

            httpServer.WebSocketConnected += _serverManager_WebSocketConnected;
        }

        /// <summary>
        /// Handles a newly connected WebSocket: resolves its session and, when one is found,
        /// attaches the socket to the session's controller and starts monitoring it.
        /// </summary>
        /// <param name="sender">The event sender.</param>
        /// <param name="e">The event arguments carrying the connected WebSocket.</param>
        private void _serverManager_WebSocketConnected(object sender, GenericEventArgs<IWebSocketConnection> e)
        {
            var session = GetSession(e.Argument.QueryString, e.Argument.RemoteEndPoint);
            if (session != null)
            {
                EnsureController(session, e.Argument);
                KeepAliveWebSocket(e.Argument);
            }
            else
            {
                _logger.LogWarning("Unable to determine session based on url: {0}", e.Argument.Url);
            }
        }

        /// <summary>
        /// Looks up the session identified by the "api_key" and "deviceId" query parameters.
        /// </summary>
        /// <param name="queryString">The query string of the WebSocket request.</param>
        /// <param name="remoteEndpoint">The remote endpoint of the WebSocket.</param>
        /// <returns>
        /// The result of the session manager lookup, or <c>null</c> when the query string
        /// is missing or carries no usable "api_key".
        /// </returns>
        private SessionInfo GetSession(IQueryCollection queryString, string remoteEndpoint)
        {
            if (queryString == null)
            {
                return null;
            }

            var token = queryString["api_key"];
            if (string.IsNullOrWhiteSpace(token))
            {
                return null;
            }

            var deviceId = queryString["deviceId"];
            return _sessionManager.GetSessionByAuthenticationToken(token, deviceId, remoteEndpoint);
        }

        /// <summary>
        /// Unsubscribes from the HTTP server and stops the keep-alive timer.
        /// </summary>
        public void Dispose()
        {
            _httpServer.WebSocketConnected -= _serverManager_WebSocketConnected;
            StopKeepAliveTimer();
        }

        /// <summary>
        /// Processes the message. This listener does not act on incoming messages.
        /// </summary>
        /// <param name="message">The message.</param>
        /// <returns>A completed task.</returns>
        public Task ProcessMessageAsync(WebSocketMessageInfo message)
            => Task.CompletedTask;

        /// <summary>
        /// Obtains the session's WebSocket controller through <c>SessionInfo.EnsureController</c>
        /// and registers the connection with it.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="connection">The WebSocket connection.</param>
        private void EnsureController(SessionInfo session, IWebSocketConnection connection)
        {
            // NOTE(review): the explicit type argument was missing from the source as received
            // and has been reconstructed - confirm against SessionInfo.EnsureController.
            var controllerInfo = session.EnsureController<WebSocketController>(
                s => new WebSocketController(s, _logger, _sessionManager));

            var controller = (WebSocketController)controllerInfo.Item1;
            controller.AddWebSocket(connection);
        }

        /// <summary>
        /// Called when a WebSocket is closed.
        /// </summary>
        /// <param name="sender">The WebSocket.</param>
        /// <param name="e">The event arguments.</param>
        private void OnWebSocketClosed(object sender, EventArgs e)
        {
            var webSocket = (IWebSocketConnection)sender;
            RemoveWebSocket(webSocket);
        }

        /// <summary>
        /// Adds a WebSocket to the KeepAlive watchlist, notifies it of the timeout
        /// and makes sure the keep-alive timer is running.
        /// </summary>
        /// <param name="webSocket">The WebSocket to monitor.</param>
        private async void KeepAliveWebSocket(IWebSocketConnection webSocket)
        {
            // async void: nothing can observe an exception leaving this method,
            // so every failure is logged here instead of being allowed to escape.
            try
            {
                if (!_webSockets.TryAdd(webSocket, 0))
                {
                    _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket);
                    return;
                }

                webSocket.Closed += OnWebSocketClosed;
                webSocket.LastKeepAliveDate = DateTime.UtcNow;

                // Notify WebSocket about timeout
                try
                {
                    await SendForceKeepAlive(webSocket);
                }
                catch (WebSocketException exception)
                {
                    _logger.LogWarning(exception, "Error sending ForceKeepAlive message to WebSocket.");
                }

                StartKeepAliveTimer();
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, "Error adding WebSocket to the KeepAlive watchlist.");
            }
        }

        /// <summary>
        /// Removes a WebSocket from the KeepAlive watchlist.
        /// </summary>
        /// <param name="webSocket">The WebSocket to remove.</param>
        private void RemoveWebSocket(IWebSocketConnection webSocket)
        {
            webSocket.Closed -= OnWebSocketClosed;
            _webSockets.TryRemove(webSocket, out _);
        }

        /// <summary>
        /// Starts the KeepAlive timer if it is not already running.
        /// </summary>
        private void StartKeepAliveTimer()
        {
            // Check-and-create must be atomic: two sockets connecting at the same time
            // would otherwise each create a timer and one of them would never be disposed.
            lock (_keepAliveLock)
            {
                if (_keepAliveTimer == null)
                {
                    var interval = TimeSpan.FromSeconds(WebSocketLostTimeout * TimerFactor);
                    _keepAliveTimer = new Timer(KeepAliveSockets, null, interval, interval);
                }
            }
        }

        /// <summary>
        /// Stops the KeepAlive timer and detaches the close handler from every watched WebSocket.
        /// </summary>
        private void StopKeepAliveTimer()
        {
            lock (_keepAliveLock)
            {
                if (_keepAliveTimer != null)
                {
                    _keepAliveTimer.Dispose();
                    _keepAliveTimer = null;
                }
            }

            foreach (var pair in _webSockets)
            {
                pair.Key.Closed -= OnWebSocketClosed;
            }
        }

        /// <summary>
        /// Checks status of KeepAlive of WebSockets: sends a ForceKeepAlive to sockets that have been
        /// silent for a while, drops sockets that timed out or could not be reached, and stops the
        /// timer once no sockets are left.
        /// </summary>
        /// <param name="state">The state.</param>
        private async void KeepAliveSockets(object state)
        {
            // async void timer callback: nothing can observe an exception leaving this method,
            // so every failure is logged here instead of being allowed to escape.
            try
            {
                // Classify every socket once, against a single timestamp, so that the logged
                // counts and the sockets actually processed always agree.
                var now = DateTime.UtcNow;
                var inactive = new List<IWebSocketConnection>();
                var lost = new List<IWebSocketConnection>();

                foreach (var webSocket in _webSockets.Keys)
                {
                    var elapsed = (now - webSocket.LastKeepAliveDate).TotalSeconds;
                    if (elapsed >= WebSocketLostTimeout)
                    {
                        lost.Add(webSocket);
                    }
                    else if (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor)
                    {
                        inactive.Add(webSocket);
                    }
                }

                if (inactive.Count > 0)
                {
                    _logger.LogDebug("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count);
                }

                foreach (var webSocket in inactive)
                {
                    try
                    {
                        await SendForceKeepAlive(webSocket);
                    }
                    catch (WebSocketException exception)
                    {
                        _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket.");

                        // A socket that cannot be written to is treated as lost.
                        lost.Add(webSocket);
                    }
                }

                if (lost.Count > 0)
                {
                    _logger.LogInformation("Lost {0} WebSockets.", lost.Count);
                    foreach (var webSocket in lost)
                    {
                        // TODO: handle session relative to the lost webSocket
                        RemoveWebSocket(webSocket);
                    }
                }

                // The emptiness check and the stop share the lock used by StartKeepAliveTimer,
                // so a socket added concurrently cannot end up without a running timer.
                lock (_keepAliveLock)
                {
                    if (_webSockets.IsEmpty)
                    {
                        StopKeepAliveTimer();
                    }
                }
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, "Error checking KeepAlive status of WebSockets.");
            }
        }

        /// <summary>
        /// Sends a ForceKeepAlive message, carrying <see cref="WebSocketLostTimeout"/> as its data, to a WebSocket.
        /// </summary>
        /// <param name="webSocket">The WebSocket.</param>
        /// <returns>Task.</returns>
        private Task SendForceKeepAlive(IWebSocketConnection webSocket)
        {
            return webSocket.SendAsync(
                new WebSocketMessage<int>
                {
                    MessageType = "ForceKeepAlive",
                    Data = WebSocketLostTimeout
                },
                CancellationToken.None);
        }
    }
}