Merge pull request #1103 from Bond-009/stream

Improvements around streams
This commit is contained in:
Vasily 2019-03-14 19:53:50 +03:00 committed by GitHub
commit bf00dedc7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 269 additions and 451 deletions

View File

@ -5,7 +5,6 @@ using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MediaBrowser.Model.Services; using MediaBrowser.Model.Services;
using Microsoft.Extensions.Logging;
using Microsoft.Net.Http.Headers; using Microsoft.Net.Http.Headers;
namespace Emby.Server.Implementations.HttpServer namespace Emby.Server.Implementations.HttpServer

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Buffers;
using System.IO; using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -8,168 +9,213 @@ namespace Emby.Server.Implementations.IO
{ {
public class StreamHelper : IStreamHelper public class StreamHelper : IStreamHelper
{ {
private const int StreamCopyToBufferSize = 81920;
public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, Action onStarted, CancellationToken cancellationToken) public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, Action onStarted, CancellationToken cancellationToken)
{ {
byte[] buffer = new byte[bufferSize]; byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
int read; try
while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
{ {
cancellationToken.ThrowIfCancellationRequested(); int read;
while ((read = await source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) != 0)
await destination.WriteAsync(buffer, 0, read).ConfigureAwait(false);
if (onStarted != null)
{ {
onStarted(); cancellationToken.ThrowIfCancellationRequested();
onStarted = null;
await destination.WriteAsync(buffer, 0, read).ConfigureAwait(false);
if (onStarted != null)
{
onStarted();
onStarted = null;
}
} }
} }
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
} }
public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, int emptyReadLimit, CancellationToken cancellationToken) public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, int emptyReadLimit, CancellationToken cancellationToken)
{ {
byte[] buffer = new byte[bufferSize]; byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
try
if (emptyReadLimit <= 0)
{ {
int read; if (emptyReadLimit <= 0)
while ((read = source.Read(buffer, 0, buffer.Length)) != 0) {
int read;
while ((read = await source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) != 0)
{
cancellationToken.ThrowIfCancellationRequested();
await destination.WriteAsync(buffer, 0, read).ConfigureAwait(false);
}
return;
}
var eofCount = 0;
while (eofCount < emptyReadLimit)
{ {
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
await destination.WriteAsync(buffer, 0, read).ConfigureAwait(false); var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
}
return; if (bytesRead == 0)
{
eofCount++;
await Task.Delay(50, cancellationToken).ConfigureAwait(false);
}
else
{
eofCount = 0;
await destination.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false);
}
}
} }
finally
var eofCount = 0;
while (eofCount < emptyReadLimit)
{ {
cancellationToken.ThrowIfCancellationRequested(); ArrayPool<byte>.Shared.Return(buffer);
var bytesRead = source.Read(buffer, 0, buffer.Length);
if (bytesRead == 0)
{
eofCount++;
await Task.Delay(50, cancellationToken).ConfigureAwait(false);
}
else
{
eofCount = 0;
await destination.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false);
}
} }
} }
const int StreamCopyToBufferSize = 81920;
public async Task<int> CopyToAsync(Stream source, Stream destination, CancellationToken cancellationToken) public async Task<int> CopyToAsync(Stream source, Stream destination, CancellationToken cancellationToken)
{ {
var array = new byte[StreamCopyToBufferSize]; byte[] buffer = ArrayPool<byte>.Shared.Rent(StreamCopyToBufferSize);
int bytesRead; try
int totalBytesRead = 0;
while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0)
{ {
var bytesToWrite = bytesRead; int totalBytesRead = 0;
if (bytesToWrite > 0) int bytesRead;
while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{ {
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); var bytesToWrite = bytesRead;
totalBytesRead += bytesRead; if (bytesToWrite > 0)
{
await destination.WriteAsync(buffer, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
totalBytesRead += bytesRead;
}
} }
}
return totalBytesRead; return totalBytesRead;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
} }
public async Task<int> CopyToAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken) public async Task<int> CopyToAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken)
{ {
var array = new byte[StreamCopyToBufferSize]; byte[] buffer = ArrayPool<byte>.Shared.Rent(StreamCopyToBufferSize);
int bytesRead; try
int totalBytesRead = 0;
while ((bytesRead = source.Read(array, 0, array.Length)) != 0)
{ {
var bytesToWrite = bytesRead; int bytesRead;
int totalBytesRead = 0;
if (bytesToWrite > 0) while ((bytesRead = source.Read(buffer, 0, buffer.Length)) != 0)
{ {
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); var bytesToWrite = bytesRead;
totalBytesRead += bytesRead; if (bytesToWrite > 0)
{
await destination.WriteAsync(buffer, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
totalBytesRead += bytesRead;
}
} }
}
return totalBytesRead; return totalBytesRead;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
} }
public async Task CopyToAsyncWithSyncRead(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken) public async Task CopyToAsyncWithSyncRead(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken)
{ {
var array = new byte[StreamCopyToBufferSize]; byte[] buffer = ArrayPool<byte>.Shared.Rent(StreamCopyToBufferSize);
int bytesRead; try
while ((bytesRead = source.Read(array, 0, array.Length)) != 0)
{ {
var bytesToWrite = Math.Min(bytesRead, copyLength); int bytesRead;
if (bytesToWrite > 0) while ((bytesRead = source.Read(buffer, 0, buffer.Length)) != 0)
{ {
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); var bytesToWrite = Math.Min(bytesRead, copyLength);
}
copyLength -= bytesToWrite; if (bytesToWrite > 0)
{
await destination.WriteAsync(buffer, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
}
if (copyLength <= 0) copyLength -= bytesToWrite;
{
break; if (copyLength <= 0)
{
break;
}
} }
} }
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
} }
public async Task CopyToAsync(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken) public async Task CopyToAsync(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken)
{ {
var array = new byte[StreamCopyToBufferSize]; byte[] buffer = ArrayPool<byte>.Shared.Rent(StreamCopyToBufferSize);
int bytesRead; try
while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0)
{ {
var bytesToWrite = Math.Min(bytesRead, copyLength); int bytesRead;
if (bytesToWrite > 0) while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{ {
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); var bytesToWrite = Math.Min(bytesRead, copyLength);
}
copyLength -= bytesToWrite; if (bytesToWrite > 0)
{
await destination.WriteAsync(buffer, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
}
if (copyLength <= 0) copyLength -= bytesToWrite;
{
break; if (copyLength <= 0)
{
break;
}
} }
} }
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
} }
public async Task CopyUntilCancelled(Stream source, Stream target, int bufferSize, CancellationToken cancellationToken) public async Task CopyUntilCancelled(Stream source, Stream target, int bufferSize, CancellationToken cancellationToken)
{ {
byte[] buffer = new byte[bufferSize]; byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
try
while (!cancellationToken.IsCancellationRequested)
{ {
var bytesRead = await CopyToAsyncInternal(source, target, buffer, cancellationToken).ConfigureAwait(false); while (!cancellationToken.IsCancellationRequested)
//var position = fs.Position;
//_logger.LogDebug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0)
{ {
await Task.Delay(100).ConfigureAwait(false); var bytesRead = await CopyToAsyncInternal(source, target, buffer, cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
await Task.Delay(100).ConfigureAwait(false);
}
} }
} }
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
} }
private static async Task<int> CopyToAsyncInternal(Stream source, Stream destination, byte[] buffer, CancellationToken cancellationToken) private static async Task<int> CopyToAsyncInternal(Stream source, Stream destination, byte[] buffer, CancellationToken cancellationToken)

View File

@ -151,7 +151,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}); });
} }
private static int RtpHeaderBytes = 12; private const int RtpHeaderBytes = 12;
private async Task CopyTo(MediaBrowser.Model.Net.ISocket udpClient, string file, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) private async Task CopyTo(MediaBrowser.Model.Net.ISocket udpClient, string file, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{ {
var bufferSize = 81920; var bufferSize = 81920;

View File

@ -22,7 +22,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
public string OriginalStreamId { get; set; } public string OriginalStreamId { get; set; }
public bool EnableStreamSharing { get; set; } public bool EnableStreamSharing { get; set; }
public string UniqueId { get; private set; } public string UniqueId { get; }
protected readonly IFileSystem FileSystem; protected readonly IFileSystem FileSystem;
protected readonly IServerApplicationPaths AppPaths; protected readonly IServerApplicationPaths AppPaths;
@ -31,12 +31,10 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
protected readonly ILogger Logger; protected readonly ILogger Logger;
protected readonly CancellationTokenSource LiveStreamCancellationTokenSource = new CancellationTokenSource(); protected readonly CancellationTokenSource LiveStreamCancellationTokenSource = new CancellationTokenSource();
public string TunerHostId { get; private set; } public string TunerHostId { get; }
public DateTime DateOpened { get; protected set; } public DateTime DateOpened { get; protected set; }
public Func<Task> OnClose { get; set; }
public LiveStream(MediaSourceInfo mediaSource, TunerHostInfo tuner, IFileSystem fileSystem, ILogger logger, IServerApplicationPaths appPaths) public LiveStream(MediaSourceInfo mediaSource, TunerHostInfo tuner, IFileSystem fileSystem, ILogger logger, IServerApplicationPaths appPaths)
{ {
OriginalMediaSource = mediaSource; OriginalMediaSource = mediaSource;
@ -76,26 +74,9 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
LiveStreamCancellationTokenSource.Cancel(); LiveStreamCancellationTokenSource.Cancel();
if (OnClose != null)
{
return CloseWithExternalFn();
}
return Task.CompletedTask; return Task.CompletedTask;
} }
private async Task CloseWithExternalFn()
{
try
{
await OnClose().ConfigureAwait(false);
}
catch (Exception ex)
{
Logger.LogError(ex, "Error closing live stream");
}
}
protected Stream GetInputStream(string path, bool allowAsyncFileRead) protected Stream GetInputStream(string path, bool allowAsyncFileRead)
{ {
var fileOpenOptions = FileOpenOptions.SequentialScan; var fileOpenOptions = FileOpenOptions.SequentialScan;
@ -113,27 +94,26 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
return DeleteTempFiles(GetStreamFilePaths()); return DeleteTempFiles(GetStreamFilePaths());
} }
protected async Task DeleteTempFiles(List<string> paths, int retryCount = 0) protected async Task DeleteTempFiles(IEnumerable<string> paths, int retryCount = 0)
{ {
if (retryCount == 0) if (retryCount == 0)
{ {
Logger.LogInformation("Deleting temp files {0}", string.Join(", ", paths.ToArray())); Logger.LogInformation("Deleting temp files {0}", paths);
} }
var failedFiles = new List<string>(); var failedFiles = new List<string>();
foreach (var path in paths) foreach (var path in paths)
{ {
if (!File.Exists(path))
{
continue;
}
try try
{ {
FileSystem.DeleteFile(path); FileSystem.DeleteFile(path);
} }
catch (DirectoryNotFoundException)
{
}
catch (FileNotFoundException)
{
}
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogError(ex, "Error deleting file {path}", path); Logger.LogError(ex, "Error deleting file {path}", path);
@ -157,8 +137,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{ {
cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, LiveStreamCancellationTokenSource.Token).Token; cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, LiveStreamCancellationTokenSource.Token).Token;
var allowAsync = false; // use non-async filestream on windows along with read due to https://github.com/dotnet/corefx/issues/6039
// use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 var allowAsync = Environment.OSVersion.Platform != PlatformID.Win32NT;
bool seekFile = (DateTime.UtcNow - DateOpened).TotalSeconds > 10; bool seekFile = (DateTime.UtcNow - DateOpened).TotalSeconds > 10;
@ -181,28 +161,24 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
Logger.LogInformation("Live Stream ended."); Logger.LogInformation("Live Stream ended.");
} }
private Tuple<string, bool> GetNextFile(string currentFile) private (string file, bool isLastFile) GetNextFile(string currentFile)
{ {
var files = GetStreamFilePaths(); var files = GetStreamFilePaths();
//logger.LogInformation("Live stream files: {0}", string.Join(", ", files.ToArray()));
if (string.IsNullOrEmpty(currentFile)) if (string.IsNullOrEmpty(currentFile))
{ {
return new Tuple<string, bool>(files.Last(), true); return (files.Last(), true);
} }
var nextIndex = files.FindIndex(i => string.Equals(i, currentFile, StringComparison.OrdinalIgnoreCase)) + 1; var nextIndex = files.FindIndex(i => string.Equals(i, currentFile, StringComparison.OrdinalIgnoreCase)) + 1;
var isLastFile = nextIndex == files.Count - 1; var isLastFile = nextIndex == files.Count - 1;
return new Tuple<string, bool>(files.ElementAtOrDefault(nextIndex), isLastFile); return (files.ElementAtOrDefault(nextIndex), isLastFile);
} }
private async Task CopyFile(string path, bool seekFile, int emptyReadLimit, bool allowAsync, Stream stream, CancellationToken cancellationToken) private async Task CopyFile(string path, bool seekFile, int emptyReadLimit, bool allowAsync, Stream stream, CancellationToken cancellationToken)
{ {
//logger.LogInformation("Opening live stream file {0}. Empty read limit: {1}", path, emptyReadLimit);
using (var inputStream = (FileStream)GetInputStream(path, allowAsync)) using (var inputStream = (FileStream)GetInputStream(path, allowAsync))
{ {
if (seekFile) if (seekFile)
@ -218,7 +194,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
private void TrySeek(FileStream stream, long offset) private void TrySeek(FileStream stream, long offset)
{ {
//logger.LogInformation("TrySeek live stream"); if (!stream.CanSeek)
{
return;
}
try try
{ {
stream.Seek(offset, SeekOrigin.End); stream.Seek(offset, SeekOrigin.End);

View File

@ -1,66 +0,0 @@
using System;
namespace Emby.Server.Implementations.Net
{
/// <summary>
/// Correclty implements the <see cref="IDisposable"/> interface and pattern for an object containing only managed resources, and adds a few common niceities not on the interface such as an <see cref="IsDisposed"/> property.
/// </summary>
public abstract class DisposableManagedObjectBase : IDisposable
{
#region Public Methods
/// <summary>
/// Override this method and dispose any objects you own the lifetime of if disposing is true;
/// </summary>
/// <param name="disposing">True if managed objects should be disposed, if false, only unmanaged resources should be released.</param>
protected abstract void Dispose(bool disposing);
//TODO Remove and reimplement using the IsDisposed property directly.
/// <summary>
/// Throws an <see cref="ObjectDisposedException"/> if the <see cref="IsDisposed"/> property is true.
/// </summary>
/// <seealso cref="IsDisposed"/>
/// <exception cref="ObjectDisposedException">Thrown if the <see cref="IsDisposed"/> property is true.</exception>
/// <seealso cref="Dispose()"/>
protected virtual void ThrowIfDisposed()
{
if (IsDisposed) throw new ObjectDisposedException(GetType().Name);
}
#endregion
#region Public Properties
/// <summary>
/// Sets or returns a boolean indicating whether or not this instance has been disposed.
/// </summary>
/// <seealso cref="Dispose()"/>
public bool IsDisposed
{
get;
private set;
}
#endregion
#region IDisposable Members
/// <summary>
/// Disposes this object instance and all internally managed resources.
/// </summary>
/// <remarks>
/// <para>Sets the <see cref="IsDisposed"/> property to true. Does not explicitly throw an exception if called multiple times, but makes no promises about behaviour of derived classes.</para>
/// </remarks>
/// <seealso cref="IsDisposed"/>
public void Dispose()
{
IsDisposed = true;
Dispose(true);
}
#endregion
}
}

View File

@ -4,7 +4,6 @@ using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using Emby.Server.Implementations.Networking; using Emby.Server.Implementations.Networking;
using MediaBrowser.Model.Net; using MediaBrowser.Model.Net;
using Microsoft.Extensions.Logging;
namespace Emby.Server.Implementations.Net namespace Emby.Server.Implementations.Net
{ {
@ -19,7 +18,10 @@ namespace Emby.Server.Implementations.Net
public ISocket CreateTcpSocket(IpAddressInfo remoteAddress, int remotePort) public ISocket CreateTcpSocket(IpAddressInfo remoteAddress, int remotePort)
{ {
if (remotePort < 0) throw new ArgumentException("remotePort cannot be less than zero.", nameof(remotePort)); if (remotePort < 0)
{
throw new ArgumentException("remotePort cannot be less than zero.", nameof(remotePort));
}
var addressFamily = remoteAddress.AddressFamily == IpAddressFamily.InterNetwork var addressFamily = remoteAddress.AddressFamily == IpAddressFamily.InterNetwork
? AddressFamily.InterNetwork ? AddressFamily.InterNetwork
@ -42,8 +44,7 @@ namespace Emby.Server.Implementations.Net
} }
catch catch
{ {
if (retVal != null) retVal?.Dispose();
retVal.Dispose();
throw; throw;
} }
@ -55,7 +56,10 @@ namespace Emby.Server.Implementations.Net
/// <param name="localPort">An integer specifying the local port to bind the acceptSocket to.</param> /// <param name="localPort">An integer specifying the local port to bind the acceptSocket to.</param>
public ISocket CreateUdpSocket(int localPort) public ISocket CreateUdpSocket(int localPort)
{ {
if (localPort < 0) throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort)); if (localPort < 0)
{
throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
}
var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp); var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp);
try try
@ -65,8 +69,7 @@ namespace Emby.Server.Implementations.Net
} }
catch catch
{ {
if (retVal != null) retVal?.Dispose();
retVal.Dispose();
throw; throw;
} }
@ -74,7 +77,10 @@ namespace Emby.Server.Implementations.Net
public ISocket CreateUdpBroadcastSocket(int localPort) public ISocket CreateUdpBroadcastSocket(int localPort)
{ {
if (localPort < 0) throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort)); if (localPort < 0)
{
throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
}
var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp); var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp);
try try
@ -86,8 +92,7 @@ namespace Emby.Server.Implementations.Net
} }
catch catch
{ {
if (retVal != null) retVal?.Dispose();
retVal.Dispose();
throw; throw;
} }
@ -99,7 +104,10 @@ namespace Emby.Server.Implementations.Net
/// <returns>An implementation of the <see cref="ISocket"/> interface used by RSSDP components to perform acceptSocket operations.</returns> /// <returns>An implementation of the <see cref="ISocket"/> interface used by RSSDP components to perform acceptSocket operations.</returns>
public ISocket CreateSsdpUdpSocket(IpAddressInfo localIpAddress, int localPort) public ISocket CreateSsdpUdpSocket(IpAddressInfo localIpAddress, int localPort)
{ {
if (localPort < 0) throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort)); if (localPort < 0)
{
throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
}
var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp); var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp);
try try
@ -114,8 +122,7 @@ namespace Emby.Server.Implementations.Net
} }
catch catch
{ {
if (retVal != null) retVal?.Dispose();
retVal.Dispose();
throw; throw;
} }
@ -130,10 +137,25 @@ namespace Emby.Server.Implementations.Net
/// <returns></returns> /// <returns></returns>
public ISocket CreateUdpMulticastSocket(string ipAddress, int multicastTimeToLive, int localPort) public ISocket CreateUdpMulticastSocket(string ipAddress, int multicastTimeToLive, int localPort)
{ {
if (ipAddress == null) throw new ArgumentNullException(nameof(ipAddress)); if (ipAddress == null)
if (ipAddress.Length == 0) throw new ArgumentException("ipAddress cannot be an empty string.", nameof(ipAddress)); {
if (multicastTimeToLive <= 0) throw new ArgumentException("multicastTimeToLive cannot be zero or less.", nameof(multicastTimeToLive)); throw new ArgumentNullException(nameof(ipAddress));
if (localPort < 0) throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort)); }
if (ipAddress.Length == 0)
{
throw new ArgumentException("ipAddress cannot be an empty string.", nameof(ipAddress));
}
if (multicastTimeToLive <= 0)
{
throw new ArgumentException("multicastTimeToLive cannot be zero or less.", nameof(multicastTimeToLive));
}
if (localPort < 0)
{
throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort));
}
var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp); var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Dgram, System.Net.Sockets.ProtocolType.Udp);
@ -172,87 +194,13 @@ namespace Emby.Server.Implementations.Net
} }
catch catch
{ {
if (retVal != null) retVal?.Dispose();
retVal.Dispose();
throw; throw;
} }
} }
public Stream CreateNetworkStream(ISocket socket, bool ownsSocket) public Stream CreateNetworkStream(ISocket socket, bool ownsSocket)
{ => new NetworkStream(((UdpSocket)socket).Socket, ownsSocket);
var netSocket = (UdpSocket)socket;
return new SocketStream(netSocket.Socket, ownsSocket);
}
} }
public class SocketStream : Stream
{
private readonly Socket _socket;
public SocketStream(Socket socket, bool ownsSocket)
{
_socket = socket;
}
public override void Flush()
{
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotImplementedException();
public override long Position
{
get => throw new NotImplementedException();
set => throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
_socket.Send(buffer, offset, count, SocketFlags.None);
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return _socket.BeginSend(buffer, offset, count, SocketFlags.None, callback, state);
}
public override void EndWrite(IAsyncResult asyncResult)
{
_socket.EndSend(asyncResult);
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
return _socket.Receive(buffer, offset, count, SocketFlags.None);
}
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return _socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state);
}
public override int EndRead(IAsyncResult asyncResult)
{
return _socket.EndReceive(asyncResult);
}
}
} }

View File

@ -11,12 +11,15 @@ namespace Emby.Server.Implementations.Net
// THIS IS A LINKED FILE - SHARED AMONGST MULTIPLE PLATFORMS // THIS IS A LINKED FILE - SHARED AMONGST MULTIPLE PLATFORMS
// Be careful to check any changes compile and work for all platform projects it is shared in. // Be careful to check any changes compile and work for all platform projects it is shared in.
public sealed class UdpSocket : DisposableManagedObjectBase, ISocket public sealed class UdpSocket : ISocket, IDisposable
{ {
private Socket _Socket; private Socket _socket;
private int _LocalPort; private int _localPort;
private bool _disposed = false;
public Socket Socket => _Socket; public Socket Socket => _socket;
public IpAddressInfo LocalIPAddress { get; }
private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs() private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs()
{ {
@ -35,11 +38,11 @@ namespace Emby.Server.Implementations.Net
{ {
if (socket == null) throw new ArgumentNullException(nameof(socket)); if (socket == null) throw new ArgumentNullException(nameof(socket));
_Socket = socket; _socket = socket;
_LocalPort = localPort; _localPort = localPort;
LocalIPAddress = NetworkManager.ToIpAddressInfo(ip); LocalIPAddress = NetworkManager.ToIpAddressInfo(ip);
_Socket.Bind(new IPEndPoint(ip, _LocalPort)); _socket.Bind(new IPEndPoint(ip, _localPort));
InitReceiveSocketAsyncEventArgs(); InitReceiveSocketAsyncEventArgs();
} }
@ -101,32 +104,26 @@ namespace Emby.Server.Implementations.Net
{ {
if (socket == null) throw new ArgumentNullException(nameof(socket)); if (socket == null) throw new ArgumentNullException(nameof(socket));
_Socket = socket; _socket = socket;
_Socket.Connect(NetworkManager.ToIPEndPoint(endPoint)); _socket.Connect(NetworkManager.ToIPEndPoint(endPoint));
InitReceiveSocketAsyncEventArgs(); InitReceiveSocketAsyncEventArgs();
} }
public IpAddressInfo LocalIPAddress
{
get;
private set;
}
public IAsyncResult BeginReceive(byte[] buffer, int offset, int count, AsyncCallback callback) public IAsyncResult BeginReceive(byte[] buffer, int offset, int count, AsyncCallback callback)
{ {
ThrowIfDisposed(); ThrowIfDisposed();
EndPoint receivedFromEndPoint = new IPEndPoint(IPAddress.Any, 0); EndPoint receivedFromEndPoint = new IPEndPoint(IPAddress.Any, 0);
return _Socket.BeginReceiveFrom(buffer, offset, count, SocketFlags.None, ref receivedFromEndPoint, callback, buffer); return _socket.BeginReceiveFrom(buffer, offset, count, SocketFlags.None, ref receivedFromEndPoint, callback, buffer);
} }
public int Receive(byte[] buffer, int offset, int count) public int Receive(byte[] buffer, int offset, int count)
{ {
ThrowIfDisposed(); ThrowIfDisposed();
return _Socket.Receive(buffer, 0, buffer.Length, SocketFlags.None); return _socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
} }
public SocketReceiveResult EndReceive(IAsyncResult result) public SocketReceiveResult EndReceive(IAsyncResult result)
@ -136,7 +133,7 @@ namespace Emby.Server.Implementations.Net
var sender = new IPEndPoint(IPAddress.Any, 0); var sender = new IPEndPoint(IPAddress.Any, 0);
var remoteEndPoint = (EndPoint)sender; var remoteEndPoint = (EndPoint)sender;
var receivedBytes = _Socket.EndReceiveFrom(result, ref remoteEndPoint); var receivedBytes = _socket.EndReceiveFrom(result, ref remoteEndPoint);
var buffer = (byte[])result.AsyncState; var buffer = (byte[])result.AsyncState;
@ -236,37 +233,42 @@ namespace Emby.Server.Implementations.Net
var ipEndPoint = NetworkManager.ToIPEndPoint(endPoint); var ipEndPoint = NetworkManager.ToIPEndPoint(endPoint);
return _Socket.BeginSendTo(buffer, offset, size, SocketFlags.None, ipEndPoint, callback, state); return _socket.BeginSendTo(buffer, offset, size, SocketFlags.None, ipEndPoint, callback, state);
} }
public int EndSendTo(IAsyncResult result) public int EndSendTo(IAsyncResult result)
{ {
ThrowIfDisposed(); ThrowIfDisposed();
return _Socket.EndSendTo(result); return _socket.EndSendTo(result);
} }
protected override void Dispose(bool disposing) private void ThrowIfDisposed()
{ {
if (disposing) if (_disposed)
{ {
var socket = _Socket; throw new ObjectDisposedException(nameof(UdpSocket));
if (socket != null)
socket.Dispose();
var tcs = _currentReceiveTaskCompletionSource;
if (tcs != null)
{
tcs.TrySetCanceled();
}
var sendTcs = _currentSendTaskCompletionSource;
if (sendTcs != null)
{
sendTcs.TrySetCanceled();
}
} }
} }
public void Dispose()
{
if (_disposed)
{
return;
}
_socket?.Dispose();
_currentReceiveTaskCompletionSource?.TrySetCanceled();
_currentSendTaskCompletionSource?.TrySetCanceled();
_socket = null;
_currentReceiveTaskCompletionSource = null;
_currentSendTaskCompletionSource = null;
_disposed = true;
}
private static IpEndPointInfo ToIpEndPointInfo(IPEndPoint endpoint) private static IpEndPointInfo ToIpEndPointInfo(IPEndPoint endpoint)
{ {
if (endpoint == null) if (endpoint == null)

View File

@ -23,7 +23,6 @@ using MediaBrowser.Model.IO;
using MediaBrowser.Model.LiveTv; using MediaBrowser.Model.LiveTv;
using MediaBrowser.Model.Querying; using MediaBrowser.Model.Querying;
using MediaBrowser.Model.Services; using MediaBrowser.Model.Services;
using MediaBrowser.Model.System;
using Microsoft.Net.Http.Headers; using Microsoft.Net.Http.Headers;
namespace MediaBrowser.Api.LiveTv namespace MediaBrowser.Api.LiveTv
@ -695,27 +694,36 @@ namespace MediaBrowser.Api.LiveTv
private readonly IHttpClient _httpClient; private readonly IHttpClient _httpClient;
private readonly ILibraryManager _libraryManager; private readonly ILibraryManager _libraryManager;
private readonly IDtoService _dtoService; private readonly IDtoService _dtoService;
private readonly IFileSystem _fileSystem;
private readonly IAuthorizationContext _authContext; private readonly IAuthorizationContext _authContext;
private readonly ISessionContext _sessionContext; private readonly ISessionContext _sessionContext;
private ICryptoProvider _cryptographyProvider; private readonly ICryptoProvider _cryptographyProvider;
private IStreamHelper _streamHelper; private readonly IStreamHelper _streamHelper;
private IMediaSourceManager _mediaSourceManager; private readonly IMediaSourceManager _mediaSourceManager;
public LiveTvService(ICryptoProvider crypto, IMediaSourceManager mediaSourceManager, IStreamHelper streamHelper, ILiveTvManager liveTvManager, IUserManager userManager, IServerConfigurationManager config, IHttpClient httpClient, ILibraryManager libraryManager, IDtoService dtoService, IFileSystem fileSystem, IAuthorizationContext authContext, ISessionContext sessionContext) public LiveTvService(
ICryptoProvider crypto,
IMediaSourceManager mediaSourceManager,
IStreamHelper streamHelper,
ILiveTvManager liveTvManager,
IUserManager userManager,
IServerConfigurationManager config,
IHttpClient httpClient,
ILibraryManager libraryManager,
IDtoService dtoService,
IAuthorizationContext authContext,
ISessionContext sessionContext)
{ {
_cryptographyProvider = crypto;
_mediaSourceManager = mediaSourceManager;
_streamHelper = streamHelper;
_liveTvManager = liveTvManager; _liveTvManager = liveTvManager;
_userManager = userManager; _userManager = userManager;
_config = config; _config = config;
_httpClient = httpClient; _httpClient = httpClient;
_libraryManager = libraryManager; _libraryManager = libraryManager;
_dtoService = dtoService; _dtoService = dtoService;
_fileSystem = fileSystem;
_authContext = authContext; _authContext = authContext;
_sessionContext = sessionContext; _sessionContext = sessionContext;
_cryptographyProvider = crypto;
_streamHelper = streamHelper;
_mediaSourceManager = mediaSourceManager;
} }
public object Get(GetTunerHostTypes request) public object Get(GetTunerHostTypes request)
@ -729,7 +737,7 @@ namespace MediaBrowser.Api.LiveTv
var user = request.UserId.Equals(Guid.Empty) ? null : _userManager.GetUserById(request.UserId); var user = request.UserId.Equals(Guid.Empty) ? null : _userManager.GetUserById(request.UserId);
var folders = _liveTvManager.GetRecordingFolders(user); var folders = _liveTvManager.GetRecordingFolders(user);
var returnArray = _dtoService.GetBaseItemDtos(folders.ToArray(), new DtoOptions(), user); var returnArray = _dtoService.GetBaseItemDtos(folders, new DtoOptions(), user);
var result = new QueryResult<BaseItemDto> var result = new QueryResult<BaseItemDto>
{ {
@ -754,7 +762,7 @@ namespace MediaBrowser.Api.LiveTv
[HeaderNames.ContentType] = Model.Net.MimeTypes.GetMimeType(path) [HeaderNames.ContentType] = Model.Net.MimeTypes.GetMimeType(path)
}; };
return new ProgressiveFileCopier(_fileSystem, _streamHelper, path, outputHeaders, Logger) return new ProgressiveFileCopier(_streamHelper, path, outputHeaders, Logger)
{ {
AllowEndOfFile = false AllowEndOfFile = false
}; };

View File

@ -1,3 +1,4 @@
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Threading; using System.Threading;
@ -11,22 +12,17 @@ namespace MediaBrowser.Api.LiveTv
{ {
public class ProgressiveFileCopier : IAsyncStreamWriter, IHasHeaders public class ProgressiveFileCopier : IAsyncStreamWriter, IHasHeaders
{ {
private readonly IFileSystem _fileSystem;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly string _path; private readonly string _path;
private readonly Dictionary<string, string> _outputHeaders; private readonly Dictionary<string, string> _outputHeaders;
const int StreamCopyToBufferSize = 81920;
public long StartPosition { get; set; }
public bool AllowEndOfFile = true; public bool AllowEndOfFile = true;
private readonly IDirectStreamProvider _directStreamProvider; private readonly IDirectStreamProvider _directStreamProvider;
private IStreamHelper _streamHelper; private IStreamHelper _streamHelper;
public ProgressiveFileCopier(IFileSystem fileSystem, IStreamHelper streamHelper, string path, Dictionary<string, string> outputHeaders, ILogger logger) public ProgressiveFileCopier(IStreamHelper streamHelper, string path, Dictionary<string, string> outputHeaders, ILogger logger)
{ {
_fileSystem = fileSystem;
_path = path; _path = path;
_outputHeaders = outputHeaders; _outputHeaders = outputHeaders;
_logger = logger; _logger = logger;
@ -43,18 +39,6 @@ namespace MediaBrowser.Api.LiveTv
public IDictionary<string, string> Headers => _outputHeaders; public IDictionary<string, string> Headers => _outputHeaders;
private Stream GetInputStream(bool allowAsyncFileRead)
{
var fileOpenOptions = FileOpenOptions.SequentialScan;
if (allowAsyncFileRead)
{
fileOpenOptions |= FileOpenOptions.Asynchronous;
}
return _fileSystem.GetFileStream(_path, FileOpenMode.Open, FileAccessMode.Read, FileShareMode.ReadWrite, fileOpenOptions);
}
public async Task WriteToAsync(Stream outputStream, CancellationToken cancellationToken) public async Task WriteToAsync(Stream outputStream, CancellationToken cancellationToken)
{ {
if (_directStreamProvider != null) if (_directStreamProvider != null)
@ -63,28 +47,23 @@ namespace MediaBrowser.Api.LiveTv
return; return;
} }
var eofCount = 0; var fileOptions = FileOptions.SequentialScan;
// use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
var allowAsyncFileRead = true; if (Environment.OSVersion.Platform != PlatformID.Win32NT)
using (var inputStream = GetInputStream(allowAsyncFileRead))
{ {
if (StartPosition > 0) fileOptions |= FileOptions.Asynchronous;
{ }
inputStream.Position = StartPosition;
}
using (var inputStream = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, fileOptions))
{
var emptyReadLimit = AllowEndOfFile ? 20 : 100; var emptyReadLimit = AllowEndOfFile ? 20 : 100;
var eofCount = 0;
while (eofCount < emptyReadLimit) while (eofCount < emptyReadLimit)
{ {
int bytesRead; int bytesRead;
bytesRead = await _streamHelper.CopyToAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false); bytesRead = await _streamHelper.CopyToAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
//var position = fs.Position;
//_logger.LogDebug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0) if (bytesRead == 0)
{ {
eofCount++; eofCount++;

View File

@ -1,64 +0,0 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
namespace MediaBrowser.Model.Net
{
public class HttpResponse : IDisposable
{
/// <summary>
/// Gets or sets the type of the content.
/// </summary>
/// <value>The type of the content.</value>
public string ContentType { get; set; }
/// <summary>
/// Gets or sets the response URL.
/// </summary>
/// <value>The response URL.</value>
public string ResponseUrl { get; set; }
/// <summary>
/// Gets or sets the content.
/// </summary>
/// <value>The content.</value>
public Stream Content { get; set; }
/// <summary>
/// Gets or sets the status code.
/// </summary>
/// <value>The status code.</value>
public HttpStatusCode StatusCode { get; set; }
/// <summary>
/// Gets or sets the length of the content.
/// </summary>
/// <value>The length of the content.</value>
public long? ContentLength { get; set; }
/// <summary>
/// Gets or sets the headers.
/// </summary>
/// <value>The headers.</value>
public Dictionary<string, string> Headers { get; set; }
private readonly IDisposable _disposable;
public HttpResponse(IDisposable disposable)
{
_disposable = disposable;
}
public HttpResponse()
{
}
public void Dispose()
{
if (_disposable != null)
{
_disposable.Dispose();
}
}
}
}

View File

@ -1,15 +0,0 @@
using System;
namespace MediaBrowser.Model.Net
{
public class SocketCreateException : Exception
{
public SocketCreateException(string errorCode, Exception originalException)
: base(errorCode, originalException)
{
ErrorCode = errorCode;
}
public string ErrorCode { get; private set; }
}
}