removed dead code
This commit is contained in:
parent
b6ec767ea0
commit
9a5a6f569d
|
@ -129,7 +129,6 @@
|
|||
<Compile Include="HttpServer\SocketSharp\WebSocketSharpResponse.cs" />
|
||||
<Compile Include="HttpServer\StreamWriter.cs" />
|
||||
<Compile Include="Images\BaseDynamicImageProvider.cs" />
|
||||
<Compile Include="IO\AsyncStreamCopier.cs" />
|
||||
<Compile Include="IO\FileRefresher.cs" />
|
||||
<Compile Include="IO\IsoManager.cs" />
|
||||
<Compile Include="IO\LibraryMonitor.cs" />
|
||||
|
|
|
@ -1,459 +0,0 @@
|
|||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Emby.Server.Implementations.IO
|
||||
{
|
||||
public class AsyncStreamCopier : IDisposable
|
||||
{
|
||||
// size in bytes of the buffers in the buffer pool
|
||||
private const int DefaultBufferSize = 81920;
|
||||
private readonly int _bufferSize;
|
||||
// number of buffers in the pool
|
||||
private const int DefaultBufferCount = 4;
|
||||
private readonly int _bufferCount;
|
||||
|
||||
// indexes of the next buffer to read into/write from
|
||||
private int _nextReadBuffer = -1;
|
||||
private int _nextWriteBuffer = -1;
|
||||
|
||||
// the buffer pool, implemented as an array, and used in a cyclic way
|
||||
private readonly byte[][] _buffers;
|
||||
// sizes in bytes of the available (read) data in the buffers
|
||||
private readonly int[] _sizes;
|
||||
// the streams...
|
||||
private Stream _source;
|
||||
private Stream _target;
|
||||
private readonly bool _closeStreamsOnEnd;
|
||||
|
||||
// number of buffers that are ready to be written
|
||||
private int _buffersToWrite;
|
||||
// flag indicating that there is still a read operation to be scheduled
|
||||
// (source end of stream not reached)
|
||||
private volatile bool _moreDataToRead;
|
||||
// the result of the whole operation, returned by BeginCopy()
|
||||
private AsyncResult _asyncResult;
|
||||
// any exception that occurs during an async operation
|
||||
// stored here for rethrow
|
||||
private Exception _exception;
|
||||
|
||||
public TaskCompletionSource<long> TaskCompletionSource;
|
||||
private long _bytesToRead;
|
||||
private long _totalBytesWritten;
|
||||
private CancellationToken _cancellationToken;
|
||||
public int IndividualReadOffset = 0;
|
||||
|
||||
public AsyncStreamCopier(Stream source,
|
||||
Stream target,
|
||||
long bytesToRead,
|
||||
CancellationToken cancellationToken,
|
||||
bool closeStreamsOnEnd = false,
|
||||
int bufferSize = DefaultBufferSize,
|
||||
int bufferCount = DefaultBufferCount)
|
||||
{
|
||||
if (source == null)
|
||||
throw new ArgumentNullException("source");
|
||||
if (target == null)
|
||||
throw new ArgumentNullException("target");
|
||||
if (!source.CanRead)
|
||||
throw new ArgumentException("Cannot copy from a non-readable stream.");
|
||||
if (!target.CanWrite)
|
||||
throw new ArgumentException("Cannot copy to a non-writable stream.");
|
||||
_source = source;
|
||||
_target = target;
|
||||
_moreDataToRead = true;
|
||||
_closeStreamsOnEnd = closeStreamsOnEnd;
|
||||
_bufferSize = bufferSize;
|
||||
_bufferCount = bufferCount;
|
||||
_buffers = new byte[_bufferCount][];
|
||||
_sizes = new int[_bufferCount];
|
||||
_bytesToRead = bytesToRead;
|
||||
_cancellationToken = cancellationToken;
|
||||
}
|
||||
|
||||
~AsyncStreamCopier()
|
||||
{
|
||||
// ensure any exception cannot be ignored
|
||||
ThrowExceptionIfNeeded();
|
||||
}
|
||||
|
||||
public static Task<long> CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken)
|
||||
{
|
||||
return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken);
|
||||
}
|
||||
|
||||
public static Task<long> CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken)
|
||||
{
|
||||
var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount);
|
||||
var taskCompletion = new TaskCompletionSource<long>();
|
||||
|
||||
copier.TaskCompletionSource = taskCompletion;
|
||||
|
||||
var result = copier.BeginCopy(StreamCopyCallback, copier);
|
||||
|
||||
if (result.CompletedSynchronously)
|
||||
{
|
||||
StreamCopyCallback(result);
|
||||
}
|
||||
|
||||
cancellationToken.Register(() => taskCompletion.TrySetCanceled());
|
||||
|
||||
return taskCompletion.Task;
|
||||
}
|
||||
|
||||
private static void StreamCopyCallback(IAsyncResult result)
|
||||
{
|
||||
var copier = (AsyncStreamCopier)result.AsyncState;
|
||||
var taskCompletion = copier.TaskCompletionSource;
|
||||
|
||||
try
|
||||
{
|
||||
copier.EndCopy(result);
|
||||
taskCompletion.TrySetResult(copier._totalBytesWritten);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
taskCompletion.TrySetException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_asyncResult != null)
|
||||
_asyncResult.Dispose();
|
||||
if (_closeStreamsOnEnd)
|
||||
{
|
||||
if (_source != null)
|
||||
{
|
||||
_source.Dispose();
|
||||
_source = null;
|
||||
}
|
||||
if (_target != null)
|
||||
{
|
||||
_target.Dispose();
|
||||
_target = null;
|
||||
}
|
||||
}
|
||||
GC.SuppressFinalize(this);
|
||||
ThrowExceptionIfNeeded();
|
||||
}
|
||||
|
||||
public IAsyncResult BeginCopy(AsyncCallback callback, object state)
|
||||
{
|
||||
// avoid concurrent start of the copy on separate threads
|
||||
if (Interlocked.CompareExchange(ref _asyncResult, new AsyncResult(callback, state), null) != null)
|
||||
throw new InvalidOperationException("A copy operation has already been started on this object.");
|
||||
// allocate buffers
|
||||
for (int i = 0; i < _bufferCount; i++)
|
||||
_buffers[i] = new byte[_bufferSize];
|
||||
|
||||
// we pass false to BeginRead() to avoid completing the async result
|
||||
// immediately which would result in invoking the callback
|
||||
// when the method fails synchronously
|
||||
BeginRead(false);
|
||||
// throw exception synchronously if there is one
|
||||
ThrowExceptionIfNeeded();
|
||||
return _asyncResult;
|
||||
}
|
||||
|
||||
public void EndCopy(IAsyncResult ar)
|
||||
{
|
||||
if (ar != _asyncResult)
|
||||
throw new InvalidOperationException("Invalid IAsyncResult object.");
|
||||
|
||||
if (!_asyncResult.IsCompleted)
|
||||
_asyncResult.AsyncWaitHandle.WaitOne();
|
||||
|
||||
if (_closeStreamsOnEnd)
|
||||
{
|
||||
_source.Close();
|
||||
_source = null;
|
||||
_target.Close();
|
||||
_target = null;
|
||||
}
|
||||
|
||||
//_logger.Info("AsyncStreamCopier {0} bytes requested. {1} bytes transferred", _bytesToRead, _totalBytesWritten);
|
||||
ThrowExceptionIfNeeded();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Here we'll throw a pending exception if there is one,
|
||||
/// and remove it from our instance, so we know it has been consumed.
|
||||
/// </summary>
|
||||
private void ThrowExceptionIfNeeded()
|
||||
{
|
||||
if (_exception != null)
|
||||
{
|
||||
var exception = _exception;
|
||||
_exception = null;
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
private void BeginRead(bool completeOnError = true)
|
||||
{
|
||||
if (!_moreDataToRead)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (_asyncResult.IsCompleted)
|
||||
return;
|
||||
int bufferIndex = Interlocked.Increment(ref _nextReadBuffer) % _bufferCount;
|
||||
|
||||
try
|
||||
{
|
||||
_source.BeginRead(_buffers[bufferIndex], 0, _bufferSize, EndRead, bufferIndex);
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_exception = exception;
|
||||
if (completeOnError)
|
||||
_asyncResult.Complete(false);
|
||||
}
|
||||
}
|
||||
|
||||
private void BeginWrite()
|
||||
{
|
||||
if (_asyncResult.IsCompleted)
|
||||
return;
|
||||
// this method can actually be called concurrently!!
|
||||
// indeed, let's say we call a BeginWrite, and the thread gets interrupted
|
||||
// just after making the IO request.
|
||||
// At that moment, the thread is still in the method. And then the IO request
|
||||
// ends (extremely fast io, or caching...), EndWrite gets called
|
||||
// on another thread, and calls BeginWrite again! There we have it!
|
||||
// That is the reason why an Interlocked is needed here.
|
||||
int bufferIndex = Interlocked.Increment(ref _nextWriteBuffer) % _bufferCount;
|
||||
|
||||
try
|
||||
{
|
||||
int bytesToWrite;
|
||||
if (_bytesToRead > 0)
|
||||
{
|
||||
var bytesLeftToWrite = _bytesToRead - _totalBytesWritten;
|
||||
bytesToWrite = Convert.ToInt32(Math.Min(_sizes[bufferIndex], bytesLeftToWrite));
|
||||
}
|
||||
else
|
||||
{
|
||||
bytesToWrite = _sizes[bufferIndex];
|
||||
}
|
||||
|
||||
_target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null);
|
||||
|
||||
_totalBytesWritten += bytesToWrite;
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_exception = exception;
|
||||
_asyncResult.Complete(false);
|
||||
}
|
||||
}
|
||||
|
||||
private void EndRead(IAsyncResult ar)
|
||||
{
|
||||
try
|
||||
{
|
||||
int read = _source.EndRead(ar);
|
||||
_moreDataToRead = read > 0;
|
||||
var bufferIndex = (int)ar.AsyncState;
|
||||
_sizes[bufferIndex] = read;
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_exception = exception;
|
||||
_asyncResult.Complete(false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (_moreDataToRead && !_cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
int usedBuffers = Interlocked.Increment(ref _buffersToWrite);
|
||||
// if we incremented from zero to one, then it means we just
|
||||
// added the single buffer to write, so a writer could not
|
||||
// be busy, and we have to schedule one.
|
||||
if (usedBuffers == 1)
|
||||
BeginWrite();
|
||||
// test if there is at least a free buffer, and schedule
|
||||
// a read, as we have read some data
|
||||
if (usedBuffers < _bufferCount)
|
||||
BeginRead();
|
||||
}
|
||||
else
|
||||
{
|
||||
// we did not add a buffer, because no data was read, and
|
||||
// there is no buffer left to write so this is the end...
|
||||
if (Thread.VolatileRead(ref _buffersToWrite) == 0)
|
||||
{
|
||||
_asyncResult.Complete(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void EndWrite(IAsyncResult ar)
|
||||
{
|
||||
try
|
||||
{
|
||||
_target.EndWrite(ar);
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_exception = exception;
|
||||
_asyncResult.Complete(false);
|
||||
return;
|
||||
}
|
||||
|
||||
int buffersLeftToWrite = Interlocked.Decrement(ref _buffersToWrite);
|
||||
// no reader could be active if all buffers were full of data waiting to be written
|
||||
bool noReaderIsBusy = buffersLeftToWrite == _bufferCount - 1;
|
||||
// note that it is possible that both a reader and
|
||||
// a writer see the end of the copy and call Complete
|
||||
// on the _asyncResult object. That race condition is handled by
|
||||
// Complete that ensures it is only executed fully once.
|
||||
|
||||
long bytesLeftToWrite;
|
||||
if (_bytesToRead > 0)
|
||||
{
|
||||
bytesLeftToWrite = _bytesToRead - _totalBytesWritten;
|
||||
}
|
||||
else
|
||||
{
|
||||
bytesLeftToWrite = 1;
|
||||
}
|
||||
|
||||
if (!_moreDataToRead || bytesLeftToWrite <= 0 || _cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// at this point we know no reader can schedule a read or write
|
||||
if (Thread.VolatileRead(ref _buffersToWrite) == 0)
|
||||
{
|
||||
// nothing left to write, so it is the end
|
||||
_asyncResult.Complete(false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
else
|
||||
// here, we know we have something left to read,
|
||||
// so schedule a read if no read is busy
|
||||
if (noReaderIsBusy)
|
||||
BeginRead();
|
||||
|
||||
// also schedule a write if we are sure we did not write the last buffer
|
||||
// note that if buffersLeftToWrite is zero and a reader has put another
|
||||
// buffer to write between the time we decremented _buffersToWrite
|
||||
// and now, that reader will also schedule another write,
|
||||
// as it will increment _buffersToWrite from zero to one
|
||||
if (buffersLeftToWrite > 0)
|
||||
BeginWrite();
|
||||
}
|
||||
}
|
||||
|
||||
internal class AsyncResult : IAsyncResult, IDisposable
|
||||
{
|
||||
// Fields set at construction which never change while
|
||||
// operation is pending
|
||||
private readonly AsyncCallback _asyncCallback;
|
||||
private readonly object _asyncState;
|
||||
|
||||
// Fields set at construction which do change after
|
||||
// operation completes
|
||||
private const int StatePending = 0;
|
||||
private const int StateCompletedSynchronously = 1;
|
||||
private const int StateCompletedAsynchronously = 2;
|
||||
private int _completedState = StatePending;
|
||||
|
||||
// Field that may or may not get set depending on usage
|
||||
private ManualResetEvent _waitHandle;
|
||||
|
||||
internal AsyncResult(
|
||||
AsyncCallback asyncCallback,
|
||||
object state)
|
||||
{
|
||||
_asyncCallback = asyncCallback;
|
||||
_asyncState = state;
|
||||
}
|
||||
|
||||
internal bool Complete(bool completedSynchronously)
|
||||
{
|
||||
bool result = false;
|
||||
|
||||
// The _completedState field MUST be set prior calling the callback
|
||||
int prevState = Interlocked.CompareExchange(ref _completedState,
|
||||
completedSynchronously ? StateCompletedSynchronously :
|
||||
StateCompletedAsynchronously, StatePending);
|
||||
if (prevState == StatePending)
|
||||
{
|
||||
// If the event exists, set it
|
||||
if (_waitHandle != null)
|
||||
_waitHandle.Set();
|
||||
|
||||
if (_asyncCallback != null)
|
||||
_asyncCallback(this);
|
||||
|
||||
result = true;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
#region Implementation of IAsyncResult
|
||||
|
||||
public Object AsyncState { get { return _asyncState; } }
|
||||
|
||||
public bool CompletedSynchronously
|
||||
{
|
||||
get
|
||||
{
|
||||
return Thread.VolatileRead(ref _completedState) ==
|
||||
StateCompletedSynchronously;
|
||||
}
|
||||
}
|
||||
|
||||
public WaitHandle AsyncWaitHandle
|
||||
{
|
||||
get
|
||||
{
|
||||
if (_waitHandle == null)
|
||||
{
|
||||
bool done = IsCompleted;
|
||||
var mre = new ManualResetEvent(done);
|
||||
if (Interlocked.CompareExchange(ref _waitHandle,
|
||||
mre, null) != null)
|
||||
{
|
||||
// Another thread created this object's event; dispose
|
||||
// the event we just created
|
||||
mre.Close();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!done && IsCompleted)
|
||||
{
|
||||
// If the operation wasn't done when we created
|
||||
// the event but now it is done, set the event
|
||||
_waitHandle.Set();
|
||||
}
|
||||
}
|
||||
}
|
||||
return _waitHandle;
|
||||
}
|
||||
}
|
||||
|
||||
public bool IsCompleted
|
||||
{
|
||||
get
|
||||
{
|
||||
return Thread.VolatileRead(ref _completedState) !=
|
||||
StatePending;
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_waitHandle != null)
|
||||
{
|
||||
_waitHandle.Dispose();
|
||||
_waitHandle = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -158,58 +158,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
|
|||
|
||||
public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_enableFileBuffer)
|
||||
{
|
||||
return CopyFileTo(_tempFilePath, stream, cancellationToken);
|
||||
}
|
||||
return _multicastStream.CopyToAsync(stream, cancellationToken);
|
||||
//return CopyFileTo(_tempFilePath, stream, cancellationToken);
|
||||
}
|
||||
|
||||
protected async Task CopyFileTo(string path, Stream outputStream, CancellationToken cancellationToken)
|
||||
{
|
||||
long startPosition = -20000;
|
||||
if (startPosition < 0)
|
||||
{
|
||||
var length = FileSystem.GetFileInfo(path).Length;
|
||||
startPosition = Math.Max(length - startPosition, 0);
|
||||
}
|
||||
|
||||
_logger.Info("Live stream starting position is {0} bytes", startPosition.ToString(CultureInfo.InvariantCulture));
|
||||
|
||||
var allowAsync = Environment.OperatingSystem != MediaBrowser.Model.System.OperatingSystem.Windows;
|
||||
// use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
|
||||
|
||||
using (var inputStream = GetInputStream(path, startPosition, allowAsync))
|
||||
{
|
||||
if (startPosition > 0)
|
||||
{
|
||||
inputStream.Position = startPosition;
|
||||
}
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
long bytesRead;
|
||||
|
||||
if (allowAsync)
|
||||
{
|
||||
bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 2, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
StreamHelper.CopyTo(inputStream, outputStream, 81920, cancellationToken);
|
||||
bytesRead = 1;
|
||||
}
|
||||
|
||||
//var position = fs.Position;
|
||||
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
|
||||
|
||||
if (bytesRead == 0)
|
||||
{
|
||||
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -211,15 +211,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
|
|||
{
|
||||
long bytesRead;
|
||||
|
||||
if (allowAsync)
|
||||
{
|
||||
bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 2, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
StreamHelper.CopyTo(inputStream, outputStream, 81920, cancellationToken);
|
||||
bytesRead = 1;
|
||||
}
|
||||
StreamHelper.CopyTo(inputStream, outputStream, 81920, cancellationToken);
|
||||
bytesRead = 1;
|
||||
|
||||
//var position = fs.Position;
|
||||
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
|
||||
|
@ -285,22 +278,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
|
|||
//return taskCompletion.Task;
|
||||
}
|
||||
|
||||
private void StreamCopyCallback(IAsyncResult result)
|
||||
{
|
||||
var copier = (AsyncStreamCopier)result.AsyncState;
|
||||
var taskCompletion = copier.TaskCompletionSource;
|
||||
|
||||
try
|
||||
{
|
||||
copier.EndCopy(result);
|
||||
taskCompletion.TrySetResult(0);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
taskCompletion.TrySetException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public class UdpClientStream : Stream
|
||||
{
|
||||
private static int RtpHeaderBytes = 12;
|
||||
|
|
Loading…
Reference in New Issue
Block a user