From f96e5c84a2cdf090b6f9097472e6e2332fbf97fe Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Thu, 1 Jun 2017 02:25:07 -0400 Subject: [PATCH] update live stream buffers --- Emby.Common.Implementations/Net/UdpSocket.cs | 5 + .../HdHomerun/HdHomerunHttpStream.cs | 29 ++- .../HdHomerun/HdHomerunUdpStream.cs | 172 +++++++++++++++++- .../LiveTv/TunerHosts/MulticastStream.cs | 2 +- .../LiveTv/TunerHosts/QueueStream.cs | 4 +- MediaBrowser.Model/Net/ISocket.cs | 2 + 6 files changed, 200 insertions(+), 14 deletions(-) diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs index 578610b4c..df1099d3d 100644 --- a/Emby.Common.Implementations/Net/UdpSocket.cs +++ b/Emby.Common.Implementations/Net/UdpSocket.cs @@ -128,6 +128,11 @@ namespace Emby.Common.Implementations.Net return _Socket.BeginReceiveFrom(buffer, offset, count, SocketFlags.None, ref receivedFromEndPoint, callback, buffer); } + public int Receive(byte[] buffer, int offset, int count) + { + return _Socket.Receive(buffer, 0, buffer.Length, SocketFlags.None); + } + public SocketReceiveResult EndReceive(IAsyncResult result) { IPEndPoint sender = new IPEndPoint(IPAddress.Any, 0); diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs index 3df9b85a8..90bbaaf3d 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs @@ -26,7 +26,10 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun private readonly CancellationTokenSource _liveStreamCancellationTokenSource = new CancellationTokenSource(); private readonly TaskCompletionSource _liveStreamTaskCompletionSource = new TaskCompletionSource(); + private readonly MulticastStream _multicastStream; + private readonly string _tempFilePath; + private bool _enableFileBuffer = false; public HdHomerunHttpStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, IEnvironmentInfo environment) : base(mediaSource, environment, fileSystem) @@ -36,6 +39,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun _appHost = appHost; OriginalStreamId = originalStreamId; + _multicastStream = new MulticastStream(_logger); _tempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts"); } @@ -103,13 +107,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun { _logger.Info("Beginning multicastStream.CopyUntilCancelled"); - FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath)); - using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None)) + if (_enableFileBuffer) { - //await response.Content.CopyToAsync(fileStream, 81920, cancellationToken).ConfigureAwait(false); - StreamHelper.CopyTo(response.Content, fileStream, 81920, () => Resolve(openTaskCompletionSource), cancellationToken); - - //await AsyncStreamCopier.CopyStream(response.Content, fileStream, 81920, 4, cancellationToken).ConfigureAwait(false); + FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath)); + using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None)) + { + StreamHelper.CopyTo(response.Content, fileStream, 81920, () => Resolve(openTaskCompletionSource), cancellationToken); + } + } + else + { + await _multicastStream.CopyUntilCancelled(response.Content, () => Resolve(openTaskCompletionSource), cancellationToken).ConfigureAwait(false); } } } @@ -134,7 +142,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun } _liveStreamTaskCompletionSource.TrySetResult(true); - await DeleteTempFile(_tempFilePath).ConfigureAwait(false); + //await DeleteTempFile(_tempFilePath).ConfigureAwait(false); }); } @@ -148,7 +156,12 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun public Task CopyToAsync(Stream stream, CancellationToken cancellationToken) { - return CopyFileTo(_tempFilePath, stream, cancellationToken); + if (_enableFileBuffer) + { + return CopyFileTo(_tempFilePath, stream, cancellationToken); + } + return _multicastStream.CopyToAsync(stream, cancellationToken); + //return CopyFileTo(_tempFilePath, stream, cancellationToken); } protected async Task CopyFileTo(string path, Stream outputStream, CancellationToken cancellationToken) diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs index 4b958c07b..5ad6e2e16 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs @@ -34,6 +34,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun private readonly INetworkManager _networkManager; private readonly string _tempFilePath; + private bool _enableFileBuffer = false; + private readonly MulticastStream _multicastStream; public HdHomerunUdpStream(MediaSourceInfo mediaSource, string originalStreamId, IHdHomerunChannelCommands channelCommands, int numTuners, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, ISocketFactory socketFactory, INetworkManager networkManager, IEnvironmentInfo environment) : base(mediaSource, environment, fileSystem) @@ -46,6 +48,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun _channelCommands = channelCommands; _numTuners = numTuners; _tempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts"); + _multicastStream = new MulticastStream(_logger); } protected override async Task OpenInternal(CancellationToken openCancellationToken) @@ -123,10 +126,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun if (!cancellationToken.IsCancellationRequested) { - FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath)); - using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None)) + if (_enableFileBuffer) { - CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken); + FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath)); + using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None)) + { + CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken); + } + } + else + { + await _multicastStream.CopyUntilCancelled(new UdpClientStream(udpClient), () => Resolve(openTaskCompletionSource), cancellationToken).ConfigureAwait(false); } } } @@ -170,6 +180,12 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun public async Task CopyToAsync(Stream outputStream, CancellationToken cancellationToken) { + if (!_enableFileBuffer) + { + await _multicastStream.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false); + return; + } + var path = _tempFilePath; long startPosition = -20000; @@ -285,5 +301,155 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun } } + public class UdpClientStream : Stream + { + private static int RtpHeaderBytes = 12; + private static int PacketSize = 1316; + private readonly ISocket _udpClient; + bool disposed; + + public UdpClientStream(ISocket udpClient) : base() + { + _udpClient = udpClient; + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + if (buffer == null) + throw new ArgumentNullException("buffer"); + + if (offset + count < 0) + throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count"); + + if (offset + count > buffer.Length) + throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count"); + + if (disposed) + throw new ObjectDisposedException(typeof(UdpClientStream).ToString()); + + // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize) + // The RTP header will be stripped so see how many reads we need to make to fill the buffer. + int numReads = count / PacketSize; + int totalBytesRead = 0; + byte[] receiveBuffer = new byte[81920]; + + for (int i = 0; i < numReads; ++i) + { + var data = await _udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); + + var bytesRead = data.ReceivedBytes - RtpHeaderBytes; + + // remove rtp header + Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, buffer, offset, bytesRead); + offset += bytesRead; + totalBytesRead += bytesRead; + } + return totalBytesRead; + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (buffer == null) + throw new ArgumentNullException("buffer"); + + if (offset + count < 0) + throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count"); + + if (offset + count > buffer.Length) + throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count"); + + if (disposed) + throw new ObjectDisposedException(typeof(UdpClientStream).ToString()); + + // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize) + // The RTP header will be stripped so see how many reads we need to make to fill the buffer. + int numReads = count / PacketSize; + int totalBytesRead = 0; + byte[] receiveBuffer = new byte[81920]; + + for (int i = 0; i < numReads; ++i) + { + var receivedBytes = _udpClient.Receive(receiveBuffer, 0, receiveBuffer.Length); + + var bytesRead = receivedBytes - RtpHeaderBytes; + + // remove rtp header + Buffer.BlockCopy(receiveBuffer, RtpHeaderBytes, buffer, offset, bytesRead); + offset += bytesRead; + totalBytesRead += bytesRead; + } + return totalBytesRead; + } + + protected override void Dispose(bool disposing) + { + disposed = true; + } + + public override bool CanRead + { + get + { + throw new NotImplementedException(); + } + } + + public override bool CanSeek + { + get + { + throw new NotImplementedException(); + } + } + + public override bool CanWrite + { + get + { + throw new NotImplementedException(); + } + } + + public override long Length + { + get + { + throw new NotImplementedException(); + } + } + + public override long Position + { + get + { + throw new NotImplementedException(); + } + + set + { + throw new NotImplementedException(); + } + } + + public override void Flush() + { + throw new NotImplementedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + } } } \ No newline at end of file diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs index d4fcd7780..cf50e6092 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs @@ -43,7 +43,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts //if (allStreams.Count == 1) //{ - // await allStreams[0].Value.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false); + // allStreams[0].Value.Write(buffer, 0, bytesRead); //} //else { diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs index 19c711172..61bc390b4 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs @@ -57,13 +57,13 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts } } - public async Task WriteAsync(byte[] bytes, int offset, int count, CancellationToken cancellationToken) + public void Write(byte[] bytes, int offset, int count) { //return _outputStream.WriteAsync(bytes, offset, count, cancellationToken); try { - await _outputStream.WriteAsync(bytes, offset, count, cancellationToken).ConfigureAwait(false); + _outputStream.Write(bytes, offset, count); } catch (OperationCanceledException) { diff --git a/MediaBrowser.Model/Net/ISocket.cs b/MediaBrowser.Model/Net/ISocket.cs index 7ad08f106..71eb9914b 100644 --- a/MediaBrowser.Model/Net/ISocket.cs +++ b/MediaBrowser.Model/Net/ISocket.cs @@ -16,6 +16,8 @@ namespace MediaBrowser.Model.Net Task ReceiveAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken); + int Receive(byte[] buffer, int offset, int count); + IAsyncResult BeginReceive(byte[] buffer, int offset, int count, AsyncCallback callback); SocketReceiveResult EndReceive(IAsyncResult result);