jellyfin/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs

153 lines
4.9 KiB
C#
Raw Normal View History

2016-10-07 15:08:13 +00:00
using System;
2017-02-01 20:55:56 +00:00
using System.Collections.Concurrent;
2016-10-07 15:08:13 +00:00
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.Logging;
2017-03-26 16:26:52 +00:00
using MediaBrowser.Model.Net;
2016-10-07 15:08:13 +00:00
2016-11-03 23:35:19 +00:00
namespace Emby.Server.Implementations.LiveTv.TunerHosts
2016-10-07 15:08:13 +00:00
{
/// <summary>
/// Fans a single live input (a <see cref="Stream"/> or a UDP socket carrying RTP packets)
/// out to every output stream registered through <see cref="CopyToAsync"/>.
/// </summary>
public class MulticastStream
{
    /// <summary>Size, in bytes, of the buffer used when reading from a source <see cref="Stream"/>.</summary>
    private const int BufferSize = 81920;

    /// <summary>Number of leading bytes of each received datagram that are skipped as the RTP header.</summary>
    private const int RtpHeaderBytes = 12;

    /// <summary>Timeout, in milliseconds, applied to socket receives until the first payload has arrived.</summary>
    private const int FirstReceiveTimeoutMs = 5000;

    /// <summary>Pause, in milliseconds, before polling the input again after it yielded no payload.</summary>
    private const int IdleDelayMs = 100;

    /// <summary>Currently registered consumers, keyed by <c>QueueStream.Id</c>.</summary>
    private readonly ConcurrentDictionary<Guid, QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();

    private readonly ILogger _logger;

    /// <summary>
    /// Token passed to the most recent <c>CopyUntilCancelled</c> call. It is handed to every
    /// consumer started by <see cref="CopyToAsync"/>; it stays at its default value until a
    /// copy loop has been started.
    /// </summary>
    private CancellationToken _cancellationToken;

    /// <summary>
    /// Initializes a new instance of the <see cref="MulticastStream"/> class.
    /// </summary>
    /// <param name="logger">Logger passed on to every <see cref="QueueStream"/> this instance creates.</param>
    public MulticastStream(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Reads <paramref name="source"/> in a loop and queues every chunk on all registered
    /// output streams until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="source">The stream to read from.</param>
    /// <param name="onStarted">
    /// Optional callback, run once on the thread pool after the first non-empty read.
    /// </param>
    /// <param name="cancellationToken">Token that ends the loop; also passed to each read.</param>
    /// <returns>A task that completes when the loop has ended.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
    public async Task CopyUntilCancelled(Stream source, Action onStarted, CancellationToken cancellationToken)
    {
        // Validate before mutating state or allocating the read buffer.
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        _cancellationToken = cancellationToken;

        var buffer = new byte[BufferSize];

        while (!cancellationToken.IsCancellationRequested)
        {
            var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);

            if (bytesRead <= 0)
            {
                // Nothing available right now; back off briefly instead of spinning.
                await Task.Delay(IdleDelayMs).ConfigureAwait(false);
                continue;
            }

            // The read buffer is overwritten by the next iteration, so consumers are given
            // a private copy (shared between them) that is never modified afterwards.
            var copy = new byte[bytesRead];
            Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead);

            // ConcurrentDictionary.ToArray() takes a consistent snapshot under the dictionary's
            // locks. LINQ's ToList() uses Count followed by CopyTo and can throw when a consumer
            // is registered in between.
            foreach (var output in _outputStreams.ToArray())
            {
                output.Value.Queue(copy, 0, copy.Length);
            }

            if (onStarted != null)
            {
                // Clear the callback first so it is raised exactly once.
                var onStartedCopy = onStarted;
                onStarted = null;
                Task.Run(onStartedCopy);
            }
        }
    }

    /// <summary>
    /// Receives datagrams from <paramref name="udpClient"/> in a loop, skips the first
    /// <see cref="RtpHeaderBytes"/> bytes of each one and forwards the remainder to all
    /// registered output streams until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="udpClient">The socket to receive from.</param>
    /// <param name="onStarted">
    /// Optional callback, run once on the thread pool after the first datagram that carries
    /// payload. Until then every receive is additionally limited to
    /// <see cref="FirstReceiveTimeoutMs"/> milliseconds.
    /// </param>
    /// <param name="cancellationToken">Token that ends the loop; also passed to each receive.</param>
    /// <returns>A task that completes when the loop has ended.</returns>
    public async Task CopyUntilCancelled(ISocket udpClient, Action onStarted, CancellationToken cancellationToken)
    {
        _cancellationToken = cancellationToken;

        while (!cancellationToken.IsCancellationRequested)
        {
            byte[] packet;
            int bytesRead;

            CancellationTokenSource timeoutSource = null;
            CancellationTokenSource linkedSource = null;

            try
            {
                var receiveToken = cancellationToken;

                // On the first connection attempt, put a timeout to avoid being stuck indefinitely in the event of failure
                if (onStarted != null)
                {
                    timeoutSource = new CancellationTokenSource(FirstReceiveTimeoutMs);
                    linkedSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutSource.Token, cancellationToken);
                    receiveToken = linkedSource.Token;
                }

                var data = await udpClient.ReceiveAsync(receiveToken).ConfigureAwait(false);

                packet = data.Buffer;
                bytesRead = data.ReceivedBytes - RtpHeaderBytes;
            }
            finally
            {
                // Release the timer and the registration on the caller's token; without this
                // one pair of sources leaks for every receive made before the stream starts.
                if (linkedSource != null)
                {
                    linkedSource.Dispose();
                }

                if (timeoutSource != null)
                {
                    timeoutSource.Dispose();
                }
            }

            if (bytesRead <= 0)
            {
                // Header-only or empty datagram; back off briefly before receiving again.
                await Task.Delay(IdleDelayMs).ConfigureAwait(false);
                continue;
            }

            // Snapshot taken under the dictionary's locks (see ConcurrentDictionary.ToArray()).
            var outputs = _outputStreams.ToArray();

            if (outputs.Length == 1)
            {
                // Single consumer: write straight from the receive buffer, starting after the
                // RTP header so the consumer gets exactly the same bytes as in the branch below.
                await outputs[0].Value.WriteAsync(packet, RtpHeaderBytes, bytesRead).ConfigureAwait(false);
            }
            else
            {
                var copy = new byte[bytesRead];
                Buffer.BlockCopy(packet, RtpHeaderBytes, copy, 0, bytesRead);

                foreach (var output in outputs)
                {
                    output.Value.Queue(copy, 0, copy.Length);
                }
            }

            if (onStarted != null)
            {
                // Clear the callback first so it is raised exactly once; this also switches
                // subsequent receives to the plain cancellation token without a timeout.
                var onStartedCopy = onStarted;
                onStarted = null;
                Task.Run(onStartedCopy);
            }
        }
    }

    /// <summary>
    /// Registers <paramref name="stream"/> as a consumer: wraps it in a <see cref="QueueStream"/>,
    /// adds it to the set of outputs and starts it with the token of the most recent
    /// <c>CopyUntilCancelled</c> call.
    /// </summary>
    /// <param name="stream">The destination stream.</param>
    /// <returns>The task exposed by the new <see cref="QueueStream"/>'s <c>TaskCompletion</c>.</returns>
    public Task CopyToAsync(Stream stream)
    {
        var result = new QueueStream(stream, _logger)
        {
            // Unregister the consumer again once it reports that it has finished.
            OnFinished = OnFinished
        };

        _outputStreams.TryAdd(result.Id, result);

        result.Start(_cancellationToken);

        return result.TaskCompletion.Task;
    }

    /// <summary>
    /// Removes <paramref name="stream"/> from the set of outputs. Does nothing when it is not registered.
    /// </summary>
    /// <param name="stream">The consumer to unregister.</param>
    public void RemoveOutputStream(QueueStream stream)
    {
        QueueStream removed;
        _outputStreams.TryRemove(stream.Id, out removed);
    }

    /// <summary>
    /// Callback assigned to <c>QueueStream.OnFinished</c>; unregisters the finished consumer.
    /// </summary>
    /// <param name="queueStream">The consumer that has finished.</param>
    private void OnFinished(QueueStream queueStream)
    {
        RemoveOutputStream(queueStream);
    }
}
}