From 28988b056ccc8efad54905b6f10ff0b9532c7130 Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Thu, 25 May 2017 09:00:14 -0400 Subject: [PATCH 1/2] update stream copying --- .../Net/NetAcceptSocket.cs | 33 +- .../Net/SocketFactory.cs | 85 ++++ Emby.Common.Implementations/Net/UdpSocket.cs | 7 +- Emby.Server.Core/HttpServerFactory.cs | 78 +--- .../IO/AsyncStreamCopier.cs | 19 +- .../HdHomerun/HdHomerunHttpStream.cs | 38 ++ .../HdHomerun/HdHomerunUdpStream.cs | 90 ++++- MediaBrowser.Controller/LiveTv/LiveStream.cs | 93 +---- MediaBrowser.Model/Net/ISocketFactory.cs | 4 + SocketHttpListener/Net/HttpConnection.cs | 375 +++++++++--------- .../Net/HttpListenerResponse.cs | 47 +++ .../Net/HttpResponseStream.Managed.cs | 5 +- 12 files changed, 468 insertions(+), 406 deletions(-) diff --git a/Emby.Common.Implementations/Net/NetAcceptSocket.cs b/Emby.Common.Implementations/Net/NetAcceptSocket.cs index 82e7e9b00..5f97fd854 100644 --- a/Emby.Common.Implementations/Net/NetAcceptSocket.cs +++ b/Emby.Common.Implementations/Net/NetAcceptSocket.cs @@ -97,7 +97,6 @@ namespace Emby.Common.Implementations.Net _acceptor.StartAccept(); } -#if NET46 public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken) { var options = TransmitFileOptions.UseDefaultWorkerThread; @@ -117,25 +116,23 @@ namespace Emby.Common.Implementations.Net var client = data.Item1; var path = data.Item2; var taskCompletion = data.Item3; - + // Complete sending the data to the remote device. - try { - client.EndSendFile(ar); - taskCompletion.TrySetResult(true); -} - catch(SocketException ex){ - _logger.Info("Socket.SendFile failed for {0}. error code {1}", path, ex.SocketErrorCode); - taskCompletion.TrySetException(ex); -}catch(Exception ex){ - taskCompletion.TrySetException(ex); -} + try + { + client.EndSendFile(ar); + taskCompletion.TrySetResult(true); + } + catch (SocketException ex) + { + _logger.Info("Socket.SendFile failed for {0}. error code {1}", path, ex.SocketErrorCode); + taskCompletion.TrySetException(ex); + } + catch (Exception ex) + { + taskCompletion.TrySetException(ex); + } } -#else - public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken) - { - throw new NotImplementedException(); - } -#endif public void Dispose() { diff --git a/Emby.Common.Implementations/Net/SocketFactory.cs b/Emby.Common.Implementations/Net/SocketFactory.cs index 3562a8644..0a1232a40 100644 --- a/Emby.Common.Implementations/Net/SocketFactory.cs +++ b/Emby.Common.Implementations/Net/SocketFactory.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net; using System.Net.Sockets; @@ -208,5 +209,89 @@ namespace Emby.Common.Implementations.Net throw; } } + + public Stream CreateNetworkStream(ISocket socket, bool ownsSocket) + { + var netSocket = (UdpSocket)socket; + + return new SocketStream(netSocket.Socket, ownsSocket); + } } + + public class SocketStream : Stream + { + private readonly Socket _socket; + + public SocketStream(Socket socket, bool ownsSocket) + { + _socket = socket; + } + + public override void Flush() + { + } + + public override bool CanRead + { + get { return true; } + } + public override bool CanSeek + { + get { return false; } + } + public override bool CanWrite + { + get { return true; } + } + public override long Length + { + get { throw new NotImplementedException(); } + } + public override long Position + { + get { throw new NotImplementedException(); } + set { throw new NotImplementedException(); } + } + + public override void Write(byte[] buffer, int offset, int count) + { + _socket.Send(buffer, offset, count, SocketFlags.None); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return _socket.BeginSend(buffer, offset, count, SocketFlags.None, callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) + { + _socket.EndSend(asyncResult); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _socket.Receive(buffer, offset, count, SocketFlags.None); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return _socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + return _socket.EndReceive(asyncResult); + } + } + } diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs index 5e110e464..578610b4c 100644 --- a/Emby.Common.Implementations/Net/UdpSocket.cs +++ b/Emby.Common.Implementations/Net/UdpSocket.cs @@ -14,11 +14,16 @@ namespace Emby.Common.Implementations.Net // THIS IS A LINKED FILE - SHARED AMONGST MULTIPLE PLATFORMS // Be careful to check any changes compile and work for all platform projects it is shared in. - internal sealed class UdpSocket : DisposableManagedObjectBase, ISocket + public sealed class UdpSocket : DisposableManagedObjectBase, ISocket { private Socket _Socket; private int _LocalPort; + public Socket Socket + { + get { return _Socket; } + } + private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs() { SocketFlags = SocketFlags.None diff --git a/Emby.Server.Core/HttpServerFactory.cs b/Emby.Server.Core/HttpServerFactory.cs index c30355f7a..e16cbea0e 100644 --- a/Emby.Server.Core/HttpServerFactory.cs +++ b/Emby.Server.Core/HttpServerFactory.cs @@ -83,7 +83,7 @@ namespace Emby.Server.Core { var netSocket = (NetAcceptSocket)acceptSocket; - return new WritableNetworkStream(netSocket.Socket, ownsSocket); + return new SocketStream(netSocket.Socket, ownsSocket); } public Task AuthenticateSslStreamAsServer(Stream stream, ICertificate certificate) @@ -109,80 +109,4 @@ namespace Emby.Server.Core public X509Certificate X509Certificate { get; private set; } } - - public class WritableNetworkStream : Stream - { - private readonly Socket _socket; - - public WritableNetworkStream(Socket socket, bool ownsSocket) - { - _socket = socket; - } - - public override void Flush() - { - } - - public override bool CanRead - { - get { return true; } - } - public override bool CanSeek - { - get { return false; } - } - public override bool CanWrite - { - get { return true; } - } - public override long Length - { - get { throw new NotImplementedException(); } - } - public override long Position - { - get { throw new NotImplementedException(); } - set { throw new NotImplementedException(); } - } - - public override void Write(byte[] buffer, int offset, int count) - { - _socket.Send(buffer, offset, count, SocketFlags.None); - } - - public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - return _socket.BeginSend(buffer, offset, count, SocketFlags.None, callback, state); - } - - public override void EndWrite(IAsyncResult asyncResult) - { - _socket.EndSend(asyncResult); - } - - public override void SetLength(long value) - { - throw new NotImplementedException(); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotImplementedException(); - } - - public override int Read(byte[] buffer, int offset, int count) - { - return _socket.Receive(buffer, offset, count, SocketFlags.None); - } - - public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - return _socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state); - } - - public override int EndRead(IAsyncResult asyncResult) - { - return _socket.EndReceive(asyncResult); - } - } } diff --git a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs index e7330591c..9e5ce0604 100644 --- a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs +++ b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs @@ -8,7 +8,7 @@ namespace Emby.Server.Implementations.IO public class AsyncStreamCopier : IDisposable { // size in bytes of the buffers in the buffer pool - private const int DefaultBufferSize = 4096; + private const int DefaultBufferSize = 81920; private readonly int _bufferSize; // number of buffers in the pool private const int DefaultBufferCount = 4; @@ -38,15 +38,16 @@ namespace Emby.Server.Implementations.IO // stored here for rethrow private Exception _exception; - public TaskCompletionSource TaskCompletionSource; + public TaskCompletionSource TaskCompletionSource; private long _bytesToRead; private long _totalBytesWritten; private CancellationToken _cancellationToken; + public int IndividualReadOffset = 0; public AsyncStreamCopier(Stream source, Stream target, - long bytesToRead, - CancellationToken cancellationToken, + long bytesToRead, + CancellationToken cancellationToken, bool closeStreamsOnEnd = false, int bufferSize = DefaultBufferSize, int bufferCount = DefaultBufferCount) @@ -77,15 +78,15 @@ namespace Emby.Server.Implementations.IO ThrowExceptionIfNeeded(); } - public static Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken) + public static Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken) { return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken); } - public static Task CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken) + public static Task CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken) { var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount); - var taskCompletion = new TaskCompletionSource(); + var taskCompletion = new TaskCompletionSource(); copier.TaskCompletionSource = taskCompletion; @@ -109,7 +110,7 @@ namespace Emby.Server.Implementations.IO try { copier.EndCopy(result); - taskCompletion.TrySetResult(true); + taskCompletion.TrySetResult(copier._totalBytesWritten); } catch (Exception ex) { @@ -238,7 +239,7 @@ namespace Emby.Server.Implementations.IO bytesToWrite = _sizes[bufferIndex]; } - _target.BeginWrite(_buffers[bufferIndex], 0, bytesToWrite, EndWrite, null); + _target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null); _totalBytesWritten += bytesToWrite; } diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs index a81a1199e..5db842dec 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs @@ -149,5 +149,43 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun { return CopyFileTo(_tempFilePath, false, stream, cancellationToken); } + + protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken) + { + var eofCount = 0; + + long startPosition = -25000; + if (startPosition < 0) + { + var length = FileSystem.GetFileInfo(path).Length; + startPosition = Math.Max(length - startPosition, 0); + } + + using (var inputStream = GetInputStream(path, startPosition, true)) + { + if (startPosition > 0) + { + inputStream.Position = startPosition; + } + + while (eofCount < 20 || !allowEndOfFile) + { + var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false); + + //var position = fs.Position; + //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); + + if (bytesRead == 0) + { + eofCount++; + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } + } + } + } } } diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs index 142805c37..2989177c0 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs @@ -171,24 +171,92 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun return CopyFileTo(_tempFilePath, false, stream, cancellationToken); } - private static int RtpHeaderBytes = 12; - private async Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource openTaskCompletionSource, CancellationToken cancellationToken) + protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken) { - var receiveBuffer = new byte[8192]; + var eofCount = 0; - while (true) + long startPosition = -25000; + if (startPosition < 0) { - var data = await udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); - var bytesRead = data.ReceivedBytes - RtpHeaderBytes; - - await outputStream.WriteAsync(data.Buffer, RtpHeaderBytes, bytesRead, cancellationToken).ConfigureAwait(false); + var length = FileSystem.GetFileInfo(path).Length; + startPosition = Math.Max(length - startPosition, 0); + } - if (openTaskCompletionSource != null) + using (var inputStream = GetInputStream(path, startPosition, true)) + { + if (startPosition > 0) { - Resolve(openTaskCompletionSource); - openTaskCompletionSource = null; + inputStream.Position = startPosition; + } + + while (eofCount < 20 || !allowEndOfFile) + { + var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false); + + //var position = fs.Position; + //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); + + if (bytesRead == 0) + { + eofCount++; + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } } } } + + private static int RtpHeaderBytes = 12; + private Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource openTaskCompletionSource, CancellationToken cancellationToken) + { + return CopyStream(_socketFactory.CreateNetworkStream(udpClient, false), outputStream, 81920, 4, openTaskCompletionSource, cancellationToken); + } + + private Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, TaskCompletionSource openTaskCompletionSource, CancellationToken cancellationToken) + { + var copier = new AsyncStreamCopier(source, target, 0, cancellationToken, false, bufferSize, bufferCount); + copier.IndividualReadOffset = RtpHeaderBytes; + + var taskCompletion = new TaskCompletionSource(); + + copier.TaskCompletionSource = taskCompletion; + + var result = copier.BeginCopy(StreamCopyCallback, copier); + + if (openTaskCompletionSource != null) + { + Resolve(openTaskCompletionSource); + openTaskCompletionSource = null; + } + + if (result.CompletedSynchronously) + { + StreamCopyCallback(result); + } + + cancellationToken.Register(() => taskCompletion.TrySetCanceled()); + + return taskCompletion.Task; + } + + private void StreamCopyCallback(IAsyncResult result) + { + var copier = (AsyncStreamCopier)result.AsyncState; + var taskCompletion = copier.TaskCompletionSource; + + try + { + copier.EndCopy(result); + taskCompletion.TrySetResult(0); + } + catch (Exception ex) + { + taskCompletion.TrySetException(ex); + } + } + } } \ No newline at end of file diff --git a/MediaBrowser.Controller/LiveTv/LiveStream.cs b/MediaBrowser.Controller/LiveTv/LiveStream.cs index 48468d1a0..912fed23c 100644 --- a/MediaBrowser.Controller/LiveTv/LiveStream.cs +++ b/MediaBrowser.Controller/LiveTv/LiveStream.cs @@ -51,7 +51,7 @@ namespace MediaBrowser.Controller.LiveTv return Task.FromResult(true); } - private Stream GetInputStream(string path, long startPosition, bool allowAsyncFileRead) + protected Stream GetInputStream(string path, long startPosition, bool allowAsyncFileRead) { var fileOpenOptions = startPosition > 0 ? FileOpenOptions.RandomAccess @@ -85,96 +85,5 @@ namespace MediaBrowser.Controller.LiveTv await Task.Delay(500).ConfigureAwait(false); await DeleteTempFile(path, retryCount + 1).ConfigureAwait(false); } - - protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken) - { - var eofCount = 0; - - long startPosition = -25000; - if (startPosition < 0) - { - var length = FileSystem.GetFileInfo(path).Length; - startPosition = Math.Max(length - startPosition, 0); - } - - // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 - var allowAsyncFileRead = Environment.OperatingSystem != OperatingSystem.Windows; - - using (var inputStream = GetInputStream(path, startPosition, allowAsyncFileRead)) - { - if (startPosition > 0) - { - inputStream.Position = startPosition; - } - - while (eofCount < 20 || !allowEndOfFile) - { - int bytesRead; - if (allowAsyncFileRead) - { - bytesRead = await CopyToInternalAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false); - } - else - { - bytesRead = await CopyToInternalAsyncWithSyncRead(inputStream, outputStream, cancellationToken).ConfigureAwait(false); - } - - //var position = fs.Position; - //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); - - if (bytesRead == 0) - { - eofCount++; - await Task.Delay(100, cancellationToken).ConfigureAwait(false); - } - else - { - eofCount = 0; - } - } - } - } - - private async Task CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken) - { - var array = new byte[StreamCopyToBufferSize]; - int bytesRead; - int totalBytesRead = 0; - - while ((bytesRead = source.Read(array, 0, array.Length)) != 0) - { - var bytesToWrite = bytesRead; - - if (bytesToWrite > 0) - { - await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); - - totalBytesRead += bytesRead; - } - } - - return totalBytesRead; - } - - private async Task CopyToInternalAsync(Stream source, Stream destination, CancellationToken cancellationToken) - { - var array = new byte[StreamCopyToBufferSize]; - int bytesRead; - int totalBytesRead = 0; - - while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0) - { - var bytesToWrite = bytesRead; - - if (bytesToWrite > 0) - { - await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); - - totalBytesRead += bytesRead; - } - } - - return totalBytesRead; - } } } diff --git a/MediaBrowser.Model/Net/ISocketFactory.cs b/MediaBrowser.Model/Net/ISocketFactory.cs index e7dbf6cb1..bf2424660 100644 --- a/MediaBrowser.Model/Net/ISocketFactory.cs +++ b/MediaBrowser.Model/Net/ISocketFactory.cs @@ -1,4 +1,6 @@  +using System.IO; + namespace MediaBrowser.Model.Net { /// @@ -33,6 +35,8 @@ namespace MediaBrowser.Model.Net ISocket CreateUdpMulticastSocket(string ipAddress, int multicastTimeToLive, int localPort); IAcceptSocket CreateSocket(IpAddressFamily family, SocketType socketType, ProtocolType protocolType, bool dualMode); + + Stream CreateNetworkStream(ISocket socket, bool ownsSocket); } public enum SocketType diff --git a/SocketHttpListener/Net/HttpConnection.cs b/SocketHttpListener/Net/HttpConnection.cs index eda633207..9c87ff076 100644 --- a/SocketHttpListener/Net/HttpConnection.cs +++ b/SocketHttpListener/Net/HttpConnection.cs @@ -14,24 +14,25 @@ namespace SocketHttpListener.Net { sealed class HttpConnection { + private static AsyncCallback s_onreadCallback = new AsyncCallback(OnRead); const int BufferSize = 8192; - IAcceptSocket sock; - Stream stream; - EndPointListener epl; - MemoryStream ms; - byte[] buffer; - HttpListenerContext context; - StringBuilder current_line; - ListenerPrefix prefix; - HttpRequestStream i_stream; - Stream o_stream; - bool chunked; - int reuses; - bool context_bound; + IAcceptSocket _socket; + Stream _stream; + EndPointListener _epl; + MemoryStream _memoryStream; + byte[] _buffer; + HttpListenerContext _context; + StringBuilder _currentLine; + ListenerPrefix _prefix; + HttpRequestStream _requestStream; + Stream _responseStream; + bool _chunked; + int _reuses; + bool _contextBound; bool secure; - int s_timeout = 300000; // 90k ms for first request, 15k ms from then on + int _timeout = 300000; // 90k ms for first request, 15k ms from then on IpEndPointInfo local_ep; - HttpListener last_listener; + HttpListener _lastListener; int[] client_cert_errors; ICertificate cert; Stream ssl_stream; @@ -44,11 +45,11 @@ namespace SocketHttpListener.Net private readonly IFileSystem _fileSystem; private readonly IEnvironmentInfo _environment; - private HttpConnection(ILogger logger, IAcceptSocket sock, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem, IEnvironmentInfo environment) + private HttpConnection(ILogger logger, IAcceptSocket socket, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem, IEnvironmentInfo environment) { _logger = logger; - this.sock = sock; - this.epl = epl; + this._socket = socket; + this._epl = epl; this.secure = secure; this.cert = cert; _cryptoProvider = cryptoProvider; @@ -63,11 +64,11 @@ namespace SocketHttpListener.Net { if (secure == false) { - stream = _streamFactory.CreateNetworkStream(sock, false); + _stream = _streamFactory.CreateNetworkStream(_socket, false); } else { - //ssl_stream = epl.Listener.CreateSslStream(new NetworkStream(sock, false), false, (t, c, ch, e) => + //ssl_stream = _epl.Listener.CreateSslStream(new NetworkStream(_socket, false), false, (t, c, ch, e) => //{ // if (c == null) // return true; @@ -78,11 +79,11 @@ namespace SocketHttpListener.Net // client_cert_errors = new int[] { (int)e }; // return true; //}); - //stream = ssl_stream.AuthenticatedStream; + //_stream = ssl_stream.AuthenticatedStream; - ssl_stream = _streamFactory.CreateSslStream(_streamFactory.CreateNetworkStream(sock, false), false); + ssl_stream = _streamFactory.CreateSslStream(_streamFactory.CreateNetworkStream(_socket, false), false); await _streamFactory.AuthenticateSslStreamAsServer(ssl_stream, cert).ConfigureAwait(false); - stream = ssl_stream; + _stream = ssl_stream; } Init(); } @@ -100,7 +101,7 @@ namespace SocketHttpListener.Net { get { - return stream; + return _stream; } } @@ -111,32 +112,26 @@ namespace SocketHttpListener.Net void Init() { - if (ssl_stream != null) - { - //ssl_stream.AuthenticateAsServer(client_cert, true, (SslProtocols)ServicePointManager.SecurityProtocol, false); - //_streamFactory.AuthenticateSslStreamAsServer(ssl_stream, cert); - } - - context_bound = false; - i_stream = null; - o_stream = null; - prefix = null; - chunked = false; - ms = _memoryStreamFactory.CreateNew(); - position = 0; - input_state = InputState.RequestLine; - line_state = LineState.None; - context = new HttpListenerContext(this, _logger, _cryptoProvider, _memoryStreamFactory, _textEncoding, _fileSystem); + _contextBound = false; + _requestStream = null; + _responseStream = null; + _prefix = null; + _chunked = false; + _memoryStream = new MemoryStream(); + _position = 0; + _inputState = InputState.RequestLine; + _lineState = LineState.None; + _context = new HttpListenerContext(this, _logger, _cryptoProvider, _memoryStreamFactory, _textEncoding, _fileSystem); } public bool IsClosed { - get { return (sock == null); } + get { return (_socket == null); } } public int Reuses { - get { return reuses; } + get { return _reuses; } } public IpEndPointInfo LocalEndPoint @@ -146,14 +141,14 @@ namespace SocketHttpListener.Net if (local_ep != null) return local_ep; - local_ep = (IpEndPointInfo)sock.LocalEndPoint; + local_ep = (IpEndPointInfo)_socket.LocalEndPoint; return local_ep; } } public IpEndPointInfo RemoteEndPoint { - get { return (IpEndPointInfo)sock.RemoteEndPoint; } + get { return (IpEndPointInfo)_socket.RemoteEndPoint; } } public bool IsSecure @@ -163,187 +158,186 @@ namespace SocketHttpListener.Net public ListenerPrefix Prefix { - get { return prefix; } - set { prefix = value; } + get { return _prefix; } + set { _prefix = value; } } - public async Task BeginReadRequest() + public void BeginReadRequest() { - if (buffer == null) - buffer = new byte[BufferSize]; - + if (_buffer == null) + _buffer = new byte[BufferSize]; try { - //if (reuses == 1) - // s_timeout = 15000; - var nRead = await stream.ReadAsync(buffer, 0, BufferSize).ConfigureAwait(false); - - OnReadInternal(nRead); + if (_reuses == 1) + _timeout = 15000; + //_timer.Change(_timeout, Timeout.Infinite); + _stream.BeginRead(_buffer, 0, BufferSize, s_onreadCallback, this); } - catch (Exception ex) + catch { - OnReadInternalException(ms, ex); + //_timer.Change(Timeout.Infinite, Timeout.Infinite); + CloseSocket(); + Unbind(); } } public HttpRequestStream GetRequestStream(bool chunked, long contentlength) { - if (i_stream == null) + if (_requestStream == null) { - byte[] buffer; - _memoryStreamFactory.TryGetBuffer(ms, out buffer); - - int length = (int)ms.Length; - ms = null; + byte[] buffer = _memoryStream.GetBuffer(); + int length = (int)_memoryStream.Length; + _memoryStream = null; if (chunked) { - this.chunked = true; - //context.Response.SendChunked = true; - i_stream = new ChunkedInputStream(context, stream, buffer, position, length - position); + _chunked = true; + //_context.Response.SendChunked = true; + _requestStream = new ChunkedInputStream(_context, _stream, buffer, _position, length - _position); } else { - i_stream = new HttpRequestStream(stream, buffer, position, length - position, contentlength); + _requestStream = new HttpRequestStream(_stream, buffer, _position, length - _position, contentlength); } } - return i_stream; + return _requestStream; } public Stream GetResponseStream(bool isExpect100Continue = false) { - // TODO: can we get this stream before reading the input? - if (o_stream == null) + // TODO: can we get this _stream before reading the input? + if (_responseStream == null) { - //context.Response.DetermineIfChunked(); + var supportsDirectSocketAccess = !_context.Response.SendChunked && !isExpect100Continue && !secure; - var supportsDirectSocketAccess = !context.Response.SendChunked && !isExpect100Continue && !secure; - - //o_stream = new ResponseStream(stream, context.Response, _memoryStreamFactory, _textEncoding, _fileSystem, sock, supportsDirectSocketAccess, _logger, _environment); - - o_stream = new HttpResponseStream(stream, context.Response, false, _memoryStreamFactory, sock, supportsDirectSocketAccess, _environment, _fileSystem, _logger); + _responseStream = new HttpResponseStream(_stream, _context.Response, false, _memoryStreamFactory, _socket, supportsDirectSocketAccess, _environment, _fileSystem, _logger); } - return o_stream; + return _responseStream; } - void OnReadInternal(int nread) + private static void OnRead(IAsyncResult ares) { - ms.Write(buffer, 0, nread); - if (ms.Length > 32768) + HttpConnection cnc = (HttpConnection)ares.AsyncState; + cnc.OnReadInternal(ares); + } + + private void OnReadInternal(IAsyncResult ares) + { + //_timer.Change(Timeout.Infinite, Timeout.Infinite); + int nread = -1; + try { - SendError("Bad request", 400); - Close(true); + nread = _stream.EndRead(ares); + _memoryStream.Write(_buffer, 0, nread); + if (_memoryStream.Length > 32768) + { + SendError("Bad Request", 400); + Close(true); + return; + } + } + catch + { + if (_memoryStream != null && _memoryStream.Length > 0) + SendError(); + if (_socket != null) + { + CloseSocket(); + Unbind(); + } return; } if (nread == 0) { - //if (ms.Length > 0) - // SendError (); // Why bother? CloseSocket(); Unbind(); return; } - if (ProcessInput(ms)) + if (ProcessInput(_memoryStream)) { - if (!context.HaveError) - context.Request.FinishInitialization(); + if (!_context.HaveError) + _context.Request.FinishInitialization(); - if (context.HaveError) + if (_context.HaveError) { SendError(); Close(true); return; } - if (!epl.BindContext(context)) + if (!_epl.BindContext(_context)) { SendError("Invalid host", 400); Close(true); return; } - HttpListener listener = epl.Listener; - if (last_listener != listener) + HttpListener listener = _epl.Listener; + if (_lastListener != listener) { RemoveConnection(); listener.AddConnection(this); - last_listener = listener; + _lastListener = listener; } - context_bound = true; - listener.RegisterContext(context); + _contextBound = true; + listener.RegisterContext(_context); return; } - - BeginReadRequest(); + _stream.BeginRead(_buffer, 0, BufferSize, s_onreadCallback, this); } - private void OnReadInternalException(MemoryStream ms, Exception ex) + private void RemoveConnection() { - //_logger.ErrorException("Error in HttpConnection.OnReadInternal", ex); - - if (ms != null && ms.Length > 0) - SendError(); - if (sock != null) - { - CloseSocket(); - Unbind(); - } - } - - void RemoveConnection() - { - if (last_listener == null) - epl.RemoveConnection(this); + if (_lastListener == null) + _epl.RemoveConnection(this); else - last_listener.RemoveConnection(this); + _lastListener.RemoveConnection(this); } - enum InputState + private enum InputState { RequestLine, Headers } - enum LineState + private enum LineState { None, CR, LF } - InputState input_state = InputState.RequestLine; - LineState line_state = LineState.None; - int position; + InputState _inputState = InputState.RequestLine; + LineState _lineState = LineState.None; + int _position; // true -> done processing // false -> need more input - bool ProcessInput(MemoryStream ms) + private bool ProcessInput(MemoryStream ms) { - byte[] buffer; - _memoryStreamFactory.TryGetBuffer(ms, out buffer); - + byte[] buffer = ms.GetBuffer(); int len = (int)ms.Length; int used = 0; string line; while (true) { - if (context.HaveError) + if (_context.HaveError) return true; - if (position >= len) + if (_position >= len) break; try { - line = ReadLine(buffer, position, len - position, ref used); - position += used; + line = ReadLine(buffer, _position, len - _position, ref used); + _position += used; } catch { - context.ErrorMessage = "Bad request"; - context.ErrorStatus = 400; + _context.ErrorMessage = "Bad request"; + _context.ErrorStatus = 400; return true; } @@ -352,28 +346,28 @@ namespace SocketHttpListener.Net if (line == "") { - if (input_state == InputState.RequestLine) + if (_inputState == InputState.RequestLine) continue; - current_line = null; + _currentLine = null; ms = null; return true; } - if (input_state == InputState.RequestLine) + if (_inputState == InputState.RequestLine) { - context.Request.SetRequestLine(line); - input_state = InputState.Headers; + _context.Request.SetRequestLine(line); + _inputState = InputState.Headers; } else { try { - context.Request.AddHeader(line); + _context.Request.AddHeader(line); } catch (Exception e) { - context.ErrorMessage = e.Message; - context.ErrorStatus = 400; + _context.ErrorMessage = e.Message; + _context.ErrorStatus = 400; return true; } } @@ -382,42 +376,41 @@ namespace SocketHttpListener.Net if (used == len) { ms.SetLength(0); - position = 0; + _position = 0; } return false; } - string ReadLine(byte[] buffer, int offset, int len, ref int used) + private string ReadLine(byte[] buffer, int offset, int len, ref int used) { - if (current_line == null) - current_line = new StringBuilder(128); + if (_currentLine == null) + _currentLine = new StringBuilder(128); int last = offset + len; used = 0; - - for (int i = offset; i < last && line_state != LineState.LF; i++) + for (int i = offset; i < last && _lineState != LineState.LF; i++) { used++; byte b = buffer[i]; if (b == 13) { - line_state = LineState.CR; + _lineState = LineState.CR; } else if (b == 10) { - line_state = LineState.LF; + _lineState = LineState.LF; } else { - current_line.Append((char)b); + _currentLine.Append((char)b); } } string result = null; - if (line_state == LineState.LF) + if (_lineState == LineState.LF) { - line_state = LineState.None; - result = current_line.ToString(); - current_line.Length = 0; + _lineState = LineState.None; + result = _currentLine.ToString(); + _currentLine.Length = 0; } return result; @@ -427,20 +420,18 @@ namespace SocketHttpListener.Net { try { - HttpListenerResponse response = context.Response; + HttpListenerResponse response = _context.Response; response.StatusCode = status; response.ContentType = "text/html"; string description = HttpListenerResponse.GetStatusDescription(status); string str; if (msg != null) - str = String.Format("

{0} ({1})

", description, msg); + str = string.Format("

{0} ({1})

", description, msg); else - str = String.Format("

{0}

", description); + str = string.Format("

{0}

", description); - byte[] error = context.Response.ContentEncoding.GetBytes(str); - response.ContentLength64 = error.Length; - response.OutputStream.Write(error, 0, (int)error.Length); - response.Close(); + byte[] error = Encoding.Default.GetBytes(str); + response.Close(error, false); } catch { @@ -450,15 +441,15 @@ namespace SocketHttpListener.Net public void SendError() { - SendError(context.ErrorMessage, context.ErrorStatus); + SendError(_context.ErrorMessage, _context.ErrorStatus); } - void Unbind() + private void Unbind() { - if (context_bound) + if (_contextBound) { - epl.UnbindContext(context); - context_bound = false; + _epl.UnbindContext(_context); + _contextBound = false; } } @@ -469,64 +460,60 @@ namespace SocketHttpListener.Net private void CloseSocket() { - if (sock == null) + if (_socket == null) return; try { - sock.Close(); - } - catch - { + _socket.Close(); } + catch { } finally { - sock = null; + _socket = null; } + RemoveConnection(); } - internal void Close(bool force_close) + internal void Close(bool force) { - if (sock != null) + if (_socket != null) { - if (!context.Request.IsWebSocketRequest || force_close) - { - Stream st = GetResponseStream(); - if (st != null) - { - st.Dispose(); - } + Stream st = GetResponseStream(); + if (st != null) + st.Close(); - o_stream = null; - } + _responseStream = null; } - if (sock != null) + if (_socket != null) { - force_close |= !context.Request.KeepAlive; - if (!force_close) - force_close = (string.Equals(context.Response.Headers["connection"], "close", StringComparison.OrdinalIgnoreCase)); - /* - if (!force_close) { -// bool conn_close = (status_code == 400 || status_code == 408 || status_code == 411 || -// status_code == 413 || status_code == 414 || status_code == 500 || -// status_code == 503); - force_close |= (context.Request.ProtocolVersion <= HttpVersion.Version10); - } - */ + force |= !_context.Request.KeepAlive; + if (!force) + force = (string.Equals(_context.Response.Headers["connection"], "close", StringComparison.OrdinalIgnoreCase)); - if (!force_close && context.Request.FlushInput()) + if (!force && _context.Request.FlushInput()) { - reuses++; + if (_chunked && _context.Response.ForceCloseChunked == false) + { + // Don't close. Keep working. + _reuses++; + Unbind(); + Init(); + BeginReadRequest(); + return; + } + + _reuses++; Unbind(); Init(); BeginReadRequest(); return; } - IAcceptSocket s = sock; - sock = null; + IAcceptSocket s = _socket; + _socket = null; try { if (s != null) diff --git a/SocketHttpListener/Net/HttpListenerResponse.cs b/SocketHttpListener/Net/HttpListenerResponse.cs index 185454ef6..da7aff081 100644 --- a/SocketHttpListener/Net/HttpListenerResponse.cs +++ b/SocketHttpListener/Net/HttpListenerResponse.cs @@ -53,6 +53,11 @@ namespace SocketHttpListener.Net } } + public bool ForceCloseChunked + { + get { return false; } + } + public Encoding ContentEncoding { get @@ -335,6 +340,48 @@ namespace SocketHttpListener.Net context.Connection.Close(force); } + public void Close(byte[] responseEntity, bool willBlock) + { + //CheckDisposed(); + + if (responseEntity == null) + { + throw new ArgumentNullException(nameof(responseEntity)); + } + + //if (_boundaryType != BoundaryType.Chunked) + { + ContentLength64 = responseEntity.Length; + } + + if (willBlock) + { + try + { + OutputStream.Write(responseEntity, 0, responseEntity.Length); + } + finally + { + Close(false); + } + } + else + { + OutputStream.BeginWrite(responseEntity, 0, responseEntity.Length, iar => + { + var thisRef = (HttpListenerResponse)iar.AsyncState; + try + { + thisRef.OutputStream.EndWrite(iar); + } + finally + { + thisRef.Close(false); + } + }, this); + } + } + public void Close() { if (disposed) diff --git a/SocketHttpListener/Net/HttpResponseStream.Managed.cs b/SocketHttpListener/Net/HttpResponseStream.Managed.cs index 73c296580..d6bb2c04a 100644 --- a/SocketHttpListener/Net/HttpResponseStream.Managed.cs +++ b/SocketHttpListener/Net/HttpResponseStream.Managed.cs @@ -325,10 +325,7 @@ namespace SocketHttpListener.Net } } - private bool EnableSendFileWithSocket - { - get { return false; } - } + private bool EnableSendFileWithSocket = false; public Task TransmitFile(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) { From b527e56ec3b7d644245b2a2b6bd39c1e2a375c83 Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Thu, 25 May 2017 09:00:33 -0400 Subject: [PATCH 2/2] 3.2.17.15 --- SharedVersion.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SharedVersion.cs b/SharedVersion.cs index 54c968533..681eb5751 100644 --- a/SharedVersion.cs +++ b/SharedVersion.cs @@ -1,3 +1,3 @@ using System.Reflection; -[assembly: AssemblyVersion("3.2.17.14")] +[assembly: AssemblyVersion("3.2.17.15")]