reduce byte conversions with alchemy web socket

This commit is contained in:
Luke Pulverenti 2013-09-05 17:34:46 -04:00
parent 44b12c0f9f
commit 98e7eeeff9
9 changed files with 175 additions and 35 deletions

View File

@ -1,7 +1,7 @@
using System; using MediaBrowser.Model.Net;
using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MediaBrowser.Model.Net;
namespace MediaBrowser.Common.Net namespace MediaBrowser.Common.Net
{ {
@ -20,8 +20,14 @@ namespace MediaBrowser.Common.Net
/// Gets or sets the receive action. /// Gets or sets the receive action.
/// </summary> /// </summary>
/// <value>The receive action.</value> /// <value>The receive action.</value>
Action<byte[]> OnReceiveDelegate { get; set; } Action<byte[]> OnReceiveBytes { get; set; }
/// <summary>
/// Gets or sets the on receive.
/// </summary>
/// <value>The on receive.</value>
Action<string> OnReceive { get; set; }
/// <summary> /// <summary>
/// Sends the async. /// Sends the async.
/// </summary> /// </summary>

View File

@ -151,15 +151,14 @@ namespace MediaBrowser.Providers.Movies
await _tmdbSettingsSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); await _tmdbSettingsSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
// Check again in case it got populated while we were waiting.
if (_tmdbSettings != null)
{
_tmdbSettingsSemaphore.Release();
return _tmdbSettings;
}
try try
{ {
// Check again in case it got populated while we were waiting.
if (_tmdbSettings != null)
{
return _tmdbSettings;
}
using (var json = await GetMovieDbResponse(new HttpRequestOptions using (var json = await GetMovieDbResponse(new HttpRequestOptions
{ {
Url = string.Format(TmdbConfigUrl, ApiKey), Url = string.Format(TmdbConfigUrl, ApiKey),

View File

@ -88,9 +88,9 @@ namespace MediaBrowser.Server.Implementations.HttpServer
break; break;
} }
if (OnReceiveDelegate != null) if (OnReceiveBytes != null)
{ {
OnReceiveDelegate(bytes); OnReceiveBytes(bytes);
} }
} }
} }
@ -160,6 +160,8 @@ namespace MediaBrowser.Server.Implementations.HttpServer
/// Gets or sets the receive action. /// Gets or sets the receive action.
/// </summary> /// </summary>
/// <value>The receive action.</value> /// <value>The receive action.</value>
public Action<byte[]> OnReceiveDelegate { get; set; } public Action<byte[]> OnReceiveBytes { get; set; }
public Action<string> OnReceive { get; set; }
} }
} }

View File

@ -186,7 +186,10 @@ namespace MediaBrowser.Server.Implementations.ServerManager
/// <param name="e">The <see cref="WebSocketConnectEventArgs" /> instance containing the event data.</param> /// <param name="e">The <see cref="WebSocketConnectEventArgs" /> instance containing the event data.</param>
void HttpServer_WebSocketConnected(object sender, WebSocketConnectEventArgs e) void HttpServer_WebSocketConnected(object sender, WebSocketConnectEventArgs e)
{ {
var connection = new WebSocketConnection(e.WebSocket, e.Endpoint, _jsonSerializer, _logger) { OnReceive = ProcessWebSocketMessageReceived }; var connection = new WebSocketConnection(e.WebSocket, e.Endpoint, _jsonSerializer, _logger)
{
OnReceive = ProcessWebSocketMessageReceived
};
_webSocketConnections.Add(connection); _webSocketConnections.Add(connection);
} }

View File

@ -85,7 +85,8 @@ namespace MediaBrowser.Server.Implementations.ServerManager
_jsonSerializer = jsonSerializer; _jsonSerializer = jsonSerializer;
_socket = socket; _socket = socket;
_socket.OnReceiveDelegate = OnReceiveInternal; _socket.OnReceiveBytes = OnReceiveInternal;
_socket.OnReceive = OnReceiveInternal;
RemoteEndPoint = remoteEndPoint; RemoteEndPoint = remoteEndPoint;
_logger = logger; _logger = logger;
} }
@ -127,6 +128,34 @@ namespace MediaBrowser.Server.Implementations.ServerManager
} }
} }
private void OnReceiveInternal(string message)
{
LastActivityDate = DateTime.UtcNow;
if (OnReceive == null)
{
return;
}
try
{
var stub = (WebSocketMessage<object>)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage<object>));
var info = new WebSocketMessageInfo
{
MessageType = stub.MessageType,
Data = stub.Data == null ? null : stub.Data.ToString()
};
info.Connection = this;
OnReceive(info);
}
catch (Exception ex)
{
_logger.ErrorException("Error processing web socket message", ex);
}
}
/// <summary> /// <summary>
/// Sends a message asynchronously. /// Sends a message asynchronously.
/// </summary> /// </summary>

View File

@ -90,6 +90,10 @@ namespace MediaBrowser.Server.Implementations.WebSocket
/// </summary> /// </summary>
public void Stop() public void Stop()
{ {
if (WebSocketServer != null)
{
WebSocketServer.Stop();
}
} }
/// <summary> /// <summary>
@ -107,7 +111,10 @@ namespace MediaBrowser.Server.Implementations.WebSocket
/// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param> /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
protected virtual void Dispose(bool dispose) protected virtual void Dispose(bool dispose)
{ {
if (WebSocketServer != null)
{
WebSocketServer.Dispose();
}
} }
} }
} }

View File

@ -3,7 +3,6 @@ using MediaBrowser.Common.Net;
using MediaBrowser.Model.Logging; using MediaBrowser.Model.Logging;
using MediaBrowser.Model.Net; using MediaBrowser.Model.Net;
using System; using System;
using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -42,7 +41,7 @@ namespace MediaBrowser.Server.Implementations.WebSocket
UserContext = context; UserContext = context;
context.SetOnDisconnect(OnDisconnected); context.SetOnDisconnect(OnDisconnected);
context.SetOnReceive(OnReceive); context.SetOnReceive(OnReceiveContext);
_logger.Info("Client connected from {0}", context.ClientAddress); _logger.Info("Client connected from {0}", context.ClientAddress);
} }
@ -50,7 +49,7 @@ namespace MediaBrowser.Server.Implementations.WebSocket
/// <summary> /// <summary>
/// The _disconnected /// The _disconnected
/// </summary> /// </summary>
private bool _disconnected = false; private bool _disconnected;
/// <summary> /// <summary>
/// Gets or sets the state. /// Gets or sets the state.
/// </summary> /// </summary>
@ -73,25 +72,13 @@ namespace MediaBrowser.Server.Implementations.WebSocket
/// Called when [receive]. /// Called when [receive].
/// </summary> /// </summary>
/// <param name="context">The context.</param> /// <param name="context">The context.</param>
private void OnReceive(UserContext context) private void OnReceiveContext(UserContext context)
{ {
if (OnReceiveDelegate != null) if (OnReceive != null)
{ {
var json = context.DataFrame.ToString(); var json = context.DataFrame.ToString();
if (!string.IsNullOrWhiteSpace(json)) OnReceive(json);
{
try
{
var bytes = Encoding.UTF8.GetBytes(json);
OnReceiveDelegate(bytes);
}
catch (Exception ex)
{
_logger.ErrorException("Error processing web socket message", ex);
}
}
} }
} }
@ -128,6 +115,12 @@ namespace MediaBrowser.Server.Implementations.WebSocket
/// Gets or sets the receive action. /// Gets or sets the receive action.
/// </summary> /// </summary>
/// <value>The receive action.</value> /// <value>The receive action.</value>
public Action<byte[]> OnReceiveDelegate { get; set; } public Action<byte[]> OnReceiveBytes { get; set; }
/// <summary>
/// Gets or sets the on receive.
/// </summary>
/// <value>The on receive.</value>
public Action<string> OnReceive { get; set; }
} }
} }

View File

@ -0,0 +1,54 @@
using Fleck;
using MediaBrowser.Common.Net;
using System;
using IWebSocketServer = MediaBrowser.Common.Net.IWebSocketServer;
namespace MediaBrowser.Server.Implementations.WebSocket
{
public class FleckServer : IWebSocketServer
{
private WebSocketServer _server;
public void Start(int portNumber)
{
var server = new WebSocketServer("ws://localhost:" + portNumber);
server.Start(socket =>
{
socket.OnOpen = () => OnClientConnected(socket);
});
_server = server;
}
public void Stop()
{
_server.Dispose();
}
private void OnClientConnected(Fleck.IWebSocketConnection context)
{
if (WebSocketConnected != null)
{
var socket = new FleckWebSocket(context);
WebSocketConnected(this, new WebSocketConnectEventArgs
{
WebSocket = socket,
Endpoint = context.ConnectionInfo.ClientIpAddress + ":" + context.ConnectionInfo.ClientPort
});
}
}
public event EventHandler<WebSocketConnectEventArgs> WebSocketConnected;
public int Port
{
get { return _server.Port; }
}
public void Dispose()
{
_server.Dispose();
}
}
}

View File

@ -0,0 +1,47 @@
using MediaBrowser.Common.Net;
using MediaBrowser.Model.Net;
using System;
using System.Threading;
using System.Threading.Tasks;
using IWebSocketConnection = Fleck.IWebSocketConnection;
namespace MediaBrowser.Server.Implementations.WebSocket
{
public class FleckWebSocket : IWebSocket
{
private readonly IWebSocketConnection _connection;
public FleckWebSocket(IWebSocketConnection connection)
{
_connection = connection;
_connection.OnMessage = OnReceiveData;
}
public WebSocketState State
{
get { return _connection.IsAvailable ? WebSocketState.Open : WebSocketState.Closed; }
}
private void OnReceiveData(string data)
{
if (OnReceive != null)
{
OnReceive(data);
}
}
public Task SendAsync(byte[] bytes, WebSocketMessageType type, bool endOfMessage, CancellationToken cancellationToken)
{
return Task.Run(() => _connection.Send(bytes));
}
public void Dispose()
{
_connection.Close();
}
public Action<byte[]> OnReceiveBytes { get; set; }
public Action<string> OnReceive { get; set; }
}
}