update MulticastStream

This commit is contained in:
Luke Pulverenti 2017-06-01 01:42:49 -04:00
parent 386ed8d34a
commit 7e609b8fc5
2 changed files with 11 additions and 12 deletions

View File

@ -13,7 +13,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{ {
public class MulticastStream public class MulticastStream
{ {
private readonly ConcurrentDictionary<Guid,QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>(); private readonly ConcurrentDictionary<Guid, QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();
private const int BufferSize = 81920; private const int BufferSize = 81920;
private readonly ILogger _logger; private readonly ILogger _logger;
@ -31,9 +31,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
throw new ArgumentNullException("source"); throw new ArgumentNullException("source");
} }
while (!cancellationToken.IsCancellationRequested) while (true)
{ {
var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested();
var bytesRead = source.Read(buffer, 0, buffer.Length);
if (bytesRead > 0) if (bytesRead > 0)
{ {

View File

@ -14,7 +14,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{ {
private readonly Stream _outputStream; private readonly Stream _outputStream;
private readonly ConcurrentQueue<Tuple<byte[], int, int>> _queue = new ConcurrentQueue<Tuple<byte[], int, int>>(); private readonly ConcurrentQueue<Tuple<byte[], int, int>> _queue = new ConcurrentQueue<Tuple<byte[], int, int>>();
private CancellationToken _cancellationToken;
public TaskCompletionSource<bool> TaskCompletion { get; private set; } public TaskCompletionSource<bool> TaskCompletion { get; private set; }
public Action<QueueStream> OnFinished { get; set; } public Action<QueueStream> OnFinished { get; set; }
@ -35,8 +34,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
public void Start(CancellationToken cancellationToken) public void Start(CancellationToken cancellationToken)
{ {
_cancellationToken = cancellationToken; Task.Run(() => StartInternal(cancellationToken));
Task.Run(() => StartInternal());
} }
private Tuple<byte[], int, int> Dequeue() private Tuple<byte[], int, int> Dequeue()
@ -59,10 +57,9 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
} }
} }
public async Task WriteAsync(byte[] bytes, int offset, int count) public async Task WriteAsync(byte[] bytes, int offset, int count, CancellationToken cancellationToken)
{ {
//return _outputStream.WriteAsync(bytes, offset, count, cancellationToken); //return _outputStream.WriteAsync(bytes, offset, count, cancellationToken);
var cancellationToken = _cancellationToken;
try try
{ {
@ -82,18 +79,18 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
} }
} }
private async Task StartInternal() private async Task StartInternal(CancellationToken cancellationToken)
{ {
var cancellationToken = _cancellationToken;
try try
{ {
while (true) while (true)
{ {
cancellationToken.ThrowIfCancellationRequested();
var result = Dequeue(); var result = Dequeue();
if (result != null) if (result != null)
{ {
await _outputStream.WriteAsync(result.Item1, result.Item2, result.Item3, cancellationToken).ConfigureAwait(false); _outputStream.Write(result.Item1, result.Item2, result.Item3);
} }
else else
{ {