diff --git a/Jellyfin.Api/Controllers/AudioController.cs b/Jellyfin.Api/Controllers/AudioController.cs index 86577411f..e63868339 100644 --- a/Jellyfin.Api/Controllers/AudioController.cs +++ b/Jellyfin.Api/Controllers/AudioController.cs @@ -35,7 +35,6 @@ namespace Jellyfin.Api.Controllers private readonly IMediaSourceManager _mediaSourceManager; private readonly IServerConfigurationManager _serverConfigurationManager; private readonly IMediaEncoder _mediaEncoder; - private readonly IStreamHelper _streamHelper; private readonly IFileSystem _fileSystem; private readonly ISubtitleEncoder _subtitleEncoder; private readonly IConfiguration _configuration; @@ -55,7 +54,6 @@ namespace Jellyfin.Api.Controllers /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. - /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. @@ -70,7 +68,6 @@ namespace Jellyfin.Api.Controllers IMediaSourceManager mediaSourceManager, IServerConfigurationManager serverConfigurationManager, IMediaEncoder mediaEncoder, - IStreamHelper streamHelper, IFileSystem fileSystem, ISubtitleEncoder subtitleEncoder, IConfiguration configuration, @@ -85,7 +82,6 @@ namespace Jellyfin.Api.Controllers _mediaSourceManager = mediaSourceManager; _serverConfigurationManager = serverConfigurationManager; _mediaEncoder = mediaEncoder; - _streamHelper = streamHelper; _fileSystem = fileSystem; _subtitleEncoder = subtitleEncoder; _configuration = configuration; @@ -283,8 +279,11 @@ namespace Jellyfin.Api.Controllers { StreamingHelpers.AddDlnaHeaders(state, Response.Headers, true, startTimeTicks, Request, _dlnaManager); - // TODO AllowEndOfFile = false - await new ProgressiveFileCopier(_streamHelper, state.DirectStreamProvider).WriteToAsync(Response.Body, CancellationToken.None).ConfigureAwait(false); + await new ProgressiveFileCopier(state.DirectStreamProvider, null, _transcodingJobHelper, CancellationToken.None) + { + AllowEndOfFile = false + }.WriteToAsync(Response.Body, CancellationToken.None) + .ConfigureAwait(false); // TODO (moved from MediaBrowser.Api): Don't hardcode contentType return File(Response.Body, MimeTypes.GetMimeType("file.ts")!); @@ -319,8 +318,11 @@ namespace Jellyfin.Api.Controllers if (state.MediaSource.IsInfiniteStream) { - // TODO AllowEndOfFile = false - await new ProgressiveFileCopier(_streamHelper, state.MediaPath).WriteToAsync(Response.Body, CancellationToken.None).ConfigureAwait(false); + await new ProgressiveFileCopier(state.MediaPath, null, _transcodingJobHelper, CancellationToken.None) + { + AllowEndOfFile = false + }.WriteToAsync(Response.Body, CancellationToken.None) + .ConfigureAwait(false); return File(Response.Body, contentType); } @@ -339,7 +341,6 @@ namespace Jellyfin.Api.Controllers return await FileStreamResponseHelpers.GetTranscodedFile( state, isHeadRequest, - _streamHelper, this, _transcodingJobHelper, ffmpegCommandLineArguments, diff --git a/Jellyfin.Api/Controllers/LiveTvController.cs b/Jellyfin.Api/Controllers/LiveTvController.cs index bc5446510..9144d6f28 100644 --- a/Jellyfin.Api/Controllers/LiveTvController.cs +++ b/Jellyfin.Api/Controllers/LiveTvController.cs @@ -24,7 +24,6 @@ using MediaBrowser.Controller.LiveTv; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Dto; using MediaBrowser.Model.Entities; -using MediaBrowser.Model.IO; using MediaBrowser.Model.LiveTv; using MediaBrowser.Model.Net; using MediaBrowser.Model.Querying; @@ -45,9 +44,9 @@ namespace Jellyfin.Api.Controllers private readonly ILibraryManager _libraryManager; private readonly IDtoService _dtoService; private readonly ISessionContext _sessionContext; - private readonly IStreamHelper _streamHelper; private readonly IMediaSourceManager _mediaSourceManager; private readonly IConfigurationManager _configurationManager; + private readonly TranscodingJobHelper _transcodingJobHelper; /// /// Initializes a new instance of the class. @@ -58,9 +57,9 @@ namespace Jellyfin.Api.Controllers /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. - /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. + /// Instance of the class. public LiveTvController( ILiveTvManager liveTvManager, IUserManager userManager, @@ -68,9 +67,9 @@ namespace Jellyfin.Api.Controllers ILibraryManager libraryManager, IDtoService dtoService, ISessionContext sessionContext, - IStreamHelper streamHelper, IMediaSourceManager mediaSourceManager, - IConfigurationManager configurationManager) + IConfigurationManager configurationManager, + TranscodingJobHelper transcodingJobHelper) { _liveTvManager = liveTvManager; _userManager = userManager; @@ -78,9 +77,9 @@ namespace Jellyfin.Api.Controllers _libraryManager = libraryManager; _dtoService = dtoService; _sessionContext = sessionContext; - _streamHelper = streamHelper; _mediaSourceManager = mediaSourceManager; _configurationManager = configurationManager; + _transcodingJobHelper = transcodingJobHelper; } /// @@ -1187,7 +1186,9 @@ namespace Jellyfin.Api.Controllers } await using var memoryStream = new MemoryStream(); - await new ProgressiveFileCopier(_streamHelper, path).WriteToAsync(memoryStream, CancellationToken.None).ConfigureAwait(false); + await new ProgressiveFileCopier(path, null, _transcodingJobHelper, CancellationToken.None) + .WriteToAsync(memoryStream, CancellationToken.None) + .ConfigureAwait(false); return File(memoryStream, MimeTypes.GetMimeType(path)); } @@ -1214,7 +1215,9 @@ namespace Jellyfin.Api.Controllers } await using var memoryStream = new MemoryStream(); - await new ProgressiveFileCopier(_streamHelper, liveStreamInfo).WriteToAsync(memoryStream, CancellationToken.None).ConfigureAwait(false); + await new ProgressiveFileCopier(liveStreamInfo, null, _transcodingJobHelper, CancellationToken.None) + .WriteToAsync(memoryStream, CancellationToken.None) + .ConfigureAwait(false); return File(memoryStream, MimeTypes.GetMimeType("file." + container)); } diff --git a/Jellyfin.Api/Controllers/VideosController.cs b/Jellyfin.Api/Controllers/VideosController.cs index 5050c3d4f..0ce62186b 100644 --- a/Jellyfin.Api/Controllers/VideosController.cs +++ b/Jellyfin.Api/Controllers/VideosController.cs @@ -46,7 +46,6 @@ namespace Jellyfin.Api.Controllers private readonly IMediaSourceManager _mediaSourceManager; private readonly IServerConfigurationManager _serverConfigurationManager; private readonly IMediaEncoder _mediaEncoder; - private readonly IStreamHelper _streamHelper; private readonly IFileSystem _fileSystem; private readonly ISubtitleEncoder _subtitleEncoder; private readonly IConfiguration _configuration; @@ -67,7 +66,6 @@ namespace Jellyfin.Api.Controllers /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. - /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. @@ -83,7 +81,6 @@ namespace Jellyfin.Api.Controllers IMediaSourceManager mediaSourceManager, IServerConfigurationManager serverConfigurationManager, IMediaEncoder mediaEncoder, - IStreamHelper streamHelper, IFileSystem fileSystem, ISubtitleEncoder subtitleEncoder, IConfiguration configuration, @@ -99,7 +96,6 @@ namespace Jellyfin.Api.Controllers _mediaSourceManager = mediaSourceManager; _serverConfigurationManager = serverConfigurationManager; _mediaEncoder = mediaEncoder; - _streamHelper = streamHelper; _fileSystem = fileSystem; _subtitleEncoder = subtitleEncoder; _configuration = configuration; @@ -376,7 +372,7 @@ namespace Jellyfin.Api.Controllers [FromQuery] Dictionary streamOptions) { var isHeadRequest = Request.Method == System.Net.WebRequestMethods.Http.Head; - using var cancellationTokenSource = new CancellationTokenSource(); + var cancellationTokenSource = new CancellationTokenSource(); var streamingRequest = new StreamingRequestDto { Id = itemId, @@ -453,8 +449,11 @@ namespace Jellyfin.Api.Controllers { StreamingHelpers.AddDlnaHeaders(state, Response.Headers, true, startTimeTicks, Request, _dlnaManager); - // TODO AllowEndOfFile = false - await new ProgressiveFileCopier(_streamHelper, state.DirectStreamProvider).WriteToAsync(Response.Body, CancellationToken.None).ConfigureAwait(false); + await new ProgressiveFileCopier(state.DirectStreamProvider, null, _transcodingJobHelper, CancellationToken.None) + { + AllowEndOfFile = false + }.WriteToAsync(Response.Body, CancellationToken.None) + .ConfigureAwait(false); // TODO (moved from MediaBrowser.Api): Don't hardcode contentType return File(Response.Body, MimeTypes.GetMimeType("file.ts")!); @@ -489,8 +488,11 @@ namespace Jellyfin.Api.Controllers if (state.MediaSource.IsInfiniteStream) { - // TODO AllowEndOfFile = false - await new ProgressiveFileCopier(_streamHelper, state.MediaPath).WriteToAsync(Response.Body, CancellationToken.None).ConfigureAwait(false); + await new ProgressiveFileCopier(state.MediaPath, null, _transcodingJobHelper, CancellationToken.None) + { + AllowEndOfFile = false + }.WriteToAsync(Response.Body, CancellationToken.None) + .ConfigureAwait(false); return File(Response.Body, contentType); } @@ -509,7 +511,6 @@ namespace Jellyfin.Api.Controllers return await FileStreamResponseHelpers.GetTranscodedFile( state, isHeadRequest, - _streamHelper, this, _transcodingJobHelper, ffmpegCommandLineArguments, diff --git a/Jellyfin.Api/Helpers/FileStreamResponseHelpers.cs b/Jellyfin.Api/Helpers/FileStreamResponseHelpers.cs index 636f47f5f..96e90d38f 100644 --- a/Jellyfin.Api/Helpers/FileStreamResponseHelpers.cs +++ b/Jellyfin.Api/Helpers/FileStreamResponseHelpers.cs @@ -3,9 +3,9 @@ using System.IO; using System.Net.Http; using System.Threading; using System.Threading.Tasks; +using Jellyfin.Api.Models.PlaybackDtos; using Jellyfin.Api.Models.StreamingDtos; using MediaBrowser.Controller.MediaEncoding; -using MediaBrowser.Model.IO; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Microsoft.Net.Http.Headers; @@ -80,7 +80,6 @@ namespace Jellyfin.Api.Helpers /// /// The current . /// Whether the current request is a HTTP HEAD request so only the headers get returned. - /// Instance of the interface. /// The managing the response. /// The singleton. /// The command line arguments to start ffmpeg. @@ -91,7 +90,6 @@ namespace Jellyfin.Api.Helpers public static async Task GetTranscodedFile( StreamState state, bool isHeadRequest, - IStreamHelper streamHelper, ControllerBase controller, TranscodingJobHelper transcodingJobHelper, string ffmpegCommandLineArguments, @@ -116,18 +114,20 @@ namespace Jellyfin.Api.Helpers await transcodingLock.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false); try { + TranscodingJobDto? job; if (!File.Exists(outputPath)) { - await transcodingJobHelper.StartFfMpeg(state, outputPath, ffmpegCommandLineArguments, request, transcodingJobType, cancellationTokenSource).ConfigureAwait(false); + job = await transcodingJobHelper.StartFfMpeg(state, outputPath, ffmpegCommandLineArguments, request, transcodingJobType, cancellationTokenSource).ConfigureAwait(false); } else { - transcodingJobHelper.OnTranscodeBeginRequest(outputPath, TranscodingJobType.Progressive); + job = transcodingJobHelper.OnTranscodeBeginRequest(outputPath, TranscodingJobType.Progressive); state.Dispose(); } - await using var memoryStream = new MemoryStream(); - await new ProgressiveFileCopier(streamHelper, outputPath).WriteToAsync(memoryStream, CancellationToken.None).ConfigureAwait(false); + var memoryStream = new MemoryStream(); + await new ProgressiveFileCopier(outputPath, job, transcodingJobHelper, CancellationToken.None).WriteToAsync(memoryStream, CancellationToken.None).ConfigureAwait(false); + memoryStream.Position = 0; return controller.File(memoryStream, contentType); } finally diff --git a/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs b/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs index e8e6966f4..acaccc77a 100644 --- a/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs +++ b/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs @@ -2,6 +2,7 @@ using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using Jellyfin.Api.Models.PlaybackDtos; using MediaBrowser.Controller.Library; using MediaBrowser.Model.IO; @@ -12,34 +13,53 @@ namespace Jellyfin.Api.Helpers /// public class ProgressiveFileCopier { + private readonly TranscodingJobDto? _job; private readonly string? _path; + private readonly CancellationToken _cancellationToken; private readonly IDirectStreamProvider? _directStreamProvider; - private readonly IStreamHelper _streamHelper; + private readonly TranscodingJobHelper _transcodingJobHelper; + private long _bytesWritten; /// /// Initializes a new instance of the class. /// - /// Instance of the interface. - /// Filepath to stream from. - public ProgressiveFileCopier(IStreamHelper streamHelper, string path) + /// The path to copy from. + /// The transcoding job. + /// Instance of the . + /// The cancellation token. + public ProgressiveFileCopier(string path, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, CancellationToken cancellationToken) { _path = path; - _streamHelper = streamHelper; - _directStreamProvider = null; + _job = job; + _cancellationToken = cancellationToken; + _transcodingJobHelper = transcodingJobHelper; } /// /// Initializes a new instance of the class. /// - /// Instance of the interface. /// Instance of the interface. - public ProgressiveFileCopier(IStreamHelper streamHelper, IDirectStreamProvider directStreamProvider) + /// The transcoding job. + /// Instance of the . + /// The cancellation token. + public ProgressiveFileCopier(IDirectStreamProvider directStreamProvider, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, CancellationToken cancellationToken) { _directStreamProvider = directStreamProvider; - _streamHelper = streamHelper; - _path = null; + _job = job; + _cancellationToken = cancellationToken; + _transcodingJobHelper = transcodingJobHelper; } + /// + /// Gets or sets a value indicating whether allow read end of file. + /// + public bool AllowEndOfFile { get; set; } = true; + + /// + /// Gets or sets copy start position. + /// + public long StartPosition { get; set; } + /// /// Write source stream to output. /// @@ -48,37 +68,123 @@ namespace Jellyfin.Api.Helpers /// A . public async Task WriteToAsync(Stream outputStream, CancellationToken cancellationToken) { - if (_directStreamProvider != null) + cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken).Token; + + try { - await _directStreamProvider.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false); - return; - } - - var fileOptions = FileOptions.SequentialScan; - - // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 - if (Environment.OSVersion.Platform != PlatformID.Win32NT) - { - fileOptions |= FileOptions.Asynchronous; - } - - await using var inputStream = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, fileOptions); - const int emptyReadLimit = 100; - var eofCount = 0; - while (eofCount < emptyReadLimit) - { - var bytesRead = await _streamHelper.CopyToAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false); - - if (bytesRead == 0) + if (_directStreamProvider != null) { - eofCount++; - await Task.Delay(100, cancellationToken).ConfigureAwait(false); + await _directStreamProvider.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false); + return; } - else + + var fileOptions = FileOptions.SequentialScan; + var allowAsyncFileRead = false; + + // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 + if (Environment.OSVersion.Platform != PlatformID.Win32NT) { - eofCount = 0; + fileOptions |= FileOptions.Asynchronous; + allowAsyncFileRead = true; + } + + await using var inputStream = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, fileOptions); + + var eofCount = 0; + const int emptyReadLimit = 20; + if (StartPosition > 0) + { + inputStream.Position = StartPosition; + } + + while (eofCount < emptyReadLimit || !AllowEndOfFile) + { + int bytesRead; + if (allowAsyncFileRead) + { + bytesRead = await CopyToInternalAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false); + } + else + { + bytesRead = await CopyToInternalAsyncWithSyncRead(inputStream, outputStream, cancellationToken).ConfigureAwait(false); + } + + if (bytesRead == 0) + { + if (_job == null || _job.HasExited) + { + eofCount++; + } + + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } + } + } + finally + { + if (_job != null) + { + _transcodingJobHelper.OnTranscodeEndRequest(_job); } } } + + private async Task CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken) + { + var array = new byte[IODefaults.CopyToBufferSize]; + int bytesRead; + int totalBytesRead = 0; + + while ((bytesRead = source.Read(array, 0, array.Length)) != 0) + { + var bytesToWrite = bytesRead; + + if (bytesToWrite > 0) + { + await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + + _bytesWritten += bytesRead; + totalBytesRead += bytesRead; + + if (_job != null) + { + _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); + } + } + } + + return totalBytesRead; + } + + private async Task CopyToInternalAsync(Stream source, Stream destination, CancellationToken cancellationToken) + { + var array = new byte[IODefaults.CopyToBufferSize]; + int bytesRead; + int totalBytesRead = 0; + + while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0) + { + var bytesToWrite = bytesRead; + + if (bytesToWrite > 0) + { + await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + + _bytesWritten += bytesRead; + totalBytesRead += bytesRead; + + if (_job != null) + { + _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); + } + } + } + + return totalBytesRead; + } } } diff --git a/Jellyfin.Api/Helpers/TranscodingJobHelper.cs b/Jellyfin.Api/Helpers/TranscodingJobHelper.cs index c84135085..fc38eacaf 100644 --- a/Jellyfin.Api/Helpers/TranscodingJobHelper.cs +++ b/Jellyfin.Api/Helpers/TranscodingJobHelper.cs @@ -680,6 +680,20 @@ namespace Jellyfin.Api.Helpers } } + /// + /// Called when [transcode end]. + /// + /// The transcode job. + public void OnTranscodeEndRequest(TranscodingJobDto job) + { + job.ActiveRequestCount--; + _logger.LogDebug("OnTranscodeEndRequest job.ActiveRequestCount={ActiveRequestCount}", job.ActiveRequestCount); + if (job.ActiveRequestCount <= 0) + { + PingTimer(job, false); + } + } + /// /// /// The progressive