Remove more cruft and add the beginnings of a socket middleware

This commit is contained in:
Claus Vium 2019-02-27 23:22:55 +01:00
parent 71ed840944
commit fb1de5a921
8 changed files with 97 additions and 84 deletions

View File

@ -35,6 +35,7 @@ using Emby.Server.Implementations.IO;
using Emby.Server.Implementations.Library;
using Emby.Server.Implementations.LiveTv;
using Emby.Server.Implementations.Localization;
using Emby.Server.Implementations.Middleware;
using Emby.Server.Implementations.Net;
using Emby.Server.Implementations.Playlists;
using Emby.Server.Implementations.Reflection;
@ -641,6 +642,7 @@ namespace Emby.Server.Implementations
app.UseWebSockets();
app.UseResponseCompression();
// TODO app.UseMiddleware<WebSocketMiddleware>();
app.Use(ExecuteWebsocketHandlerAsync);
app.Use(ExecuteHttpHandlerAsync);
})

View File

@ -102,12 +102,6 @@ namespace Emby.Server.Implementations.HttpServer
_socket = socket;
_socket.OnReceiveBytes = OnReceiveInternal;
var memorySocket = socket as IMemoryWebSocket;
if (memorySocket != null)
{
memorySocket.OnReceiveMemoryBytes = OnReceiveInternal;
}
RemoteEndPoint = remoteEndPoint;
_logger = logger;
@ -143,34 +137,6 @@ namespace Emby.Server.Implementations.HttpServer
}
}
/// <summary>
/// Called when [receive].
/// </summary>
/// <param name="memory">The memory block.</param>
/// <param name="length">The length of the memory block.</param>
private void OnReceiveInternal(Memory<byte> memory, int length)
{
LastActivityDate = DateTime.UtcNow;
if (OnReceive == null)
{
return;
}
var bytes = memory.Slice(0, length).ToArray();
var charset = CharsetDetector.DetectFromBytes(bytes).Detected?.EncodingName;
if (string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase))
{
OnReceiveInternal(Encoding.UTF8.GetString(bytes, 0, bytes.Length));
}
else
{
OnReceiveInternal(Encoding.ASCII.GetString(bytes, 0, bytes.Length));
}
}
private void OnReceiveInternal(string message)
{
LastActivityDate = DateTime.UtcNow;
@ -194,7 +160,7 @@ namespace Emby.Server.Implementations.HttpServer
var info = new WebSocketMessageInfo
{
MessageType = stub.MessageType,
Data = stub.Data == null ? null : stub.Data.ToString(),
Data = stub.Data?.ToString(),
Connection = this
};

View File

@ -0,0 +1,36 @@
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using WebSocketManager = Emby.Server.Implementations.WebSockets.WebSocketManager;
namespace Emby.Server.Implementations.Middleware
{
public class WebSocketMiddleware
{
private readonly RequestDelegate _next;
private readonly ILogger<WebSocketMiddleware> _logger;
private readonly WebSocketManager _webSocketManager;
public WebSocketMiddleware(RequestDelegate next, ILogger<WebSocketMiddleware> logger, WebSocketManager webSocketManager)
{
_next = next;
_logger = logger;
_webSocketManager = webSocketManager;
}
public async Task Invoke(HttpContext httpContext)
{
_logger.LogInformation("Handling request: " + httpContext.Request.Path);
if (httpContext.WebSockets.IsWebSocketRequest)
{
var webSocketContext = await httpContext.WebSockets.AcceptWebSocketAsync(null).ConfigureAwait(false);
_webSocketManager.AddSocket(webSocketContext);
}
else
{
await _next.Invoke(httpContext);
}
}
}
}

View File

@ -45,9 +45,4 @@ namespace Emby.Server.Implementations.Net
/// <returns>Task.</returns>
Task SendAsync(string text, bool endOfMessage, CancellationToken cancellationToken);
}
public interface IMemoryWebSocket
{
Action<Memory<byte>, int> OnReceiveMemoryBytes { get; set; }
}
}

View File

@ -33,8 +33,6 @@ using Microsoft.Extensions.Logging;
public Func<Exception, IRequest, bool, bool, Task> ErrorHandler { get; set; }
public Func<IHttpRequest, string, string, string, CancellationToken, Task> RequestHandler { get; set; }
public Action<WebSocketConnectingEventArgs> WebSocketConnecting { get; set; }
public Action<WebSocketConnectEventArgs> WebSocketConnected { get; set; }
private static void LogRequest(ILogger logger, HttpRequest request)
@ -52,60 +50,41 @@ using Microsoft.Extensions.Logging;
var endpoint = ctx.Connection.RemoteIpAddress.ToString();
var url = ctx.Request.GetDisplayUrl();
var connectingArgs = new WebSocketConnectingEventArgs
var webSocketContext = await ctx.WebSockets.AcceptWebSocketAsync(null).ConfigureAwait(false);
var socket = new SharpWebSocket(webSocketContext, _logger);
WebSocketConnected(new WebSocketConnectEventArgs
{
Url = url,
QueryString = ctx.Request.Query,
WebSocket = socket,
Endpoint = endpoint
};
});
WebSocketConnecting?.Invoke(connectingArgs);
WebSocketReceiveResult result;
var message = new List<byte>();
if (connectingArgs.AllowConnection)
do
{
_logger.LogDebug("Web socket connection allowed");
var buffer = WebSocket.CreateServerBuffer(4096);
result = await webSocketContext.ReceiveAsync(buffer, _disposeCancellationToken);
message.AddRange(buffer.Array.Take(result.Count));
var webSocketContext = await ctx.WebSockets.AcceptWebSocketAsync(null).ConfigureAwait(false);
var socket = new SharpWebSocket(webSocketContext, _logger);
WebSocketConnected(new WebSocketConnectEventArgs
if (result.EndOfMessage)
{
Url = url,
QueryString = ctx.Request.Query,
WebSocket = socket,
Endpoint = endpoint
});
WebSocketReceiveResult result;
var message = new List<byte>();
do
{
var buffer = WebSocket.CreateServerBuffer(4096);
result = await webSocketContext.ReceiveAsync(buffer, _disposeCancellationToken);
message.AddRange(buffer.Array.Take(result.Count));
if (result.EndOfMessage)
{
socket.OnReceiveBytes(message.ToArray());
message.Clear();
}
} while (socket.State == WebSocketState.Open && result.MessageType != WebSocketMessageType.Close);
if (webSocketContext.State == WebSocketState.Open)
{
await webSocketContext.CloseAsync(result.CloseStatus ?? WebSocketCloseStatus.NormalClosure,
result.CloseStatusDescription, _disposeCancellationToken);
socket.OnReceiveBytes(message.ToArray());
message.Clear();
}
} while (socket.State == WebSocketState.Open && result.MessageType != WebSocketMessageType.Close);
socket.Dispose();
}
else
if (webSocketContext.State == WebSocketState.Open)
{
_logger.LogWarning("Web socket connection not allowed");
ctx.Response.StatusCode = 401;
await webSocketContext.CloseAsync(result.CloseStatus ?? WebSocketCloseStatus.NormalClosure,
result.CloseStatusDescription, _disposeCancellationToken);
}
socket.Dispose();
}
catch (Exception ex)
{

View File

@ -0,0 +1,7 @@
namespace Emby.Server.Implementations.WebSocket
{
public class WebSocketManager
{
}
}

View File

@ -0,0 +1,22 @@
using System;
using System.Collections.Concurrent;
using System.Net.WebSockets;
namespace Emby.Server.Implementations.WebSockets
{
public class WebSocketManager
{
private readonly ConcurrentDictionary<Guid, WebSocket> _activeWebSockets;
public WebSocketManager()
{
_activeWebSockets = new ConcurrentDictionary<Guid, WebSocket>();
}
public void AddSocket(WebSocket webSocket)
{
var guid = Guid.NewGuid();
_activeWebSockets.TryAdd(guid, webSocket);
}
}
}

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Authors>Jellyfin Contributors</Authors>
@ -16,6 +16,12 @@
<Compile Include="..\SharedVersion.cs" />
</ItemGroup>
<ItemGroup>
<Reference Include="Microsoft.AspNetCore.Http.Extensions, Version=2.2.0.0, Culture=neutral, PublicKeyToken=adb9793829ddae60">
<HintPath>..\..\..\..\..\usr\local\share\dotnet\sdk\NuGetFallbackFolder\microsoft.aspnetcore.http.extensions\2.2.0\lib\netstandard2.0\Microsoft.AspNetCore.Http.Extensions.dll</HintPath>
</Reference>
</ItemGroup>
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>