Merge pull request #2684 from MediaBrowser/dev

Dev
This commit is contained in:
Luke 2017-06-01 03:56:03 -04:00 committed by GitHub
commit b54b7871e3
7 changed files with 216 additions and 42 deletions

View File

@ -128,6 +128,11 @@ namespace Emby.Common.Implementations.Net
return _Socket.BeginReceiveFrom(buffer, offset, count, SocketFlags.None, ref receivedFromEndPoint, callback, buffer);
}
public int Receive(byte[] buffer, int offset, int count)
{
return _Socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
}
public SocketReceiveResult EndReceive(IAsyncResult result)
{
IPEndPoint sender = new IPEndPoint(IPAddress.Any, 0);

View File

@ -94,17 +94,13 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
}
private const int BufferSize = 81920;
public static Task CopyUntilCancelled(Stream source, Stream target, CancellationToken cancellationToken)
{
return CopyUntilCancelled(source, target, null, cancellationToken);
}
public static async Task CopyUntilCancelled(Stream source, Stream target, Action onStarted, CancellationToken cancellationToken)
public static async Task CopyUntilCancelled(Stream source, Stream target, CancellationToken cancellationToken)
{
byte[] buffer = new byte[BufferSize];
while (!cancellationToken.IsCancellationRequested)
{
var bytesRead = await CopyToAsyncInternal(source, target, BufferSize, onStarted, cancellationToken).ConfigureAwait(false);
onStarted = null;
var bytesRead = await CopyToAsyncInternal(source, target, buffer, cancellationToken).ConfigureAwait(false);
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
@ -116,23 +112,16 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
}
}
private static async Task<int> CopyToAsyncInternal(Stream source, Stream destination, Int32 bufferSize, Action onStarted, CancellationToken cancellationToken)
private static async Task<int> CopyToAsyncInternal(Stream source, Stream destination, byte[] buffer, CancellationToken cancellationToken)
{
byte[] buffer = new byte[bufferSize];
int bytesRead;
int totalBytesRead = 0;
while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
destination.Write(buffer, 0, bytesRead);
totalBytesRead += bytesRead;
if (onStarted != null)
{
onStarted();
}
onStarted = null;
}
return totalBytesRead;

View File

@ -26,7 +26,10 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly CancellationTokenSource _liveStreamCancellationTokenSource = new CancellationTokenSource();
private readonly TaskCompletionSource<bool> _liveStreamTaskCompletionSource = new TaskCompletionSource<bool>();
private readonly MulticastStream _multicastStream;
private readonly string _tempFilePath;
private bool _enableFileBuffer = false;
public HdHomerunHttpStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, IEnvironmentInfo environment)
: base(mediaSource, environment, fileSystem)
@ -36,6 +39,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_appHost = appHost;
OriginalStreamId = originalStreamId;
_multicastStream = new MulticastStream(_logger);
_tempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts");
}
@ -103,13 +107,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
_logger.Info("Beginning multicastStream.CopyUntilCancelled");
FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
if (_enableFileBuffer)
{
//await response.Content.CopyToAsync(fileStream, 81920, cancellationToken).ConfigureAwait(false);
StreamHelper.CopyTo(response.Content, fileStream, 81920, () => Resolve(openTaskCompletionSource), cancellationToken);
//await AsyncStreamCopier.CopyStream(response.Content, fileStream, 81920, 4, cancellationToken).ConfigureAwait(false);
FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
{
StreamHelper.CopyTo(response.Content, fileStream, 81920, () => Resolve(openTaskCompletionSource), cancellationToken);
}
}
else
{
await _multicastStream.CopyUntilCancelled(response.Content, () => Resolve(openTaskCompletionSource), cancellationToken).ConfigureAwait(false);
}
}
}
@ -134,7 +142,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
_liveStreamTaskCompletionSource.TrySetResult(true);
await DeleteTempFile(_tempFilePath).ConfigureAwait(false);
//await DeleteTempFile(_tempFilePath).ConfigureAwait(false);
});
}
@ -148,7 +156,12 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
{
return CopyFileTo(_tempFilePath, stream, cancellationToken);
if (_enableFileBuffer)
{
return CopyFileTo(_tempFilePath, stream, cancellationToken);
}
return _multicastStream.CopyToAsync(stream, cancellationToken);
//return CopyFileTo(_tempFilePath, stream, cancellationToken);
}
protected async Task CopyFileTo(string path, Stream outputStream, CancellationToken cancellationToken)

View File

@ -34,6 +34,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly INetworkManager _networkManager;
private readonly string _tempFilePath;
private bool _enableFileBuffer = false;
private readonly MulticastStream _multicastStream;
public HdHomerunUdpStream(MediaSourceInfo mediaSource, string originalStreamId, IHdHomerunChannelCommands channelCommands, int numTuners, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, ISocketFactory socketFactory, INetworkManager networkManager, IEnvironmentInfo environment)
: base(mediaSource, environment, fileSystem)
@ -46,6 +48,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_channelCommands = channelCommands;
_numTuners = numTuners;
_tempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts");
_multicastStream = new MulticastStream(_logger);
}
protected override async Task OpenInternal(CancellationToken openCancellationToken)
@ -123,10 +126,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
if (!cancellationToken.IsCancellationRequested)
{
FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
if (_enableFileBuffer)
{
CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken);
FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
{
CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken);
}
}
else
{
await _multicastStream.CopyUntilCancelled(new UdpClientStream(udpClient), () => Resolve(openTaskCompletionSource), cancellationToken).ConfigureAwait(false);
}
}
}
@ -170,6 +180,12 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
public async Task CopyToAsync(Stream outputStream, CancellationToken cancellationToken)
{
if (!_enableFileBuffer)
{
await _multicastStream.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false);
return;
}
var path = _tempFilePath;
long startPosition = -20000;
@ -285,5 +301,155 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
}
public class UdpClientStream : Stream
{
private static int RtpHeaderBytes = 12;
private static int PacketSize = 1316;
private readonly ISocket _udpClient;
bool disposed;
public UdpClientStream(ISocket udpClient) : base()
{
_udpClient = udpClient;
}
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer == null)
throw new ArgumentNullException("buffer");
if (offset + count < 0)
throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count");
if (offset + count > buffer.Length)
throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count");
if (disposed)
throw new ObjectDisposedException(typeof(UdpClientStream).ToString());
// This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
// The RTP header will be stripped so see how many reads we need to make to fill the buffer.
int numReads = count / PacketSize;
int totalBytesRead = 0;
byte[] receiveBuffer = new byte[81920];
for (int i = 0; i < numReads; ++i)
{
var data = await _udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false);
var bytesRead = data.ReceivedBytes - RtpHeaderBytes;
// remove rtp header
Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, buffer, offset, bytesRead);
offset += bytesRead;
totalBytesRead += bytesRead;
}
return totalBytesRead;
}
public override int Read(byte[] buffer, int offset, int count)
{
if (buffer == null)
throw new ArgumentNullException("buffer");
if (offset + count < 0)
throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count");
if (offset + count > buffer.Length)
throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count");
if (disposed)
throw new ObjectDisposedException(typeof(UdpClientStream).ToString());
// This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
// The RTP header will be stripped so see how many reads we need to make to fill the buffer.
int numReads = count / PacketSize;
int totalBytesRead = 0;
byte[] receiveBuffer = new byte[81920];
for (int i = 0; i < numReads; ++i)
{
var receivedBytes = _udpClient.Receive(receiveBuffer, 0, receiveBuffer.Length);
var bytesRead = receivedBytes - RtpHeaderBytes;
// remove rtp header
Buffer.BlockCopy(receiveBuffer, RtpHeaderBytes, buffer, offset, bytesRead);
offset += bytesRead;
totalBytesRead += bytesRead;
}
return totalBytesRead;
}
protected override void Dispose(bool disposing)
{
disposed = true;
}
public override bool CanRead
{
get
{
throw new NotImplementedException();
}
}
public override bool CanSeek
{
get
{
throw new NotImplementedException();
}
}
public override bool CanWrite
{
get
{
throw new NotImplementedException();
}
}
public override long Length
{
get
{
throw new NotImplementedException();
}
}
public override long Position
{
get
{
throw new NotImplementedException();
}
set
{
throw new NotImplementedException();
}
}
public override void Flush()
{
throw new NotImplementedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
}
}
}

View File

@ -13,7 +13,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{
public class MulticastStream
{
private readonly ConcurrentDictionary<Guid,QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();
private readonly ConcurrentDictionary<Guid, QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();
private const int BufferSize = 81920;
private readonly ILogger _logger;
@ -31,9 +31,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
throw new ArgumentNullException("source");
}
while (!cancellationToken.IsCancellationRequested)
while (true)
{
var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
var bytesRead = source.Read(buffer, 0, buffer.Length);
if (bytesRead > 0)
{
@ -41,7 +43,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
//if (allStreams.Count == 1)
//{
// await allStreams[0].Value.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false);
// allStreams[0].Value.Write(buffer, 0, bytesRead);
//}
//else
{

View File

@ -14,7 +14,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{
private readonly Stream _outputStream;
private readonly ConcurrentQueue<Tuple<byte[], int, int>> _queue = new ConcurrentQueue<Tuple<byte[], int, int>>();
private CancellationToken _cancellationToken;
public TaskCompletionSource<bool> TaskCompletion { get; private set; }
public Action<QueueStream> OnFinished { get; set; }
@ -35,8 +34,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
public void Start(CancellationToken cancellationToken)
{
_cancellationToken = cancellationToken;
Task.Run(() => StartInternal());
Task.Run(() => StartInternal(cancellationToken));
}
private Tuple<byte[], int, int> Dequeue()
@ -59,14 +57,13 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
}
}
public async Task WriteAsync(byte[] bytes, int offset, int count)
public void Write(byte[] bytes, int offset, int count)
{
//return _outputStream.WriteAsync(bytes, offset, count, cancellationToken);
var cancellationToken = _cancellationToken;
try
{
await _outputStream.WriteAsync(bytes, offset, count, cancellationToken).ConfigureAwait(false);
_outputStream.Write(bytes, offset, count);
}
catch (OperationCanceledException)
{
@ -82,18 +79,18 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
}
}
private async Task StartInternal()
private async Task StartInternal(CancellationToken cancellationToken)
{
var cancellationToken = _cancellationToken;
try
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
var result = Dequeue();
if (result != null)
{
await _outputStream.WriteAsync(result.Item1, result.Item2, result.Item3, cancellationToken).ConfigureAwait(false);
_outputStream.Write(result.Item1, result.Item2, result.Item3);
}
else
{

View File

@ -16,6 +16,8 @@ namespace MediaBrowser.Model.Net
Task<SocketReceiveResult> ReceiveAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);
int Receive(byte[] buffer, int offset, int count);
IAsyncResult BeginReceive(byte[] buffer, int offset, int count, AsyncCallback callback);
SocketReceiveResult EndReceive(IAsyncResult result);