From 2e91d69d20e49f971d9890674d3016351ee87ccd Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Fri, 15 Jul 2016 13:13:55 -0400 Subject: [PATCH] update async stream writing --- .../BaseProgressiveStreamingService.cs | 11 +- .../Progressive/ProgressiveStreamWriter.cs | 12 +- .../Net/IHttpResultFactory.cs | 2 + .../HttpServer/AsyncStreamWriterFunc.cs | 56 ++++ .../HttpServer/HttpResultFactory.cs | 5 + .../NetListener/HttpListenerServer.cs | 285 ------------------ .../HttpServer/RangeRequestWriter.cs | 74 ++++- .../HttpServer/StreamWriter.cs | 2 +- ...MediaBrowser.Server.Implementations.csproj | 5 +- 9 files changed, 141 insertions(+), 311 deletions(-) create mode 100644 MediaBrowser.Server.Implementations/HttpServer/AsyncStreamWriterFunc.cs delete mode 100644 MediaBrowser.Server.Implementations/HttpServer/NetListener/HttpListenerServer.cs diff --git a/MediaBrowser.Api/Playback/Progressive/BaseProgressiveStreamingService.cs b/MediaBrowser.Api/Playback/Progressive/BaseProgressiveStreamingService.cs index 868d8d488..d8b7ce2ef 100644 --- a/MediaBrowser.Api/Playback/Progressive/BaseProgressiveStreamingService.cs +++ b/MediaBrowser.Api/Playback/Progressive/BaseProgressiveStreamingService.cs @@ -13,6 +13,7 @@ using ServiceStack.Web; using System; using System.Collections.Generic; using System.Globalization; +using System.IO; using System.Threading; using System.Threading.Tasks; using CommonIO; @@ -336,17 +337,19 @@ namespace MediaBrowser.Api.Playback.Progressive state.Dispose(); } - var result = new ProgressiveStreamWriter(outputPath, Logger, FileSystem, job); + var outputHeaders = new Dictionary(StringComparer.OrdinalIgnoreCase); - result.Options["Content-Type"] = contentType; + outputHeaders["Content-Type"] = contentType; // Add the response headers to the result object foreach (var item in responseHeaders) { - result.Options[item.Key] = item.Value; + outputHeaders[item.Key] = item.Value; } - return result; + Func streamWriter = stream => new ProgressiveFileCopier(FileSystem, job, Logger).StreamFile(outputPath, stream); + + return ResultFactory.GetAsyncStreamWriter(streamWriter, outputHeaders); } finally { diff --git a/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs b/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs index 9f02c51cd..4c9428cc4 100644 --- a/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs +++ b/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs @@ -48,21 +48,19 @@ namespace MediaBrowser.Api.Playback.Progressive /// The response stream. public void WriteTo(Stream responseStream) { - WriteToInternal(responseStream); + var task = WriteToAsync(responseStream); + Task.WaitAll(task); } /// - /// Writes to async. + /// Writes to. /// /// The response stream. - /// Task. - private void WriteToInternal(Stream responseStream) + public async Task WriteToAsync(Stream responseStream) { try { - var task = new ProgressiveFileCopier(_fileSystem, _job, Logger).StreamFile(Path, responseStream); - - Task.WaitAll(task); + await new ProgressiveFileCopier(_fileSystem, _job, Logger).StreamFile(Path, responseStream).ConfigureAwait(false); } catch (IOException) { diff --git a/MediaBrowser.Controller/Net/IHttpResultFactory.cs b/MediaBrowser.Controller/Net/IHttpResultFactory.cs index cd7ee603e..49d4614d8 100644 --- a/MediaBrowser.Controller/Net/IHttpResultFactory.cs +++ b/MediaBrowser.Controller/Net/IHttpResultFactory.cs @@ -28,6 +28,8 @@ namespace MediaBrowser.Controller.Net /// System.Object. object GetResult(object content, string contentType, IDictionary responseHeaders = null); + object GetAsyncStreamWriter(Func streamWriter, IDictionary responseHeaders = null); + /// /// Gets the optimized result. /// diff --git a/MediaBrowser.Server.Implementations/HttpServer/AsyncStreamWriterFunc.cs b/MediaBrowser.Server.Implementations/HttpServer/AsyncStreamWriterFunc.cs new file mode 100644 index 000000000..4f8b18319 --- /dev/null +++ b/MediaBrowser.Server.Implementations/HttpServer/AsyncStreamWriterFunc.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using ServiceStack; +using ServiceStack.Web; + +namespace MediaBrowser.Server.Implementations.HttpServer +{ + public class AsyncStreamWriterFunc : IStreamWriter, IAsyncStreamWriter, IHasOptions + { + /// + /// Gets or sets the source stream. + /// + /// The source stream. + private Func Writer { get; set; } + + /// + /// Gets the options. + /// + /// The options. + public IDictionary Options { get; } + + public Action OnComplete { get; set; } + public Action OnError { get; set; } + + /// + /// Initializes a new instance of the class. + /// + public AsyncStreamWriterFunc(Func writer, IDictionary headers) + { + Writer = writer; + + if (headers == null) + { + headers = new Dictionary(StringComparer.OrdinalIgnoreCase); + } + Options = headers; + } + + /// + /// Writes to. + /// + /// The response stream. + public void WriteTo(Stream responseStream) + { + var task = Writer(responseStream); + Task.WaitAll(task); + } + + public async Task WriteToAsync(Stream responseStream) + { + await Writer(responseStream).ConfigureAwait(false); + } + } +} diff --git a/MediaBrowser.Server.Implementations/HttpServer/HttpResultFactory.cs b/MediaBrowser.Server.Implementations/HttpServer/HttpResultFactory.cs index 1d4829260..c520e43b8 100644 --- a/MediaBrowser.Server.Implementations/HttpServer/HttpResultFactory.cs +++ b/MediaBrowser.Server.Implementations/HttpServer/HttpResultFactory.cs @@ -699,5 +699,10 @@ namespace MediaBrowser.Server.Implementations.HttpServer throw error; } + + public object GetAsyncStreamWriter(Func streamWriter, IDictionary responseHeaders = null) + { + return new AsyncStreamWriterFunc(streamWriter, responseHeaders); + } } } \ No newline at end of file diff --git a/MediaBrowser.Server.Implementations/HttpServer/NetListener/HttpListenerServer.cs b/MediaBrowser.Server.Implementations/HttpServer/NetListener/HttpListenerServer.cs deleted file mode 100644 index 31c0e87b3..000000000 --- a/MediaBrowser.Server.Implementations/HttpServer/NetListener/HttpListenerServer.cs +++ /dev/null @@ -1,285 +0,0 @@ -using MediaBrowser.Controller.Net; -using MediaBrowser.Model.Logging; -using ServiceStack; -using ServiceStack.Host.HttpListener; -using ServiceStack.Web; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace MediaBrowser.Server.Implementations.HttpServer.NetListener -{ - public class HttpListenerServer : IHttpListener - { - private readonly ILogger _logger; - private HttpListener _listener; - private readonly ManualResetEventSlim _listenForNextRequest = new ManualResetEventSlim(false); - - public Action ErrorHandler { get; set; } - public Action WebSocketHandler { get; set; } - public Func RequestHandler { get; set; } - - private readonly Action _endpointListener; - - public HttpListenerServer(ILogger logger, Action endpointListener) - { - _logger = logger; - _endpointListener = endpointListener; - } - - private List UrlPrefixes { get; set; } - - public void Start(IEnumerable urlPrefixes) - { - UrlPrefixes = urlPrefixes.ToList(); - - if (_listener == null) - _listener = new HttpListener(); - - //HostContext.Config.HandlerFactoryPath = ListenerRequest.GetHandlerPathIfAny(UrlPrefixes.First()); - - foreach (var prefix in UrlPrefixes) - { - _logger.Info("Adding HttpListener prefix " + prefix); - _listener.Prefixes.Add(prefix); - } - - _listener.Start(); - - Task.Factory.StartNew(Listen, TaskCreationOptions.LongRunning); - } - - private bool IsListening - { - get { return _listener != null && _listener.IsListening; } - } - - // Loop here to begin processing of new requests. - private void Listen() - { - while (IsListening) - { - if (_listener == null) return; - _listenForNextRequest.Reset(); - - try - { - _listener.BeginGetContext(ListenerCallback, _listener); - _listenForNextRequest.Wait(); - } - catch (Exception ex) - { - _logger.Error("Listen()", ex); - return; - } - if (_listener == null) return; - } - } - - // Handle the processing of a request in here. - private void ListenerCallback(IAsyncResult asyncResult) - { - _listenForNextRequest.Set(); - - var listener = asyncResult.AsyncState as HttpListener; - HttpListenerContext context; - - if (listener == null) return; - var isListening = listener.IsListening; - - try - { - if (!isListening) - { - _logger.Debug("Ignoring ListenerCallback() as HttpListener is no longer listening"); return; - } - // The EndGetContext() method, as with all Begin/End asynchronous methods in the .NET Framework, - // blocks until there is a request to be processed or some type of data is available. - context = listener.EndGetContext(asyncResult); - } - catch (Exception ex) - { - // You will get an exception when httpListener.Stop() is called - // because there will be a thread stopped waiting on the .EndGetContext() - // method, and again, that is just the way most Begin/End asynchronous - // methods of the .NET Framework work. - var errMsg = ex + ": " + IsListening; - _logger.Warn(errMsg); - return; - } - - Task.Factory.StartNew(() => InitTask(context)); - } - - private void InitTask(HttpListenerContext context) - { - try - { - var task = this.ProcessRequestAsync(context); - task.ContinueWith(x => HandleError(x.Exception, context), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent); - - if (task.Status == TaskStatus.Created) - { - task.RunSynchronously(); - } - } - catch (Exception ex) - { - HandleError(ex, context); - } - } - - private Task ProcessRequestAsync(HttpListenerContext context) - { - var request = context.Request; - - LogHttpRequest(request); - - if (request.IsWebSocketRequest) - { - return ProcessWebSocketRequest(context); - } - - if (string.IsNullOrEmpty(context.Request.RawUrl)) - return ((object)null).AsTaskResult(); - - var operationName = context.Request.GetOperationName(); - - var httpReq = GetRequest(context, operationName); - - return RequestHandler(httpReq, request.Url); - } - - /// - /// Processes the web socket request. - /// - /// The CTX. - /// Task. - private async Task ProcessWebSocketRequest(HttpListenerContext ctx) - { -#if !__MonoCS__ - try - { - var webSocketContext = await ctx.AcceptWebSocketAsync(null).ConfigureAwait(false); - - if (WebSocketHandler != null) - { - WebSocketHandler(new WebSocketConnectEventArgs - { - WebSocket = new NativeWebSocket(webSocketContext.WebSocket, _logger), - Endpoint = ctx.Request.RemoteEndPoint.ToString() - }); - } - } - catch (Exception ex) - { - _logger.ErrorException("AcceptWebSocketAsync error", ex); - ctx.Response.StatusCode = 500; - ctx.Response.Close(); - } -#endif - } - - private void HandleError(Exception ex, HttpListenerContext context) - { - var operationName = context.Request.GetOperationName(); - var httpReq = GetRequest(context, operationName); - - if (ErrorHandler != null) - { - ErrorHandler(ex, httpReq); - } - } - - private static ListenerRequest GetRequest(HttpListenerContext httpContext, string operationName) - { - var req = new ListenerRequest(httpContext, operationName, RequestAttributes.None); - req.RequestAttributes = req.GetAttributes(); - - return req; - } - - /// - /// Logs the HTTP request. - /// - /// The request. - private void LogHttpRequest(HttpListenerRequest request) - { - var endpoint = request.LocalEndPoint; - - if (endpoint != null) - { - var address = endpoint.ToString(); - - _endpointListener(address); - } - - LogRequest(_logger, request); - } - - /// - /// Logs the request. - /// - /// The logger. - /// The request. - private static void LogRequest(ILogger logger, HttpListenerRequest request) - { - var log = new StringBuilder(); - - var logHeaders = true; - - if (logHeaders) - { - var headers = string.Join(",", request.Headers.AllKeys.Where(i => !string.Equals(i, "cookie", StringComparison.OrdinalIgnoreCase) && !string.Equals(i, "Referer", StringComparison.OrdinalIgnoreCase)).Select(k => k + "=" + request.Headers[k])); - - log.AppendLine("Ip: " + request.RemoteEndPoint + ". Headers: " + headers); - } - - var type = request.IsWebSocketRequest ? "Web Socket" : "HTTP " + request.HttpMethod; - - logger.LogMultiline(type + " " + request.Url, LogSeverity.Debug, log); - } - - public void Stop() - { - if (_listener != null) - { - foreach (var prefix in UrlPrefixes) - { - _listener.Prefixes.Remove(prefix); - } - - _listener.Close(); - } - } - - public void Dispose() - { - Dispose(true); - } - - private bool _disposed; - private readonly object _disposeLock = new object(); - protected virtual void Dispose(bool disposing) - { - if (_disposed) return; - - lock (_disposeLock) - { - if (_disposed) return; - - if (disposing) - { - Stop(); - } - - //release unmanaged resources here... - _disposed = true; - } - } - } -} \ No newline at end of file diff --git a/MediaBrowser.Server.Implementations/HttpServer/RangeRequestWriter.cs b/MediaBrowser.Server.Implementations/HttpServer/RangeRequestWriter.cs index fb4397462..7ac92408b 100644 --- a/MediaBrowser.Server.Implementations/HttpServer/RangeRequestWriter.cs +++ b/MediaBrowser.Server.Implementations/HttpServer/RangeRequestWriter.cs @@ -5,10 +5,12 @@ using System.Collections.Generic; using System.Globalization; using System.IO; using System.Net; +using System.Threading.Tasks; +using ServiceStack; namespace MediaBrowser.Server.Implementations.HttpServer { - public class RangeRequestWriter : IStreamWriter, IHttpResult + public class RangeRequestWriter : IStreamWriter, IAsyncStreamWriter, IHttpResult { /// /// Gets or sets the source stream. @@ -168,16 +170,6 @@ namespace MediaBrowser.Server.Implementations.HttpServer /// /// The response stream. public void WriteTo(Stream responseStream) - { - WriteToInternal(responseStream); - } - - /// - /// Writes to async. - /// - /// The response stream. - /// Task. - private void WriteToInternal(Stream responseStream) { try { @@ -237,6 +229,66 @@ namespace MediaBrowser.Server.Implementations.HttpServer } } + public async Task WriteToAsync(Stream responseStream) + { + try + { + // Headers only + if (IsHeadRequest) + { + return; + } + + using (var source = SourceStream) + { + // If the requested range is "0-", we can optimize by just doing a stream copy + if (RangeEnd >= TotalContentLength - 1) + { + await source.CopyToAsync(responseStream, BufferSize).ConfigureAwait(false); + } + else + { + await CopyToInternalAsync(source, responseStream, RangeLength).ConfigureAwait(false); + } + } + } + catch (IOException ex) + { + throw; + } + catch (Exception ex) + { + _logger.ErrorException("Error in range request writer", ex); + throw; + } + finally + { + if (OnComplete != null) + { + OnComplete(); + } + } + } + + private async Task CopyToInternalAsync(Stream source, Stream destination, long copyLength) + { + var array = new byte[BufferSize]; + int count; + while ((count = await source.ReadAsync(array, 0, array.Length).ConfigureAwait(false)) != 0) + { + var bytesToCopy = Math.Min(count, copyLength); + + await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToCopy)).ConfigureAwait(false); + + copyLength -= bytesToCopy; + + if (copyLength <= 0) + { + break; + } + } + } + public string ContentType { get; set; } public IRequest RequestContext { get; set; } diff --git a/MediaBrowser.Server.Implementations/HttpServer/StreamWriter.cs b/MediaBrowser.Server.Implementations/HttpServer/StreamWriter.cs index e38e39322..f5906f6b7 100644 --- a/MediaBrowser.Server.Implementations/HttpServer/StreamWriter.cs +++ b/MediaBrowser.Server.Implementations/HttpServer/StreamWriter.cs @@ -12,7 +12,7 @@ namespace MediaBrowser.Server.Implementations.HttpServer /// /// Class StreamWriter /// - public class StreamWriter : IStreamWriter, /*IAsyncStreamWriter,*/ IHasOptions + public class StreamWriter : IStreamWriter, IAsyncStreamWriter, IHasOptions { private ILogger Logger { get; set; } diff --git a/MediaBrowser.Server.Implementations/MediaBrowser.Server.Implementations.csproj b/MediaBrowser.Server.Implementations/MediaBrowser.Server.Implementations.csproj index 8a3f6616a..5877059d7 100644 --- a/MediaBrowser.Server.Implementations/MediaBrowser.Server.Implementations.csproj +++ b/MediaBrowser.Server.Implementations/MediaBrowser.Server.Implementations.csproj @@ -156,6 +156,7 @@ + @@ -757,9 +758,7 @@ - - - +