Improvements around streams

* Use ArrayPool instead of allocating new buffers each time
* Remove NetworkStream copy
* Remove some dead code
This commit is contained in:
Bond-009 2019-03-13 17:51:33 +01:00
parent 1d443d2ff5
commit e64aaebbac
11 changed files with 269 additions and 451 deletions

View File

@ -5,7 +5,6 @@ using System.IO;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.Services;
using Microsoft.Extensions.Logging;
using Microsoft.Net.Http.Headers;
namespace Emby.Server.Implementations.HttpServer

View File

@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
@ -8,168 +9,213 @@ namespace Emby.Server.Implementations.IO
{
public class StreamHelper : IStreamHelper
{
private const int StreamCopyToBufferSize = 81920;
public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, Action onStarted, CancellationToken cancellationToken)
{
byte[] buffer = new byte[bufferSize];
int read;
while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
try
{
cancellationToken.ThrowIfCancellationRequested();
await destination.WriteAsync(buffer, 0, read).ConfigureAwait(false);
if (onStarted != null)
int read;
while ((read = await source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) != 0)
{
onStarted();
onStarted = null;
cancellationToken.ThrowIfCancellationRequested();
await destination.WriteAsync(buffer, 0, read).ConfigureAwait(false);
if (onStarted != null)
{
onStarted();
onStarted = null;
}
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, int emptyReadLimit, CancellationToken cancellationToken)
{
byte[] buffer = new byte[bufferSize];
if (emptyReadLimit <= 0)
byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
try
{
int read;
while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
if (emptyReadLimit <= 0)
{
int read;
while ((read = await source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) != 0)
{
cancellationToken.ThrowIfCancellationRequested();
await destination.WriteAsync(buffer, 0, read).ConfigureAwait(false);
}
return;
}
var eofCount = 0;
while (eofCount < emptyReadLimit)
{
cancellationToken.ThrowIfCancellationRequested();
await destination.WriteAsync(buffer, 0, read).ConfigureAwait(false);
}
var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
return;
if (bytesRead == 0)
{
eofCount++;
await Task.Delay(50, cancellationToken).ConfigureAwait(false);
}
else
{
eofCount = 0;
await destination.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false);
}
}
}
var eofCount = 0;
while (eofCount < emptyReadLimit)
finally
{
cancellationToken.ThrowIfCancellationRequested();
var bytesRead = source.Read(buffer, 0, buffer.Length);
if (bytesRead == 0)
{
eofCount++;
await Task.Delay(50, cancellationToken).ConfigureAwait(false);
}
else
{
eofCount = 0;
await destination.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false);
}
ArrayPool<byte>.Shared.Return(buffer);
}
}
const int StreamCopyToBufferSize = 81920;
public async Task<int> CopyToAsync(Stream source, Stream destination, CancellationToken cancellationToken)
{
var array = new byte[StreamCopyToBufferSize];
int bytesRead;
int totalBytesRead = 0;
while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0)
byte[] buffer = ArrayPool<byte>.Shared.Rent(StreamCopyToBufferSize);
try
{
var bytesToWrite = bytesRead;
int totalBytesRead = 0;
if (bytesToWrite > 0)
int bytesRead;
while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
var bytesToWrite = bytesRead;
totalBytesRead += bytesRead;
if (bytesToWrite > 0)
{
await destination.WriteAsync(buffer, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
totalBytesRead += bytesRead;
}
}
}
return totalBytesRead;
return totalBytesRead;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
public async Task<int> CopyToAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken)
{
var array = new byte[StreamCopyToBufferSize];
int bytesRead;
int totalBytesRead = 0;
while ((bytesRead = source.Read(array, 0, array.Length)) != 0)
byte[] buffer = ArrayPool<byte>.Shared.Rent(StreamCopyToBufferSize);
try
{
var bytesToWrite = bytesRead;
int bytesRead;
int totalBytesRead = 0;
if (bytesToWrite > 0)
while ((bytesRead = source.Read(buffer, 0, buffer.Length)) != 0)
{
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
var bytesToWrite = bytesRead;
totalBytesRead += bytesRead;
if (bytesToWrite > 0)
{
await destination.WriteAsync(buffer, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
totalBytesRead += bytesRead;
}
}
}
return totalBytesRead;
return totalBytesRead;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
public async Task CopyToAsyncWithSyncRead(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken)
{
var array = new byte[StreamCopyToBufferSize];
int bytesRead;
while ((bytesRead = source.Read(array, 0, array.Length)) != 0)
byte[] buffer = ArrayPool<byte>.Shared.Rent(StreamCopyToBufferSize);
try
{
var bytesToWrite = Math.Min(bytesRead, copyLength);
int bytesRead;
if (bytesToWrite > 0)
while ((bytesRead = source.Read(buffer, 0, buffer.Length)) != 0)
{
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
}
var bytesToWrite = Math.Min(bytesRead, copyLength);
copyLength -= bytesToWrite;
if (bytesToWrite > 0)
{
await destination.WriteAsync(buffer, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
}
if (copyLength <= 0)
{
break;
copyLength -= bytesToWrite;
if (copyLength <= 0)
{
break;
}
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
public async Task CopyToAsync(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken)
{
var array = new byte[StreamCopyToBufferSize];
int bytesRead;
while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0)
byte[] buffer = ArrayPool<byte>.Shared.Rent(StreamCopyToBufferSize);
try
{
var bytesToWrite = Math.Min(bytesRead, copyLength);
int bytesRead;
if (bytesToWrite > 0)
while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
}
var bytesToWrite = Math.Min(bytesRead, copyLength);
copyLength -= bytesToWrite;
if (bytesToWrite > 0)
{
await destination.WriteAsync(buffer, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
}
if (copyLength <= 0)
{
break;
copyLength -= bytesToWrite;
if (copyLength <= 0)
{
break;
}
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
public async Task CopyUntilCancelled(Stream source, Stream target, int bufferSize, CancellationToken cancellationToken)
{
byte[] buffer = new byte[bufferSize];
while (!cancellationToken.IsCancellationRequested)
byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
try
{
var bytesRead = await CopyToAsyncInternal(source, target, buffer, cancellationToken).ConfigureAwait(false);
//var position = fs.Position;
//_logger.LogDebug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0)
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(100).ConfigureAwait(false);
var bytesRead = await CopyToAsyncInternal(source, target, buffer, cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
await Task.Delay(100).ConfigureAwait(false);
}
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
private static async Task<int> CopyToAsyncInternal(Stream source, Stream destination, byte[] buffer, CancellationToken cancellationToken)

View File

@ -151,7 +151,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
});
}
private static int RtpHeaderBytes = 12;
private const int RtpHeaderBytes = 12;
private async Task CopyTo(MediaBrowser.Model.Net.ISocket udpClient, string file, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{
var bufferSize = 81920;

View File

@ -22,7 +22,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
public string OriginalStreamId { get; set; }
public bool EnableStreamSharing { get; set; }
public string UniqueId { get; private set; }
public string UniqueId { get; }
protected readonly IFileSystem FileSystem;
protected readonly IServerApplicationPaths AppPaths;
@ -31,12 +31,10 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
protected readonly ILogger Logger;
protected readonly CancellationTokenSource LiveStreamCancellationTokenSource = new CancellationTokenSource();
public string TunerHostId { get; private set; }
public string TunerHostId { get; }
public DateTime DateOpened { get; protected set; }
public Func<Task> OnClose { get; set; }
public LiveStream(MediaSourceInfo mediaSource, TunerHostInfo tuner, IFileSystem fileSystem, ILogger logger, IServerApplicationPaths appPaths)
{
OriginalMediaSource = mediaSource;
@ -76,26 +74,9 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
LiveStreamCancellationTokenSource.Cancel();
if (OnClose != null)
{
return CloseWithExternalFn();
}
return Task.CompletedTask;
}
private async Task CloseWithExternalFn()
{
try
{
await OnClose().ConfigureAwait(false);
}
catch (Exception ex)
{
Logger.LogError(ex, "Error closing live stream");
}
}
protected Stream GetInputStream(string path, bool allowAsyncFileRead)
{
var fileOpenOptions = FileOpenOptions.SequentialScan;
@ -113,27 +94,26 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
return DeleteTempFiles(GetStreamFilePaths());
}
protected async Task DeleteTempFiles(List<string> paths, int retryCount = 0)
protected async Task DeleteTempFiles(IEnumerable<string> paths, int retryCount = 0)
{
if (retryCount == 0)
{
Logger.LogInformation("Deleting temp files {0}", string.Join(", ", paths.ToArray()));
Logger.LogInformation("Deleting temp files {0}", paths);
}
var failedFiles = new List<string>();
foreach (var path in paths)
{
if (!File.Exists(path))
{
continue;
}
try
{
FileSystem.DeleteFile(path);
}
catch (DirectoryNotFoundException)
{
}
catch (FileNotFoundException)
{
}
catch (Exception ex)
{
Logger.LogError(ex, "Error deleting file {path}", path);
@ -157,8 +137,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{
cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, LiveStreamCancellationTokenSource.Token).Token;
var allowAsync = false;
// use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
// use non-async filestream on windows along with read due to https://github.com/dotnet/corefx/issues/6039
var allowAsync = Environment.OSVersion.Platform != PlatformID.Win32NT;
bool seekFile = (DateTime.UtcNow - DateOpened).TotalSeconds > 10;
@ -181,28 +161,24 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
Logger.LogInformation("Live Stream ended.");
}
private Tuple<string, bool> GetNextFile(string currentFile)
private (string file, bool isLastFile) GetNextFile(string currentFile)
{
var files = GetStreamFilePaths();
//logger.LogInformation("Live stream files: {0}", string.Join(", ", files.ToArray()));
if (string.IsNullOrEmpty(currentFile))
{
return new Tuple<string, bool>(files.Last(), true);
return (files.Last(), true);
}
var nextIndex = files.FindIndex(i => string.Equals(i, currentFile, StringComparison.OrdinalIgnoreCase)) + 1;
var isLastFile = nextIndex == files.Count - 1;
return new Tuple<string, bool>(files.ElementAtOrDefault(nextIndex), isLastFile);
return (files.ElementAtOrDefault(nextIndex), isLastFile);
}
private async Task CopyFile(string path, bool seekFile, int emptyReadLimit, bool allowAsync, Stream stream, CancellationToken cancellationToken)
{
//logger.LogInformation("Opening live stream file {0}. Empty read limit: {1}", path, emptyReadLimit);
using (var inputStream = (FileStream)GetInputStream(path, allowAsync))
{
if (seekFile)
@ -218,7 +194,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
private void TrySeek(FileStream stream, long offset)
{
//logger.LogInformation("TrySeek live stream");
if (!stream.CanSeek)
{
return;
}
try
{
stream.Seek(offset, SeekOrigin.End);

View File

@ -1,66 +0,0 @@
using System;
namespace Emby.Server.Implementations.Net
{
/// <summary>
/// Correclty implements the <see cref="IDisposable"/> interface and pattern for an object containing only managed resources, and adds a few common niceities not on the interface such as an <see cref="IsDisposed"/> property.
/// </summary>
public abstract class DisposableManagedObjectBase : IDisposable
{
#region Public Methods
/// <summary>
/// Override this method and dispose any objects you own the lifetime of if disposing is true;
/// </summary>
/// <param name="disposing">True if managed objects should be disposed, if false, only unmanaged resources should be released.</param>
protected abstract void Dispose(bool disposing);
//TODO Remove and reimplement using the IsDisposed property directly.
/// <summary>
/// Throws an <see cref="ObjectDisposedException"/> if the <see cref="IsDisposed"/> property is true.
/// </summary>
/// <seealso cref="IsDisposed"/>
/// <exception cref="ObjectDisposedException">Thrown if the <see cref="IsDisposed"/> property is true.</exception>
/// <seealso cref="Dispose()"/>
protected virtual void ThrowIfDisposed()
{
if (IsDisposed) throw new ObjectDisposedException(GetType().Name);
}
#endregion
#region Public Properties
/// <summary>
/// Sets or returns a boolean indicating whether or not this instance has been disposed.
/// </summary>
/// <seealso cref="Dispose()"/>
public bool IsDisposed
{
get;
private set;
}
#endregion
#region IDisposable Members
/// <summary>
/// Disposes this object instance and all internally managed resources.
/// </summary>
/// <remarks>
/// <para>Sets the <see cref="IsDisposed"/> property to true. Does not explicitly throw an exception if called multiple times, but makes no promises about behaviour of derived classes.</para>
/// </remarks>
/// <seealso cref="IsDisposed"/>
public void Dispose()
{
IsDisposed = true;
Dispose(true);
}
#endregion
}
}

View File

@ -4,7 +4,6 @@ using System.Net;
using System.Net.Sockets;
using Emby.Server.Implementations.Networking;
using MediaBrowser.Model.Net;
using Microsoft.Extensions.Logging;
namespace Emby.Server.Implementations.Net
{
@ -19,7 +18,10 @@ namespace Emby.Server.Implementations.Net
public ISocket CreateTcpSocket(IpAddressInfo remoteAddress, int remotePort)
{
if (remotePort < 0) throw new ArgumentException("remotePort cannot be less than zero.", nameof(remotePort));
if (remotePort < 0)
{
throw new ArgumentException("remotePort cannot be less than zero.", nameof(remotePort));
}
var addressFamily = remoteAddress.AddressFamily == IpAddressFamily.InterNetwork
? AddressFamily.InterNetwork
@ -42,8 +44,7 @@ namespace Emby.Server.Implementations.Net
}
catch
{
if (retVal != null)
retVal.Dispose();
retVal?.Dispose();
throw;
}
@ -55,7 +56,10 @@ namespace Emby.Server.Implementations.Net
/// <param name="localPort">An integer specifying the local port to bind the acceptSocket to.</param>
public ISocket CreateUdpSocket(int localPort)
{
if (localPort < 0) throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
if (localPort < 0)
{
throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
}
var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp);
try
@ -65,8 +69,7 @@ namespace Emby.Server.Implementations.Net
}
catch
{
if (retVal != null)
retVal.Dispose();
retVal?.Dispose();
throw;
}
@ -74,7 +77,10 @@ namespace Emby.Server.Implementations.Net
public ISocket CreateUdpBroadcastSocket(int localPort)
{
if (localPort < 0) throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
if (localPort < 0)
{
throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
}
var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp);
try
@ -86,8 +92,7 @@ namespace Emby.Server.Implementations.Net
}
catch
{
if (retVal != null)
retVal.Dispose();
retVal?.Dispose();
throw;
}
@ -99,7 +104,10 @@ namespace Emby.Server.Implementations.Net
/// <returns>An implementation of the <see cref="ISocket"/> interface used by RSSDP components to perform acceptSocket operations.</returns>
public ISocket CreateSsdpUdpSocket(IpAddressInfo localIpAddress, int localPort)
{
if (localPort < 0) throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
if (localPort < 0)
{
throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
}
var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp);
try
@ -114,8 +122,7 @@ namespace Emby.Server.Implementations.Net
}
catch
{
if (retVal != null)
retVal.Dispose();
retVal?.Dispose();
throw;
}
@ -130,10 +137,25 @@ namespace Emby.Server.Implementations.Net
/// <returns></returns>
public ISocket CreateUdpMulticastSocket(string ipAddress, int multicastTimeToLive, int localPort)
{
if (ipAddress == null) throw new ArgumentNullException(nameof(ipAddress));
if (ipAddress.Length == 0) throw new ArgumentException("ipAddress cannot be an empty string.", nameof(ipAddress));
if (multicastTimeToLive <= 0) throw new ArgumentException("multicastTimeToLive cannot be zero or less.", nameof(multicastTimeToLive));
if (localPort < 0) throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
if (ipAddress == null)
{
throw new ArgumentNullException(nameof(ipAddress));
}
if (ipAddress.Length == 0)
{
throw new ArgumentException("ipAddress cannot be an empty string.", nameof(ipAddress));
}
if (multicastTimeToLive <= 0)
{
throw new ArgumentException("multicastTimeToLive cannot be zero or less.", nameof(multicastTimeToLive));
}
if (localPort < 0)
{
throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
}
var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp);
@ -172,87 +194,13 @@ namespace Emby.Server.Implementations.Net
}
catch
{
if (retVal != null)
retVal.Dispose();
retVal?.Dispose();
throw;
}
}
public Stream CreateNetworkStream(ISocket socket, bool ownsSocket)
{
var netSocket = (UdpSocket)socket;
return new SocketStream(netSocket.Socket, ownsSocket);
}
=> new NetworkStream(((UdpSocket)socket).Socket, ownsSocket);
}
public class SocketStream : Stream
{
private readonly Socket _socket;
public SocketStream(Socket socket, bool ownsSocket)
{
_socket = socket;
}
public override void Flush()
{
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotImplementedException();
public override long Position
{
get => throw new NotImplementedException();
set => throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
_socket.Send(buffer, offset, count, SocketFlags.None);
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return _socket.BeginSend(buffer, offset, count, SocketFlags.None, callback, state);
}
public override void EndWrite(IAsyncResult asyncResult)
{
_socket.EndSend(asyncResult);
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
return _socket.Receive(buffer, offset, count, SocketFlags.None);
}
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return _socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state);
}
public override int EndRead(IAsyncResult asyncResult)
{
return _socket.EndReceive(asyncResult);
}
}
}

View File

@ -11,12 +11,15 @@ namespace Emby.Server.Implementations.Net
// THIS IS A LINKED FILE - SHARED AMONGST MULTIPLE PLATFORMS
// Be careful to check any changes compile and work for all platform projects it is shared in.
public sealed class UdpSocket : DisposableManagedObjectBase, ISocket
public sealed class UdpSocket : ISocket, IDisposable
{
private Socket _Socket;
private int _LocalPort;
private Socket _socket;
private int _localPort;
private bool _disposed = false;
public Socket Socket => _Socket;
public Socket Socket => _socket;
public IpAddressInfo LocalIPAddress { get; }
private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs()
{
@ -35,11 +38,11 @@ namespace Emby.Server.Implementations.Net
{
if (socket == null) throw new ArgumentNullException(nameof(socket));
_Socket = socket;
_LocalPort = localPort;
_socket = socket;
_localPort = localPort;
LocalIPAddress = NetworkManager.ToIpAddressInfo(ip);
_Socket.Bind(new IPEndPoint(ip, _LocalPort));
_socket.Bind(new IPEndPoint(ip, _localPort));
InitReceiveSocketAsyncEventArgs();
}
@ -101,32 +104,26 @@ namespace Emby.Server.Implementations.Net
{
if (socket == null) throw new ArgumentNullException(nameof(socket));
_Socket = socket;
_Socket.Connect(NetworkManager.ToIPEndPoint(endPoint));
_socket = socket;
_socket.Connect(NetworkManager.ToIPEndPoint(endPoint));
InitReceiveSocketAsyncEventArgs();
}
public IpAddressInfo LocalIPAddress
{
get;
private set;
}
public IAsyncResult BeginReceive(byte[] buffer, int offset, int count, AsyncCallback callback)
{
ThrowIfDisposed();
EndPoint receivedFromEndPoint = new IPEndPoint(IPAddress.Any, 0);
return _Socket.BeginReceiveFrom(buffer, offset, count, SocketFlags.None, ref receivedFromEndPoint, callback, buffer);
return _socket.BeginReceiveFrom(buffer, offset, count, SocketFlags.None, ref receivedFromEndPoint, callback, buffer);
}
public int Receive(byte[] buffer, int offset, int count)
{
ThrowIfDisposed();
return _Socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
return _socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
}
public SocketReceiveResult EndReceive(IAsyncResult result)
@ -136,7 +133,7 @@ namespace Emby.Server.Implementations.Net
var sender = new IPEndPoint(IPAddress.Any, 0);
var remoteEndPoint = (EndPoint)sender;
var receivedBytes = _Socket.EndReceiveFrom(result, ref remoteEndPoint);
var receivedBytes = _socket.EndReceiveFrom(result, ref remoteEndPoint);
var buffer = (byte[])result.AsyncState;
@ -236,37 +233,42 @@ namespace Emby.Server.Implementations.Net
var ipEndPoint = NetworkManager.ToIPEndPoint(endPoint);
return _Socket.BeginSendTo(buffer, offset, size, SocketFlags.None, ipEndPoint, callback, state);
return _socket.BeginSendTo(buffer, offset, size, SocketFlags.None, ipEndPoint, callback, state);
}
public int EndSendTo(IAsyncResult result)
{
ThrowIfDisposed();
return _Socket.EndSendTo(result);
return _socket.EndSendTo(result);
}
protected override void Dispose(bool disposing)
private void ThrowIfDisposed()
{
if (disposing)
if (_disposed)
{
var socket = _Socket;
if (socket != null)
socket.Dispose();
var tcs = _currentReceiveTaskCompletionSource;
if (tcs != null)
{
tcs.TrySetCanceled();
}
var sendTcs = _currentSendTaskCompletionSource;
if (sendTcs != null)
{
sendTcs.TrySetCanceled();
}
throw new ObjectDisposedException(nameof(UdpSocket));
}
}
public void Dispose()
{
if (_disposed)
{
return;
}
_socket?.Dispose();
_currentReceiveTaskCompletionSource?.TrySetCanceled();
_currentSendTaskCompletionSource?.TrySetCanceled();
_socket = null;
_currentReceiveTaskCompletionSource = null;
_currentSendTaskCompletionSource = null;
_disposed = true;
}
private static IpEndPointInfo ToIpEndPointInfo(IPEndPoint endpoint)
{
if (endpoint == null)

View File

@ -23,7 +23,6 @@ using MediaBrowser.Model.IO;
using MediaBrowser.Model.LiveTv;
using MediaBrowser.Model.Querying;
using MediaBrowser.Model.Services;
using MediaBrowser.Model.System;
using Microsoft.Net.Http.Headers;
namespace MediaBrowser.Api.LiveTv
@ -695,27 +694,36 @@ namespace MediaBrowser.Api.LiveTv
private readonly IHttpClient _httpClient;
private readonly ILibraryManager _libraryManager;
private readonly IDtoService _dtoService;
private readonly IFileSystem _fileSystem;
private readonly IAuthorizationContext _authContext;
private readonly ISessionContext _sessionContext;
private ICryptoProvider _cryptographyProvider;
private IStreamHelper _streamHelper;
private IMediaSourceManager _mediaSourceManager;
private readonly ICryptoProvider _cryptographyProvider;
private readonly IStreamHelper _streamHelper;
private readonly IMediaSourceManager _mediaSourceManager;
public LiveTvService(ICryptoProvider crypto, IMediaSourceManager mediaSourceManager, IStreamHelper streamHelper, ILiveTvManager liveTvManager, IUserManager userManager, IServerConfigurationManager config, IHttpClient httpClient, ILibraryManager libraryManager, IDtoService dtoService, IFileSystem fileSystem, IAuthorizationContext authContext, ISessionContext sessionContext)
public LiveTvService(
ICryptoProvider crypto,
IMediaSourceManager mediaSourceManager,
IStreamHelper streamHelper,
ILiveTvManager liveTvManager,
IUserManager userManager,
IServerConfigurationManager config,
IHttpClient httpClient,
ILibraryManager libraryManager,
IDtoService dtoService,
IAuthorizationContext authContext,
ISessionContext sessionContext)
{
_cryptographyProvider = crypto;
_mediaSourceManager = mediaSourceManager;
_streamHelper = streamHelper;
_liveTvManager = liveTvManager;
_userManager = userManager;
_config = config;
_httpClient = httpClient;
_libraryManager = libraryManager;
_dtoService = dtoService;
_fileSystem = fileSystem;
_authContext = authContext;
_sessionContext = sessionContext;
_cryptographyProvider = crypto;
_streamHelper = streamHelper;
_mediaSourceManager = mediaSourceManager;
}
public object Get(GetTunerHostTypes request)
@ -729,7 +737,7 @@ namespace MediaBrowser.Api.LiveTv
var user = request.UserId.Equals(Guid.Empty) ? null : _userManager.GetUserById(request.UserId);
var folders = _liveTvManager.GetRecordingFolders(user);
var returnArray = _dtoService.GetBaseItemDtos(folders.ToArray(), new DtoOptions(), user);
var returnArray = _dtoService.GetBaseItemDtos(folders, new DtoOptions(), user);
var result = new QueryResult<BaseItemDto>
{
@ -754,7 +762,7 @@ namespace MediaBrowser.Api.LiveTv
[HeaderNames.ContentType] = Model.Net.MimeTypes.GetMimeType(path)
};
return new ProgressiveFileCopier(_fileSystem, _streamHelper, path, outputHeaders, Logger)
return new ProgressiveFileCopier(_streamHelper, path, outputHeaders, Logger)
{
AllowEndOfFile = false
};

View File

@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
@ -11,22 +12,17 @@ namespace MediaBrowser.Api.LiveTv
{
public class ProgressiveFileCopier : IAsyncStreamWriter, IHasHeaders
{
private readonly IFileSystem _fileSystem;
private readonly ILogger _logger;
private readonly string _path;
private readonly Dictionary<string, string> _outputHeaders;
const int StreamCopyToBufferSize = 81920;
public long StartPosition { get; set; }
public bool AllowEndOfFile = true;
private readonly IDirectStreamProvider _directStreamProvider;
private IStreamHelper _streamHelper;
public ProgressiveFileCopier(IFileSystem fileSystem, IStreamHelper streamHelper, string path, Dictionary<string, string> outputHeaders, ILogger logger)
public ProgressiveFileCopier(IStreamHelper streamHelper, string path, Dictionary<string, string> outputHeaders, ILogger logger)
{
_fileSystem = fileSystem;
_path = path;
_outputHeaders = outputHeaders;
_logger = logger;
@ -43,18 +39,6 @@ namespace MediaBrowser.Api.LiveTv
public IDictionary<string, string> Headers => _outputHeaders;
private Stream GetInputStream(bool allowAsyncFileRead)
{
var fileOpenOptions = FileOpenOptions.SequentialScan;
if (allowAsyncFileRead)
{
fileOpenOptions |= FileOpenOptions.Asynchronous;
}
return _fileSystem.GetFileStream(_path, FileOpenMode.Open, FileAccessMode.Read, FileShareMode.ReadWrite, fileOpenOptions);
}
public async Task WriteToAsync(Stream outputStream, CancellationToken cancellationToken)
{
if (_directStreamProvider != null)
@ -63,28 +47,23 @@ namespace MediaBrowser.Api.LiveTv
return;
}
var eofCount = 0;
var fileOptions = FileOptions.SequentialScan;
// use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
var allowAsyncFileRead = true;
using (var inputStream = GetInputStream(allowAsyncFileRead))
if (Environment.OSVersion.Platform != PlatformID.Win32NT)
{
if (StartPosition > 0)
{
inputStream.Position = StartPosition;
}
fileOptions |= FileOptions.Asynchronous;
}
using (var inputStream = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, fileOptions))
{
var emptyReadLimit = AllowEndOfFile ? 20 : 100;
var eofCount = 0;
while (eofCount < emptyReadLimit)
{
int bytesRead;
bytesRead = await _streamHelper.CopyToAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
//var position = fs.Position;
//_logger.LogDebug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0)
{
eofCount++;

View File

@ -1,64 +0,0 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
namespace MediaBrowser.Model.Net
{
public class HttpResponse : IDisposable
{
/// <summary>
/// Gets or sets the type of the content.
/// </summary>
/// <value>The type of the content.</value>
public string ContentType { get; set; }
/// <summary>
/// Gets or sets the response URL.
/// </summary>
/// <value>The response URL.</value>
public string ResponseUrl { get; set; }
/// <summary>
/// Gets or sets the content.
/// </summary>
/// <value>The content.</value>
public Stream Content { get; set; }
/// <summary>
/// Gets or sets the status code.
/// </summary>
/// <value>The status code.</value>
public HttpStatusCode StatusCode { get; set; }
/// <summary>
/// Gets or sets the length of the content.
/// </summary>
/// <value>The length of the content.</value>
public long? ContentLength { get; set; }
/// <summary>
/// Gets or sets the headers.
/// </summary>
/// <value>The headers.</value>
public Dictionary<string, string> Headers { get; set; }
private readonly IDisposable _disposable;
public HttpResponse(IDisposable disposable)
{
_disposable = disposable;
}
public HttpResponse()
{
}
public void Dispose()
{
if (_disposable != null)
{
_disposable.Dispose();
}
}
}
}

View File

@ -1,15 +0,0 @@
using System;
namespace MediaBrowser.Model.Net
{
public class SocketCreateException : Exception
{
public SocketCreateException(string errorCode, Exception originalException)
: base(errorCode, originalException)
{
ErrorCode = errorCode;
}
public string ErrorCode { get; private set; }
}
}