Merge pull request #2664 from MediaBrowser/dev

Dev
This commit is contained in:
Luke 2017-05-25 09:00:55 -04:00 committed by GitHub
commit 40bc3b7b22
13 changed files with 469 additions and 407 deletions

View File

@ -97,7 +97,6 @@ namespace Emby.Common.Implementations.Net
_acceptor.StartAccept(); _acceptor.StartAccept();
} }
#if NET46
public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken) public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken)
{ {
var options = TransmitFileOptions.UseDefaultWorkerThread; var options = TransmitFileOptions.UseDefaultWorkerThread;
@ -117,25 +116,23 @@ namespace Emby.Common.Implementations.Net
var client = data.Item1; var client = data.Item1;
var path = data.Item2; var path = data.Item2;
var taskCompletion = data.Item3; var taskCompletion = data.Item3;
// Complete sending the data to the remote device. // Complete sending the data to the remote device.
try { try
client.EndSendFile(ar); {
taskCompletion.TrySetResult(true); client.EndSendFile(ar);
} taskCompletion.TrySetResult(true);
catch(SocketException ex){ }
_logger.Info("Socket.SendFile failed for {0}. error code {1}", path, ex.SocketErrorCode); catch (SocketException ex)
taskCompletion.TrySetException(ex); {
}catch(Exception ex){ _logger.Info("Socket.SendFile failed for {0}. error code {1}", path, ex.SocketErrorCode);
taskCompletion.TrySetException(ex); taskCompletion.TrySetException(ex);
} }
catch (Exception ex)
{
taskCompletion.TrySetException(ex);
}
} }
#else
public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
#endif
public void Dispose() public void Dispose()
{ {

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
@ -208,5 +209,89 @@ namespace Emby.Common.Implementations.Net
throw; throw;
} }
} }
public Stream CreateNetworkStream(ISocket socket, bool ownsSocket)
{
var netSocket = (UdpSocket)socket;
return new SocketStream(netSocket.Socket, ownsSocket);
}
} }
public class SocketStream : Stream
{
private readonly Socket _socket;
public SocketStream(Socket socket, bool ownsSocket)
{
_socket = socket;
}
public override void Flush()
{
}
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return true; }
}
public override long Length
{
get { throw new NotImplementedException(); }
}
public override long Position
{
get { throw new NotImplementedException(); }
set { throw new NotImplementedException(); }
}
public override void Write(byte[] buffer, int offset, int count)
{
_socket.Send(buffer, offset, count, SocketFlags.None);
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return _socket.BeginSend(buffer, offset, count, SocketFlags.None, callback, state);
}
public override void EndWrite(IAsyncResult asyncResult)
{
_socket.EndSend(asyncResult);
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
return _socket.Receive(buffer, offset, count, SocketFlags.None);
}
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return _socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state);
}
public override int EndRead(IAsyncResult asyncResult)
{
return _socket.EndReceive(asyncResult);
}
}
} }

View File

@ -14,11 +14,16 @@ namespace Emby.Common.Implementations.Net
// THIS IS A LINKED FILE - SHARED AMONGST MULTIPLE PLATFORMS // THIS IS A LINKED FILE - SHARED AMONGST MULTIPLE PLATFORMS
// Be careful to check any changes compile and work for all platform projects it is shared in. // Be careful to check any changes compile and work for all platform projects it is shared in.
internal sealed class UdpSocket : DisposableManagedObjectBase, ISocket public sealed class UdpSocket : DisposableManagedObjectBase, ISocket
{ {
private Socket _Socket; private Socket _Socket;
private int _LocalPort; private int _LocalPort;
public Socket Socket
{
get { return _Socket; }
}
private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs() private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs()
{ {
SocketFlags = SocketFlags.None SocketFlags = SocketFlags.None

View File

@ -83,7 +83,7 @@ namespace Emby.Server.Core
{ {
var netSocket = (NetAcceptSocket)acceptSocket; var netSocket = (NetAcceptSocket)acceptSocket;
return new WritableNetworkStream(netSocket.Socket, ownsSocket); return new SocketStream(netSocket.Socket, ownsSocket);
} }
public Task AuthenticateSslStreamAsServer(Stream stream, ICertificate certificate) public Task AuthenticateSslStreamAsServer(Stream stream, ICertificate certificate)
@ -109,80 +109,4 @@ namespace Emby.Server.Core
public X509Certificate X509Certificate { get; private set; } public X509Certificate X509Certificate { get; private set; }
} }
public class WritableNetworkStream : Stream
{
private readonly Socket _socket;
public WritableNetworkStream(Socket socket, bool ownsSocket)
{
_socket = socket;
}
public override void Flush()
{
}
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return true; }
}
public override long Length
{
get { throw new NotImplementedException(); }
}
public override long Position
{
get { throw new NotImplementedException(); }
set { throw new NotImplementedException(); }
}
public override void Write(byte[] buffer, int offset, int count)
{
_socket.Send(buffer, offset, count, SocketFlags.None);
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return _socket.BeginSend(buffer, offset, count, SocketFlags.None, callback, state);
}
public override void EndWrite(IAsyncResult asyncResult)
{
_socket.EndSend(asyncResult);
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
return _socket.Receive(buffer, offset, count, SocketFlags.None);
}
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return _socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state);
}
public override int EndRead(IAsyncResult asyncResult)
{
return _socket.EndReceive(asyncResult);
}
}
} }

View File

@ -8,7 +8,7 @@ namespace Emby.Server.Implementations.IO
public class AsyncStreamCopier : IDisposable public class AsyncStreamCopier : IDisposable
{ {
// size in bytes of the buffers in the buffer pool // size in bytes of the buffers in the buffer pool
private const int DefaultBufferSize = 4096; private const int DefaultBufferSize = 81920;
private readonly int _bufferSize; private readonly int _bufferSize;
// number of buffers in the pool // number of buffers in the pool
private const int DefaultBufferCount = 4; private const int DefaultBufferCount = 4;
@ -38,15 +38,16 @@ namespace Emby.Server.Implementations.IO
// stored here for rethrow // stored here for rethrow
private Exception _exception; private Exception _exception;
public TaskCompletionSource<bool> TaskCompletionSource; public TaskCompletionSource<long> TaskCompletionSource;
private long _bytesToRead; private long _bytesToRead;
private long _totalBytesWritten; private long _totalBytesWritten;
private CancellationToken _cancellationToken; private CancellationToken _cancellationToken;
public int IndividualReadOffset = 0;
public AsyncStreamCopier(Stream source, public AsyncStreamCopier(Stream source,
Stream target, Stream target,
long bytesToRead, long bytesToRead,
CancellationToken cancellationToken, CancellationToken cancellationToken,
bool closeStreamsOnEnd = false, bool closeStreamsOnEnd = false,
int bufferSize = DefaultBufferSize, int bufferSize = DefaultBufferSize,
int bufferCount = DefaultBufferCount) int bufferCount = DefaultBufferCount)
@ -77,15 +78,15 @@ namespace Emby.Server.Implementations.IO
ThrowExceptionIfNeeded(); ThrowExceptionIfNeeded();
} }
public static Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken) public static Task<long> CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken)
{ {
return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken); return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken);
} }
public static Task CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken) public static Task<long> CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken)
{ {
var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount); var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount);
var taskCompletion = new TaskCompletionSource<bool>(); var taskCompletion = new TaskCompletionSource<long>();
copier.TaskCompletionSource = taskCompletion; copier.TaskCompletionSource = taskCompletion;
@ -109,7 +110,7 @@ namespace Emby.Server.Implementations.IO
try try
{ {
copier.EndCopy(result); copier.EndCopy(result);
taskCompletion.TrySetResult(true); taskCompletion.TrySetResult(copier._totalBytesWritten);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -238,7 +239,7 @@ namespace Emby.Server.Implementations.IO
bytesToWrite = _sizes[bufferIndex]; bytesToWrite = _sizes[bufferIndex];
} }
_target.BeginWrite(_buffers[bufferIndex], 0, bytesToWrite, EndWrite, null); _target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null);
_totalBytesWritten += bytesToWrite; _totalBytesWritten += bytesToWrite;
} }

View File

@ -149,5 +149,43 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{ {
return CopyFileTo(_tempFilePath, false, stream, cancellationToken); return CopyFileTo(_tempFilePath, false, stream, cancellationToken);
} }
protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken)
{
var eofCount = 0;
long startPosition = -25000;
if (startPosition < 0)
{
var length = FileSystem.GetFileInfo(path).Length;
startPosition = Math.Max(length - startPosition, 0);
}
using (var inputStream = GetInputStream(path, startPosition, true))
{
if (startPosition > 0)
{
inputStream.Position = startPosition;
}
while (eofCount < 20 || !allowEndOfFile)
{
var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false);
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0)
{
eofCount++;
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
}
else
{
eofCount = 0;
}
}
}
}
} }
} }

View File

@ -171,24 +171,92 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
return CopyFileTo(_tempFilePath, false, stream, cancellationToken); return CopyFileTo(_tempFilePath, false, stream, cancellationToken);
} }
private static int RtpHeaderBytes = 12; protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken)
private async Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{ {
var receiveBuffer = new byte[8192]; var eofCount = 0;
while (true) long startPosition = -25000;
if (startPosition < 0)
{ {
var data = await udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); var length = FileSystem.GetFileInfo(path).Length;
var bytesRead = data.ReceivedBytes - RtpHeaderBytes; startPosition = Math.Max(length - startPosition, 0);
}
await outputStream.WriteAsync(data.Buffer, RtpHeaderBytes, bytesRead, cancellationToken).ConfigureAwait(false);
if (openTaskCompletionSource != null) using (var inputStream = GetInputStream(path, startPosition, true))
{
if (startPosition > 0)
{ {
Resolve(openTaskCompletionSource); inputStream.Position = startPosition;
openTaskCompletionSource = null; }
while (eofCount < 20 || !allowEndOfFile)
{
var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false);
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0)
{
eofCount++;
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
}
else
{
eofCount = 0;
}
} }
} }
} }
private static int RtpHeaderBytes = 12;
private Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{
return CopyStream(_socketFactory.CreateNetworkStream(udpClient, false), outputStream, 81920, 4, openTaskCompletionSource, cancellationToken);
}
private Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{
var copier = new AsyncStreamCopier(source, target, 0, cancellationToken, false, bufferSize, bufferCount);
copier.IndividualReadOffset = RtpHeaderBytes;
var taskCompletion = new TaskCompletionSource<long>();
copier.TaskCompletionSource = taskCompletion;
var result = copier.BeginCopy(StreamCopyCallback, copier);
if (openTaskCompletionSource != null)
{
Resolve(openTaskCompletionSource);
openTaskCompletionSource = null;
}
if (result.CompletedSynchronously)
{
StreamCopyCallback(result);
}
cancellationToken.Register(() => taskCompletion.TrySetCanceled());
return taskCompletion.Task;
}
private void StreamCopyCallback(IAsyncResult result)
{
var copier = (AsyncStreamCopier)result.AsyncState;
var taskCompletion = copier.TaskCompletionSource;
try
{
copier.EndCopy(result);
taskCompletion.TrySetResult(0);
}
catch (Exception ex)
{
taskCompletion.TrySetException(ex);
}
}
} }
} }

View File

@ -51,7 +51,7 @@ namespace MediaBrowser.Controller.LiveTv
return Task.FromResult(true); return Task.FromResult(true);
} }
private Stream GetInputStream(string path, long startPosition, bool allowAsyncFileRead) protected Stream GetInputStream(string path, long startPosition, bool allowAsyncFileRead)
{ {
var fileOpenOptions = startPosition > 0 var fileOpenOptions = startPosition > 0
? FileOpenOptions.RandomAccess ? FileOpenOptions.RandomAccess
@ -85,96 +85,5 @@ namespace MediaBrowser.Controller.LiveTv
await Task.Delay(500).ConfigureAwait(false); await Task.Delay(500).ConfigureAwait(false);
await DeleteTempFile(path, retryCount + 1).ConfigureAwait(false); await DeleteTempFile(path, retryCount + 1).ConfigureAwait(false);
} }
protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken)
{
var eofCount = 0;
long startPosition = -25000;
if (startPosition < 0)
{
var length = FileSystem.GetFileInfo(path).Length;
startPosition = Math.Max(length - startPosition, 0);
}
// use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
var allowAsyncFileRead = Environment.OperatingSystem != OperatingSystem.Windows;
using (var inputStream = GetInputStream(path, startPosition, allowAsyncFileRead))
{
if (startPosition > 0)
{
inputStream.Position = startPosition;
}
while (eofCount < 20 || !allowEndOfFile)
{
int bytesRead;
if (allowAsyncFileRead)
{
bytesRead = await CopyToInternalAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
}
else
{
bytesRead = await CopyToInternalAsyncWithSyncRead(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
}
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0)
{
eofCount++;
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
}
else
{
eofCount = 0;
}
}
}
}
private async Task<int> CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken)
{
var array = new byte[StreamCopyToBufferSize];
int bytesRead;
int totalBytesRead = 0;
while ((bytesRead = source.Read(array, 0, array.Length)) != 0)
{
var bytesToWrite = bytesRead;
if (bytesToWrite > 0)
{
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
totalBytesRead += bytesRead;
}
}
return totalBytesRead;
}
private async Task<int> CopyToInternalAsync(Stream source, Stream destination, CancellationToken cancellationToken)
{
var array = new byte[StreamCopyToBufferSize];
int bytesRead;
int totalBytesRead = 0;
while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
var bytesToWrite = bytesRead;
if (bytesToWrite > 0)
{
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
totalBytesRead += bytesRead;
}
}
return totalBytesRead;
}
} }
} }

View File

@ -1,4 +1,6 @@
 
using System.IO;
namespace MediaBrowser.Model.Net namespace MediaBrowser.Model.Net
{ {
/// <summary> /// <summary>
@ -33,6 +35,8 @@ namespace MediaBrowser.Model.Net
ISocket CreateUdpMulticastSocket(string ipAddress, int multicastTimeToLive, int localPort); ISocket CreateUdpMulticastSocket(string ipAddress, int multicastTimeToLive, int localPort);
IAcceptSocket CreateSocket(IpAddressFamily family, SocketType socketType, ProtocolType protocolType, bool dualMode); IAcceptSocket CreateSocket(IpAddressFamily family, SocketType socketType, ProtocolType protocolType, bool dualMode);
Stream CreateNetworkStream(ISocket socket, bool ownsSocket);
} }
public enum SocketType public enum SocketType

View File

@ -1,3 +1,3 @@
using System.Reflection; using System.Reflection;
[assembly: AssemblyVersion("3.2.17.14")] [assembly: AssemblyVersion("3.2.17.15")]

View File

@ -14,24 +14,25 @@ namespace SocketHttpListener.Net
{ {
sealed class HttpConnection sealed class HttpConnection
{ {
private static AsyncCallback s_onreadCallback = new AsyncCallback(OnRead);
const int BufferSize = 8192; const int BufferSize = 8192;
IAcceptSocket sock; IAcceptSocket _socket;
Stream stream; Stream _stream;
EndPointListener epl; EndPointListener _epl;
MemoryStream ms; MemoryStream _memoryStream;
byte[] buffer; byte[] _buffer;
HttpListenerContext context; HttpListenerContext _context;
StringBuilder current_line; StringBuilder _currentLine;
ListenerPrefix prefix; ListenerPrefix _prefix;
HttpRequestStream i_stream; HttpRequestStream _requestStream;
Stream o_stream; Stream _responseStream;
bool chunked; bool _chunked;
int reuses; int _reuses;
bool context_bound; bool _contextBound;
bool secure; bool secure;
int s_timeout = 300000; // 90k ms for first request, 15k ms from then on int _timeout = 300000; // 90k ms for first request, 15k ms from then on
IpEndPointInfo local_ep; IpEndPointInfo local_ep;
HttpListener last_listener; HttpListener _lastListener;
int[] client_cert_errors; int[] client_cert_errors;
ICertificate cert; ICertificate cert;
Stream ssl_stream; Stream ssl_stream;
@ -44,11 +45,11 @@ namespace SocketHttpListener.Net
private readonly IFileSystem _fileSystem; private readonly IFileSystem _fileSystem;
private readonly IEnvironmentInfo _environment; private readonly IEnvironmentInfo _environment;
private HttpConnection(ILogger logger, IAcceptSocket sock, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem, IEnvironmentInfo environment) private HttpConnection(ILogger logger, IAcceptSocket socket, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem, IEnvironmentInfo environment)
{ {
_logger = logger; _logger = logger;
this.sock = sock; this._socket = socket;
this.epl = epl; this._epl = epl;
this.secure = secure; this.secure = secure;
this.cert = cert; this.cert = cert;
_cryptoProvider = cryptoProvider; _cryptoProvider = cryptoProvider;
@ -63,11 +64,11 @@ namespace SocketHttpListener.Net
{ {
if (secure == false) if (secure == false)
{ {
stream = _streamFactory.CreateNetworkStream(sock, false); _stream = _streamFactory.CreateNetworkStream(_socket, false);
} }
else else
{ {
//ssl_stream = epl.Listener.CreateSslStream(new NetworkStream(sock, false), false, (t, c, ch, e) => //ssl_stream = _epl.Listener.CreateSslStream(new NetworkStream(_socket, false), false, (t, c, ch, e) =>
//{ //{
// if (c == null) // if (c == null)
// return true; // return true;
@ -78,11 +79,11 @@ namespace SocketHttpListener.Net
// client_cert_errors = new int[] { (int)e }; // client_cert_errors = new int[] { (int)e };
// return true; // return true;
//}); //});
//stream = ssl_stream.AuthenticatedStream; //_stream = ssl_stream.AuthenticatedStream;
ssl_stream = _streamFactory.CreateSslStream(_streamFactory.CreateNetworkStream(sock, false), false); ssl_stream = _streamFactory.CreateSslStream(_streamFactory.CreateNetworkStream(_socket, false), false);
await _streamFactory.AuthenticateSslStreamAsServer(ssl_stream, cert).ConfigureAwait(false); await _streamFactory.AuthenticateSslStreamAsServer(ssl_stream, cert).ConfigureAwait(false);
stream = ssl_stream; _stream = ssl_stream;
} }
Init(); Init();
} }
@ -100,7 +101,7 @@ namespace SocketHttpListener.Net
{ {
get get
{ {
return stream; return _stream;
} }
} }
@ -111,32 +112,26 @@ namespace SocketHttpListener.Net
void Init() void Init()
{ {
if (ssl_stream != null) _contextBound = false;
{ _requestStream = null;
//ssl_stream.AuthenticateAsServer(client_cert, true, (SslProtocols)ServicePointManager.SecurityProtocol, false); _responseStream = null;
//_streamFactory.AuthenticateSslStreamAsServer(ssl_stream, cert); _prefix = null;
} _chunked = false;
_memoryStream = new MemoryStream();
context_bound = false; _position = 0;
i_stream = null; _inputState = InputState.RequestLine;
o_stream = null; _lineState = LineState.None;
prefix = null; _context = new HttpListenerContext(this, _logger, _cryptoProvider, _memoryStreamFactory, _textEncoding, _fileSystem);
chunked = false;
ms = _memoryStreamFactory.CreateNew();
position = 0;
input_state = InputState.RequestLine;
line_state = LineState.None;
context = new HttpListenerContext(this, _logger, _cryptoProvider, _memoryStreamFactory, _textEncoding, _fileSystem);
} }
public bool IsClosed public bool IsClosed
{ {
get { return (sock == null); } get { return (_socket == null); }
} }
public int Reuses public int Reuses
{ {
get { return reuses; } get { return _reuses; }
} }
public IpEndPointInfo LocalEndPoint public IpEndPointInfo LocalEndPoint
@ -146,14 +141,14 @@ namespace SocketHttpListener.Net
if (local_ep != null) if (local_ep != null)
return local_ep; return local_ep;
local_ep = (IpEndPointInfo)sock.LocalEndPoint; local_ep = (IpEndPointInfo)_socket.LocalEndPoint;
return local_ep; return local_ep;
} }
} }
public IpEndPointInfo RemoteEndPoint public IpEndPointInfo RemoteEndPoint
{ {
get { return (IpEndPointInfo)sock.RemoteEndPoint; } get { return (IpEndPointInfo)_socket.RemoteEndPoint; }
} }
public bool IsSecure public bool IsSecure
@ -163,187 +158,186 @@ namespace SocketHttpListener.Net
public ListenerPrefix Prefix public ListenerPrefix Prefix
{ {
get { return prefix; } get { return _prefix; }
set { prefix = value; } set { _prefix = value; }
} }
public async Task BeginReadRequest() public void BeginReadRequest()
{ {
if (buffer == null) if (_buffer == null)
buffer = new byte[BufferSize]; _buffer = new byte[BufferSize];
try try
{ {
//if (reuses == 1) if (_reuses == 1)
// s_timeout = 15000; _timeout = 15000;
var nRead = await stream.ReadAsync(buffer, 0, BufferSize).ConfigureAwait(false); //_timer.Change(_timeout, Timeout.Infinite);
_stream.BeginRead(_buffer, 0, BufferSize, s_onreadCallback, this);
OnReadInternal(nRead);
} }
catch (Exception ex) catch
{ {
OnReadInternalException(ms, ex); //_timer.Change(Timeout.Infinite, Timeout.Infinite);
CloseSocket();
Unbind();
} }
} }
public HttpRequestStream GetRequestStream(bool chunked, long contentlength) public HttpRequestStream GetRequestStream(bool chunked, long contentlength)
{ {
if (i_stream == null) if (_requestStream == null)
{ {
byte[] buffer; byte[] buffer = _memoryStream.GetBuffer();
_memoryStreamFactory.TryGetBuffer(ms, out buffer); int length = (int)_memoryStream.Length;
_memoryStream = null;
int length = (int)ms.Length;
ms = null;
if (chunked) if (chunked)
{ {
this.chunked = true; _chunked = true;
//context.Response.SendChunked = true; //_context.Response.SendChunked = true;
i_stream = new ChunkedInputStream(context, stream, buffer, position, length - position); _requestStream = new ChunkedInputStream(_context, _stream, buffer, _position, length - _position);
} }
else else
{ {
i_stream = new HttpRequestStream(stream, buffer, position, length - position, contentlength); _requestStream = new HttpRequestStream(_stream, buffer, _position, length - _position, contentlength);
} }
} }
return i_stream; return _requestStream;
} }
public Stream GetResponseStream(bool isExpect100Continue = false) public Stream GetResponseStream(bool isExpect100Continue = false)
{ {
// TODO: can we get this stream before reading the input? // TODO: can we get this _stream before reading the input?
if (o_stream == null) if (_responseStream == null)
{ {
//context.Response.DetermineIfChunked(); var supportsDirectSocketAccess = !_context.Response.SendChunked && !isExpect100Continue && !secure;
var supportsDirectSocketAccess = !context.Response.SendChunked && !isExpect100Continue && !secure; _responseStream = new HttpResponseStream(_stream, _context.Response, false, _memoryStreamFactory, _socket, supportsDirectSocketAccess, _environment, _fileSystem, _logger);
//o_stream = new ResponseStream(stream, context.Response, _memoryStreamFactory, _textEncoding, _fileSystem, sock, supportsDirectSocketAccess, _logger, _environment);
o_stream = new HttpResponseStream(stream, context.Response, false, _memoryStreamFactory, sock, supportsDirectSocketAccess, _environment, _fileSystem, _logger);
} }
return o_stream; return _responseStream;
} }
void OnReadInternal(int nread) private static void OnRead(IAsyncResult ares)
{ {
ms.Write(buffer, 0, nread); HttpConnection cnc = (HttpConnection)ares.AsyncState;
if (ms.Length > 32768) cnc.OnReadInternal(ares);
}
private void OnReadInternal(IAsyncResult ares)
{
//_timer.Change(Timeout.Infinite, Timeout.Infinite);
int nread = -1;
try
{ {
SendError("Bad request", 400); nread = _stream.EndRead(ares);
Close(true); _memoryStream.Write(_buffer, 0, nread);
if (_memoryStream.Length > 32768)
{
SendError("Bad Request", 400);
Close(true);
return;
}
}
catch
{
if (_memoryStream != null && _memoryStream.Length > 0)
SendError();
if (_socket != null)
{
CloseSocket();
Unbind();
}
return; return;
} }
if (nread == 0) if (nread == 0)
{ {
//if (ms.Length > 0)
// SendError (); // Why bother?
CloseSocket(); CloseSocket();
Unbind(); Unbind();
return; return;
} }
if (ProcessInput(ms)) if (ProcessInput(_memoryStream))
{ {
if (!context.HaveError) if (!_context.HaveError)
context.Request.FinishInitialization(); _context.Request.FinishInitialization();
if (context.HaveError) if (_context.HaveError)
{ {
SendError(); SendError();
Close(true); Close(true);
return; return;
} }
if (!epl.BindContext(context)) if (!_epl.BindContext(_context))
{ {
SendError("Invalid host", 400); SendError("Invalid host", 400);
Close(true); Close(true);
return; return;
} }
HttpListener listener = epl.Listener; HttpListener listener = _epl.Listener;
if (last_listener != listener) if (_lastListener != listener)
{ {
RemoveConnection(); RemoveConnection();
listener.AddConnection(this); listener.AddConnection(this);
last_listener = listener; _lastListener = listener;
} }
context_bound = true; _contextBound = true;
listener.RegisterContext(context); listener.RegisterContext(_context);
return; return;
} }
_stream.BeginRead(_buffer, 0, BufferSize, s_onreadCallback, this);
BeginReadRequest();
} }
private void OnReadInternalException(MemoryStream ms, Exception ex) private void RemoveConnection()
{ {
//_logger.ErrorException("Error in HttpConnection.OnReadInternal", ex); if (_lastListener == null)
_epl.RemoveConnection(this);
if (ms != null && ms.Length > 0)
SendError();
if (sock != null)
{
CloseSocket();
Unbind();
}
}
void RemoveConnection()
{
if (last_listener == null)
epl.RemoveConnection(this);
else else
last_listener.RemoveConnection(this); _lastListener.RemoveConnection(this);
} }
enum InputState private enum InputState
{ {
RequestLine, RequestLine,
Headers Headers
} }
enum LineState private enum LineState
{ {
None, None,
CR, CR,
LF LF
} }
InputState input_state = InputState.RequestLine; InputState _inputState = InputState.RequestLine;
LineState line_state = LineState.None; LineState _lineState = LineState.None;
int position; int _position;
// true -> done processing // true -> done processing
// false -> need more input // false -> need more input
bool ProcessInput(MemoryStream ms) private bool ProcessInput(MemoryStream ms)
{ {
byte[] buffer; byte[] buffer = ms.GetBuffer();
_memoryStreamFactory.TryGetBuffer(ms, out buffer);
int len = (int)ms.Length; int len = (int)ms.Length;
int used = 0; int used = 0;
string line; string line;
while (true) while (true)
{ {
if (context.HaveError) if (_context.HaveError)
return true; return true;
if (position >= len) if (_position >= len)
break; break;
try try
{ {
line = ReadLine(buffer, position, len - position, ref used); line = ReadLine(buffer, _position, len - _position, ref used);
position += used; _position += used;
} }
catch catch
{ {
context.ErrorMessage = "Bad request"; _context.ErrorMessage = "Bad request";
context.ErrorStatus = 400; _context.ErrorStatus = 400;
return true; return true;
} }
@ -352,28 +346,28 @@ namespace SocketHttpListener.Net
if (line == "") if (line == "")
{ {
if (input_state == InputState.RequestLine) if (_inputState == InputState.RequestLine)
continue; continue;
current_line = null; _currentLine = null;
ms = null; ms = null;
return true; return true;
} }
if (input_state == InputState.RequestLine) if (_inputState == InputState.RequestLine)
{ {
context.Request.SetRequestLine(line); _context.Request.SetRequestLine(line);
input_state = InputState.Headers; _inputState = InputState.Headers;
} }
else else
{ {
try try
{ {
context.Request.AddHeader(line); _context.Request.AddHeader(line);
} }
catch (Exception e) catch (Exception e)
{ {
context.ErrorMessage = e.Message; _context.ErrorMessage = e.Message;
context.ErrorStatus = 400; _context.ErrorStatus = 400;
return true; return true;
} }
} }
@ -382,42 +376,41 @@ namespace SocketHttpListener.Net
if (used == len) if (used == len)
{ {
ms.SetLength(0); ms.SetLength(0);
position = 0; _position = 0;
} }
return false; return false;
} }
string ReadLine(byte[] buffer, int offset, int len, ref int used) private string ReadLine(byte[] buffer, int offset, int len, ref int used)
{ {
if (current_line == null) if (_currentLine == null)
current_line = new StringBuilder(128); _currentLine = new StringBuilder(128);
int last = offset + len; int last = offset + len;
used = 0; used = 0;
for (int i = offset; i < last && _lineState != LineState.LF; i++)
for (int i = offset; i < last && line_state != LineState.LF; i++)
{ {
used++; used++;
byte b = buffer[i]; byte b = buffer[i];
if (b == 13) if (b == 13)
{ {
line_state = LineState.CR; _lineState = LineState.CR;
} }
else if (b == 10) else if (b == 10)
{ {
line_state = LineState.LF; _lineState = LineState.LF;
} }
else else
{ {
current_line.Append((char)b); _currentLine.Append((char)b);
} }
} }
string result = null; string result = null;
if (line_state == LineState.LF) if (_lineState == LineState.LF)
{ {
line_state = LineState.None; _lineState = LineState.None;
result = current_line.ToString(); result = _currentLine.ToString();
current_line.Length = 0; _currentLine.Length = 0;
} }
return result; return result;
@ -427,20 +420,18 @@ namespace SocketHttpListener.Net
{ {
try try
{ {
HttpListenerResponse response = context.Response; HttpListenerResponse response = _context.Response;
response.StatusCode = status; response.StatusCode = status;
response.ContentType = "text/html"; response.ContentType = "text/html";
string description = HttpListenerResponse.GetStatusDescription(status); string description = HttpListenerResponse.GetStatusDescription(status);
string str; string str;
if (msg != null) if (msg != null)
str = String.Format("<h1>{0} ({1})</h1>", description, msg); str = string.Format("<h1>{0} ({1})</h1>", description, msg);
else else
str = String.Format("<h1>{0}</h1>", description); str = string.Format("<h1>{0}</h1>", description);
byte[] error = context.Response.ContentEncoding.GetBytes(str); byte[] error = Encoding.Default.GetBytes(str);
response.ContentLength64 = error.Length; response.Close(error, false);
response.OutputStream.Write(error, 0, (int)error.Length);
response.Close();
} }
catch catch
{ {
@ -450,15 +441,15 @@ namespace SocketHttpListener.Net
public void SendError() public void SendError()
{ {
SendError(context.ErrorMessage, context.ErrorStatus); SendError(_context.ErrorMessage, _context.ErrorStatus);
} }
void Unbind() private void Unbind()
{ {
if (context_bound) if (_contextBound)
{ {
epl.UnbindContext(context); _epl.UnbindContext(_context);
context_bound = false; _contextBound = false;
} }
} }
@ -469,64 +460,60 @@ namespace SocketHttpListener.Net
private void CloseSocket() private void CloseSocket()
{ {
if (sock == null) if (_socket == null)
return; return;
try try
{ {
sock.Close(); _socket.Close();
}
catch
{
} }
catch { }
finally finally
{ {
sock = null; _socket = null;
} }
RemoveConnection(); RemoveConnection();
} }
internal void Close(bool force_close) internal void Close(bool force)
{ {
if (sock != null) if (_socket != null)
{ {
if (!context.Request.IsWebSocketRequest || force_close) Stream st = GetResponseStream();
{ if (st != null)
Stream st = GetResponseStream(); st.Close();
if (st != null)
{
st.Dispose();
}
o_stream = null; _responseStream = null;
}
} }
if (sock != null) if (_socket != null)
{ {
force_close |= !context.Request.KeepAlive; force |= !_context.Request.KeepAlive;
if (!force_close) if (!force)
force_close = (string.Equals(context.Response.Headers["connection"], "close", StringComparison.OrdinalIgnoreCase)); force = (string.Equals(_context.Response.Headers["connection"], "close", StringComparison.OrdinalIgnoreCase));
/*
if (!force_close) {
// bool conn_close = (status_code == 400 || status_code == 408 || status_code == 411 ||
// status_code == 413 || status_code == 414 || status_code == 500 ||
// status_code == 503);
force_close |= (context.Request.ProtocolVersion <= HttpVersion.Version10);
}
*/
if (!force_close && context.Request.FlushInput()) if (!force && _context.Request.FlushInput())
{ {
reuses++; if (_chunked && _context.Response.ForceCloseChunked == false)
{
// Don't close. Keep working.
_reuses++;
Unbind();
Init();
BeginReadRequest();
return;
}
_reuses++;
Unbind(); Unbind();
Init(); Init();
BeginReadRequest(); BeginReadRequest();
return; return;
} }
IAcceptSocket s = sock; IAcceptSocket s = _socket;
sock = null; _socket = null;
try try
{ {
if (s != null) if (s != null)

View File

@ -53,6 +53,11 @@ namespace SocketHttpListener.Net
} }
} }
public bool ForceCloseChunked
{
get { return false; }
}
public Encoding ContentEncoding public Encoding ContentEncoding
{ {
get get
@ -335,6 +340,48 @@ namespace SocketHttpListener.Net
context.Connection.Close(force); context.Connection.Close(force);
} }
public void Close(byte[] responseEntity, bool willBlock)
{
//CheckDisposed();
if (responseEntity == null)
{
throw new ArgumentNullException(nameof(responseEntity));
}
//if (_boundaryType != BoundaryType.Chunked)
{
ContentLength64 = responseEntity.Length;
}
if (willBlock)
{
try
{
OutputStream.Write(responseEntity, 0, responseEntity.Length);
}
finally
{
Close(false);
}
}
else
{
OutputStream.BeginWrite(responseEntity, 0, responseEntity.Length, iar =>
{
var thisRef = (HttpListenerResponse)iar.AsyncState;
try
{
thisRef.OutputStream.EndWrite(iar);
}
finally
{
thisRef.Close(false);
}
}, this);
}
}
public void Close() public void Close()
{ {
if (disposed) if (disposed)

View File

@ -325,10 +325,7 @@ namespace SocketHttpListener.Net
} }
} }
private bool EnableSendFileWithSocket private bool EnableSendFileWithSocket = false;
{
get { return false; }
}
public Task TransmitFile(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) public Task TransmitFile(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken)
{ {