using System; using System.Collections.Generic; using System.Diagnostics; using System.Globalization; using System.IO; using System.Linq; using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using Jellyfin.Api.Models.PlaybackDtos; using Jellyfin.Api.Models.StreamingDtos; using Jellyfin.Data.Enums; using MediaBrowser.Common.Configuration; using MediaBrowser.Common.Extensions; using MediaBrowser.Controller.Configuration; using MediaBrowser.Controller.Library; using MediaBrowser.Controller.MediaEncoding; using MediaBrowser.Controller.Net; using MediaBrowser.Controller.Session; using MediaBrowser.Model.Entities; using MediaBrowser.Model.IO; using MediaBrowser.Model.MediaInfo; using MediaBrowser.Model.Session; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; namespace Jellyfin.Api.Helpers { /// /// Transcoding job helpers. /// public class TranscodingJobHelper : IDisposable { /// /// The active transcoding jobs. /// private static readonly List _activeTranscodingJobs = new List(); /// /// The transcoding locks. /// private static readonly Dictionary _transcodingLocks = new Dictionary(); private readonly IAuthorizationContext _authorizationContext; private readonly EncodingHelper _encodingHelper; private readonly IFileSystem _fileSystem; private readonly IIsoManager _isoManager; private readonly ILogger _logger; private readonly IMediaEncoder _mediaEncoder; private readonly IMediaSourceManager _mediaSourceManager; private readonly IServerConfigurationManager _serverConfigurationManager; private readonly ISessionManager _sessionManager; private readonly ILoggerFactory _loggerFactory; /// /// Initializes a new instance of the class. /// /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. /// Instance of the interface. public TranscodingJobHelper( ILogger logger, IMediaSourceManager mediaSourceManager, IFileSystem fileSystem, IMediaEncoder mediaEncoder, IServerConfigurationManager serverConfigurationManager, ISessionManager sessionManager, IAuthorizationContext authorizationContext, IIsoManager isoManager, ISubtitleEncoder subtitleEncoder, IConfiguration configuration, ILoggerFactory loggerFactory) { _logger = logger; _mediaSourceManager = mediaSourceManager; _fileSystem = fileSystem; _mediaEncoder = mediaEncoder; _serverConfigurationManager = serverConfigurationManager; _sessionManager = sessionManager; _authorizationContext = authorizationContext; _isoManager = isoManager; _loggerFactory = loggerFactory; _encodingHelper = new EncodingHelper(mediaEncoder, fileSystem, subtitleEncoder, configuration); DeleteEncodedMediaCache(); sessionManager!.PlaybackProgress += OnPlaybackProgress; sessionManager!.PlaybackStart += OnPlaybackProgress; } /// /// Get transcoding job. /// /// Playback session id. /// The transcoding job. public TranscodingJobDto? GetTranscodingJob(string playSessionId) { lock (_activeTranscodingJobs) { return _activeTranscodingJobs.FirstOrDefault(j => string.Equals(j.PlaySessionId, playSessionId, StringComparison.OrdinalIgnoreCase)); } } /// /// Get transcoding job. /// /// Path to the transcoding file. /// The . /// The transcoding job. public TranscodingJobDto? GetTranscodingJob(string path, TranscodingJobType type) { lock (_activeTranscodingJobs) { return _activeTranscodingJobs.FirstOrDefault(j => j.Type == type && string.Equals(j.Path, path, StringComparison.OrdinalIgnoreCase)); } } /// /// Ping transcoding job. /// /// Play session id. /// Is user paused. /// Play session id is null. public void PingTranscodingJob(string playSessionId, bool? isUserPaused) { if (string.IsNullOrEmpty(playSessionId)) { throw new ArgumentNullException(nameof(playSessionId)); } _logger.LogDebug("PingTranscodingJob PlaySessionId={0} isUsedPaused: {1}", playSessionId, isUserPaused); List jobs; lock (_activeTranscodingJobs) { // This is really only needed for HLS. // Progressive streams can stop on their own reliably. jobs = _activeTranscodingJobs.Where(j => string.Equals(playSessionId, j.PlaySessionId, StringComparison.OrdinalIgnoreCase)).ToList(); } foreach (var job in jobs) { if (isUserPaused.HasValue) { _logger.LogDebug("Setting job.IsUserPaused to {0}. jobId: {1}", isUserPaused, job.Id); job.IsUserPaused = isUserPaused.Value; } PingTimer(job, true); } } private void PingTimer(TranscodingJobDto job, bool isProgressCheckIn) { if (job.HasExited) { job.StopKillTimer(); return; } var timerDuration = 10000; if (job.Type != TranscodingJobType.Progressive) { timerDuration = 60000; } job.PingTimeout = timerDuration; job.LastPingDate = DateTime.UtcNow; // Don't start the timer for playback checkins with progressive streaming if (job.Type != TranscodingJobType.Progressive || !isProgressCheckIn) { job.StartKillTimer(OnTranscodeKillTimerStopped); } else { job.ChangeKillTimerIfStarted(); } } /// /// Called when [transcode kill timer stopped]. /// /// The state. private async void OnTranscodeKillTimerStopped(object? state) { var job = state as TranscodingJobDto ?? throw new ArgumentException($"{nameof(state)} is not of type {nameof(TranscodingJobDto)}", nameof(state)); if (!job.HasExited && job.Type != TranscodingJobType.Progressive) { var timeSinceLastPing = (DateTime.UtcNow - job.LastPingDate).TotalMilliseconds; if (timeSinceLastPing < job.PingTimeout) { job.StartKillTimer(OnTranscodeKillTimerStopped, job.PingTimeout); return; } } _logger.LogInformation("Transcoding kill timer stopped for JobId {0} PlaySessionId {1}. Killing transcoding", job.Id, job.PlaySessionId); await KillTranscodingJob(job, true, path => true).ConfigureAwait(false); } /// /// Kills the single transcoding job. /// /// The device id. /// The play session identifier. /// The delete files. /// Task. public Task KillTranscodingJobs(string deviceId, string? playSessionId, Func deleteFiles) { return KillTranscodingJobs( j => string.IsNullOrWhiteSpace(playSessionId) ? string.Equals(deviceId, j.DeviceId, StringComparison.OrdinalIgnoreCase) : string.Equals(playSessionId, j.PlaySessionId, StringComparison.OrdinalIgnoreCase), deleteFiles); } /// /// Kills the transcoding jobs. /// /// The kill job. /// The delete files. /// Task. private Task KillTranscodingJobs(Func killJob, Func deleteFiles) { var jobs = new List(); lock (_activeTranscodingJobs) { // This is really only needed for HLS. // Progressive streams can stop on their own reliably. jobs.AddRange(_activeTranscodingJobs.Where(killJob)); } if (jobs.Count == 0) { return Task.CompletedTask; } IEnumerable GetKillJobs() { foreach (var job in jobs) { yield return KillTranscodingJob(job, false, deleteFiles); } } return Task.WhenAll(GetKillJobs()); } /// /// Kills the transcoding job. /// /// The job. /// if set to true [close live stream]. /// The delete. private async Task KillTranscodingJob(TranscodingJobDto job, bool closeLiveStream, Func delete) { job.DisposeKillTimer(); _logger.LogDebug("KillTranscodingJob - JobId {0} PlaySessionId {1}. Killing transcoding", job.Id, job.PlaySessionId); lock (_activeTranscodingJobs) { _activeTranscodingJobs.Remove(job); if (!job.CancellationTokenSource!.IsCancellationRequested) { job.CancellationTokenSource.Cancel(); } } lock (_transcodingLocks) { _transcodingLocks.Remove(job.Path!); } lock (job.ProcessLock!) { job.TranscodingThrottler?.Stop().GetAwaiter().GetResult(); var process = job.Process; var hasExited = job.HasExited; if (!hasExited) { try { _logger.LogInformation("Stopping ffmpeg process with q command for {Path}", job.Path); process!.StandardInput.WriteLine("q"); // Need to wait because killing is asynchronous. if (!process.WaitForExit(5000)) { _logger.LogInformation("Killing FFmpeg process for {Path}", job.Path); process.Kill(); } } catch (InvalidOperationException) { } } } if (delete(job.Path!)) { await DeletePartialStreamFiles(job.Path!, job.Type, 0, 1500).ConfigureAwait(false); } if (closeLiveStream && !string.IsNullOrWhiteSpace(job.LiveStreamId)) { try { await _mediaSourceManager.CloseLiveStream(job.LiveStreamId).ConfigureAwait(false); } catch (Exception ex) { _logger.LogError(ex, "Error closing live stream for {Path}", job.Path); } } } private async Task DeletePartialStreamFiles(string path, TranscodingJobType jobType, int retryCount, int delayMs) { if (retryCount >= 10) { return; } _logger.LogInformation("Deleting partial stream file(s) {Path}", path); await Task.Delay(delayMs).ConfigureAwait(false); try { if (jobType == TranscodingJobType.Progressive) { DeleteProgressivePartialStreamFiles(path); } else { DeleteHlsPartialStreamFiles(path); } } catch (IOException ex) { _logger.LogError(ex, "Error deleting partial stream file(s) {Path}", path); await DeletePartialStreamFiles(path, jobType, retryCount + 1, 500).ConfigureAwait(false); } catch (Exception ex) { _logger.LogError(ex, "Error deleting partial stream file(s) {Path}", path); } } /// /// Deletes the progressive partial stream files. /// /// The output file path. private void DeleteProgressivePartialStreamFiles(string outputFilePath) { if (File.Exists(outputFilePath)) { _fileSystem.DeleteFile(outputFilePath); } } /// /// Deletes the HLS partial stream files. /// /// The output file path. private void DeleteHlsPartialStreamFiles(string outputFilePath) { var directory = Path.GetDirectoryName(outputFilePath); var name = Path.GetFileNameWithoutExtension(outputFilePath); var filesToDelete = _fileSystem.GetFilePaths(directory) .Where(f => f.IndexOf(name, StringComparison.OrdinalIgnoreCase) != -1); List? exs = null; foreach (var file in filesToDelete) { try { _logger.LogDebug("Deleting HLS file {0}", file); _fileSystem.DeleteFile(file); } catch (IOException ex) { (exs ??= new List(4)).Add(ex); _logger.LogError(ex, "Error deleting HLS file {Path}", file); } } if (exs != null) { throw new AggregateException("Error deleting HLS files", exs); } } /// /// Report the transcoding progress to the session manager. /// /// The of which the progress will be reported. /// The of the current transcoding job. /// The current transcoding position. /// The framerate of the transcoding job. /// The completion percentage of the transcode. /// The number of bytes transcoded. /// The bitrate of the transcoding job. public void ReportTranscodingProgress( TranscodingJobDto job, StreamState state, TimeSpan? transcodingPosition, float? framerate, double? percentComplete, long? bytesTranscoded, int? bitRate) { var ticks = transcodingPosition?.Ticks; if (job != null) { job.Framerate = framerate; job.CompletionPercentage = percentComplete; job.TranscodingPositionTicks = ticks; job.BytesTranscoded = bytesTranscoded; job.BitRate = bitRate; } var deviceId = state.Request.DeviceId; if (!string.IsNullOrWhiteSpace(deviceId)) { var audioCodec = state.ActualOutputAudioCodec; var videoCodec = state.ActualOutputVideoCodec; _sessionManager.ReportTranscodingInfo(deviceId, new TranscodingInfo { Bitrate = bitRate ?? state.TotalOutputBitrate, AudioCodec = audioCodec, VideoCodec = videoCodec, Container = state.OutputContainer, Framerate = framerate, CompletionPercentage = percentComplete, Width = state.OutputWidth, Height = state.OutputHeight, AudioChannels = state.OutputAudioChannels, IsAudioDirect = EncodingHelper.IsCopyCodec(state.OutputAudioCodec), IsVideoDirect = EncodingHelper.IsCopyCodec(state.OutputVideoCodec), TranscodeReasons = state.TranscodeReasons }); } } /// /// Starts FFmpeg. /// /// The state. /// The output path. /// The command line arguments for FFmpeg. /// The . /// The . /// The cancellation token source. /// The working directory. /// Task. public async Task StartFfMpeg( StreamState state, string outputPath, string commandLineArguments, HttpRequest request, TranscodingJobType transcodingJobType, CancellationTokenSource cancellationTokenSource, string? workingDirectory = null) { var directory = Path.GetDirectoryName(outputPath) ?? throw new ArgumentException($"Provided path ({outputPath}) is not valid.", nameof(outputPath)); Directory.CreateDirectory(directory); await AcquireResources(state, cancellationTokenSource).ConfigureAwait(false); if (state.VideoRequest != null && !EncodingHelper.IsCopyCodec(state.OutputVideoCodec)) { var auth = _authorizationContext.GetAuthorizationInfo(request); if (auth.User != null && !auth.User.HasPermission(PermissionKind.EnableVideoPlaybackTranscoding)) { this.OnTranscodeFailedToStart(outputPath, transcodingJobType, state); throw new ArgumentException("User does not have access to video transcoding."); } } if (string.IsNullOrEmpty(_mediaEncoder.EncoderPath)) { throw new ArgumentException("FFmpeg path not set."); } var process = new Process { StartInfo = new ProcessStartInfo { WindowStyle = ProcessWindowStyle.Hidden, CreateNoWindow = true, UseShellExecute = false, // Must consume both stdout and stderr or deadlocks may occur // RedirectStandardOutput = true, RedirectStandardError = true, RedirectStandardInput = true, FileName = _mediaEncoder.EncoderPath, Arguments = commandLineArguments, WorkingDirectory = string.IsNullOrWhiteSpace(workingDirectory) ? string.Empty : workingDirectory, ErrorDialog = false }, EnableRaisingEvents = true }; var transcodingJob = this.OnTranscodeBeginning( outputPath, state.Request.PlaySessionId, state.MediaSource.LiveStreamId, Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture), transcodingJobType, process, state.Request.DeviceId, state, cancellationTokenSource); var commandLineLogMessage = process.StartInfo.FileName + " " + process.StartInfo.Arguments; _logger.LogInformation(commandLineLogMessage); var logFilePrefix = "FFmpeg.Transcode-"; if (state.VideoRequest != null && EncodingHelper.IsCopyCodec(state.OutputVideoCodec)) { logFilePrefix = EncodingHelper.IsCopyCodec(state.OutputAudioCodec) ? "FFmpeg.Remux-" : "FFmpeg.DirectStream-"; } var logFilePath = Path.Combine( _serverConfigurationManager.ApplicationPaths.LogDirectoryPath, $"{logFilePrefix}{DateTime.Now:yyyy-MM-dd_HH-mm-ss}_{state.Request.MediaSourceId}_{Guid.NewGuid().ToString()[..8]}.log"); // FFmpeg writes debug/error info to stderr. This is useful when debugging so let's put it in the log directory. Stream logStream = new FileStream(logFilePath, FileMode.Create, FileAccess.Write, FileShare.Read, IODefaults.FileStreamBufferSize, true); var commandLineLogMessageBytes = Encoding.UTF8.GetBytes(request.Path + Environment.NewLine + Environment.NewLine + JsonSerializer.Serialize(state.MediaSource) + Environment.NewLine + Environment.NewLine + commandLineLogMessage + Environment.NewLine + Environment.NewLine); await logStream.WriteAsync(commandLineLogMessageBytes, 0, commandLineLogMessageBytes.Length, cancellationTokenSource.Token).ConfigureAwait(false); process.Exited += (sender, args) => OnFfMpegProcessExited(process, transcodingJob, state); try { process.Start(); } catch (Exception ex) { _logger.LogError(ex, "Error starting FFmpeg"); this.OnTranscodeFailedToStart(outputPath, transcodingJobType, state); throw; } _logger.LogDebug("Launched FFmpeg process"); state.TranscodingJob = transcodingJob; // Important - don't await the log task or we won't be able to kill FFmpeg when the user stops playback _ = new JobLogger(_logger).StartStreamingLog(state, process.StandardError.BaseStream, logStream); // Wait for the file to exist before proceeeding var ffmpegTargetFile = state.WaitForPath ?? outputPath; _logger.LogDebug("Waiting for the creation of {0}", ffmpegTargetFile); while (!File.Exists(ffmpegTargetFile) && !transcodingJob.HasExited) { await Task.Delay(100, cancellationTokenSource.Token).ConfigureAwait(false); } _logger.LogDebug("File {0} created or transcoding has finished", ffmpegTargetFile); if (state.IsInputVideo && transcodingJob.Type == TranscodingJobType.Progressive && !transcodingJob.HasExited) { await Task.Delay(1000, cancellationTokenSource.Token).ConfigureAwait(false); if (state.ReadInputAtNativeFramerate && !transcodingJob.HasExited) { await Task.Delay(1500, cancellationTokenSource.Token).ConfigureAwait(false); } } if (!transcodingJob.HasExited) { StartThrottler(state, transcodingJob); } _logger.LogDebug("StartFfMpeg() finished successfully"); return transcodingJob; } private void StartThrottler(StreamState state, TranscodingJobDto transcodingJob) { if (EnableThrottling(state)) { transcodingJob.TranscodingThrottler = state.TranscodingThrottler = new TranscodingThrottler(transcodingJob, new Logger(new LoggerFactory()), _serverConfigurationManager, _fileSystem); state.TranscodingThrottler.Start(); } } private bool EnableThrottling(StreamState state) { var encodingOptions = _serverConfigurationManager.GetEncodingOptions(); // enable throttling when NOT using hardware acceleration if (string.IsNullOrEmpty(encodingOptions.HardwareAccelerationType)) { return state.InputProtocol == MediaProtocol.File && state.RunTimeTicks.HasValue && state.RunTimeTicks.Value >= TimeSpan.FromMinutes(5).Ticks && state.IsInputVideo && state.VideoType == VideoType.VideoFile && !EncodingHelper.IsCopyCodec(state.OutputVideoCodec); } return false; } /// /// Called when [transcode beginning]. /// /// The path. /// The play session identifier. /// The live stream identifier. /// The transcoding job identifier. /// The type. /// The process. /// The device id. /// The state. /// The cancellation token source. /// TranscodingJob. public TranscodingJobDto OnTranscodeBeginning( string path, string? playSessionId, string? liveStreamId, string transcodingJobId, TranscodingJobType type, Process process, string? deviceId, StreamState state, CancellationTokenSource cancellationTokenSource) { lock (_activeTranscodingJobs) { var job = new TranscodingJobDto(_loggerFactory.CreateLogger()) { Type = type, Path = path, Process = process, ActiveRequestCount = 1, DeviceId = deviceId, CancellationTokenSource = cancellationTokenSource, Id = transcodingJobId, PlaySessionId = playSessionId, LiveStreamId = liveStreamId, MediaSource = state.MediaSource }; _activeTranscodingJobs.Add(job); ReportTranscodingProgress(job, state, null, null, null, null, null); return job; } } /// /// Called when [transcode end]. /// /// The transcode job. public void OnTranscodeEndRequest(TranscodingJobDto job) { job.ActiveRequestCount--; _logger.LogDebug("OnTranscodeEndRequest job.ActiveRequestCount={ActiveRequestCount}", job.ActiveRequestCount); if (job.ActiveRequestCount <= 0) { PingTimer(job, false); } } /// /// /// The progressive /// /// Called when [transcode failed to start]. /// /// The path. /// The type. /// The state. public void OnTranscodeFailedToStart(string path, TranscodingJobType type, StreamState state) { lock (_activeTranscodingJobs) { var job = _activeTranscodingJobs.FirstOrDefault(j => j.Type == type && string.Equals(j.Path, path, StringComparison.OrdinalIgnoreCase)); if (job != null) { _activeTranscodingJobs.Remove(job); } } lock (_transcodingLocks) { _transcodingLocks.Remove(path); } if (!string.IsNullOrWhiteSpace(state.Request.DeviceId)) { _sessionManager.ClearTranscodingInfo(state.Request.DeviceId); } } /// /// Processes the exited. /// /// The process. /// The job. /// The state. private void OnFfMpegProcessExited(Process process, TranscodingJobDto job, StreamState state) { job.HasExited = true; _logger.LogDebug("Disposing stream resources"); state.Dispose(); if (process.ExitCode == 0) { _logger.LogInformation("FFmpeg exited with code 0"); } else { _logger.LogError("FFmpeg exited with code {0}", process.ExitCode); } process.Dispose(); } private async Task AcquireResources(StreamState state, CancellationTokenSource cancellationTokenSource) { if (state.VideoType == VideoType.Iso && state.IsoType.HasValue && _isoManager.CanMount(state.MediaPath)) { state.IsoMount = await _isoManager.Mount(state.MediaPath, cancellationTokenSource.Token).ConfigureAwait(false); } if (state.MediaSource.RequiresOpening && string.IsNullOrWhiteSpace(state.Request.LiveStreamId)) { var liveStreamResponse = await _mediaSourceManager.OpenLiveStream( new LiveStreamRequest { OpenToken = state.MediaSource.OpenToken }, cancellationTokenSource.Token) .ConfigureAwait(false); _encodingHelper.AttachMediaSourceInfo(state, liveStreamResponse.MediaSource, state.RequestedUrl); if (state.VideoRequest != null) { _encodingHelper.TryStreamCopy(state); } } if (state.MediaSource.BufferMs.HasValue) { await Task.Delay(state.MediaSource.BufferMs.Value, cancellationTokenSource.Token).ConfigureAwait(false); } } /// /// Called when [transcode begin request]. /// /// The path. /// The type. /// The . public TranscodingJobDto? OnTranscodeBeginRequest(string path, TranscodingJobType type) { lock (_activeTranscodingJobs) { var job = _activeTranscodingJobs.FirstOrDefault(j => j.Type == type && string.Equals(j.Path, path, StringComparison.OrdinalIgnoreCase)); if (job == null) { return null; } OnTranscodeBeginRequest(job); return job; } } private void OnTranscodeBeginRequest(TranscodingJobDto job) { job.ActiveRequestCount++; if (string.IsNullOrWhiteSpace(job.PlaySessionId) || job.Type == TranscodingJobType.Progressive) { job.StopKillTimer(); } } /// /// Gets the transcoding lock. /// /// The output path of the transcoded file. /// A . public SemaphoreSlim GetTranscodingLock(string outputPath) { lock (_transcodingLocks) { if (!_transcodingLocks.TryGetValue(outputPath, out SemaphoreSlim? result)) { result = new SemaphoreSlim(1, 1); _transcodingLocks[outputPath] = result; } return result; } } private void OnPlaybackProgress(object? sender, PlaybackProgressEventArgs e) { if (!string.IsNullOrWhiteSpace(e.PlaySessionId)) { PingTranscodingJob(e.PlaySessionId, e.IsPaused); } } /// /// Deletes the encoded media cache. /// private void DeleteEncodedMediaCache() { var path = _serverConfigurationManager.GetTranscodePath(); if (!Directory.Exists(path)) { return; } foreach (var file in _fileSystem.GetFilePaths(path, true)) { _fileSystem.DeleteFile(file); } } /// /// Dispose transcoding job helper. /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// /// Dispose throttler. /// /// Disposing. protected virtual void Dispose(bool disposing) { if (disposing) { _loggerFactory.Dispose(); _sessionManager!.PlaybackProgress -= OnPlaybackProgress; _sessionManager!.PlaybackStart -= OnPlaybackProgress; } } } }