using MediaBrowser.Controller.Entities; using MediaBrowser.Controller.Entities.Audio; using MediaBrowser.Controller.Library; using MediaBrowser.Controller.Sync; using MediaBrowser.Model.Dlna; using MediaBrowser.Model.Dto; using MediaBrowser.Model.Logging; using MediaBrowser.Model.MediaInfo; using MediaBrowser.Model.Session; using MediaBrowser.Model.Sync; using MoreLinq; using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace MediaBrowser.Server.Implementations.Sync { public class SyncJobProcessor { private readonly ILibraryManager _libraryManager; private readonly ISyncRepository _syncRepo; private readonly ISyncManager _syncManager; private readonly ILogger _logger; private readonly IUserManager _userManager; public SyncJobProcessor(ILibraryManager libraryManager, ISyncRepository syncRepo, ISyncManager syncManager, ILogger logger, IUserManager userManager) { _libraryManager = libraryManager; _syncRepo = syncRepo; _syncManager = syncManager; _logger = logger; _userManager = userManager; } public void ProcessJobItem(SyncJob job, SyncJobItem jobItem, SyncTarget target) { } public async Task EnsureJobItems(SyncJob job) { var user = _userManager.GetUserById(job.UserId); if (user == null) { throw new InvalidOperationException("Cannot proceed with sync because user no longer exists."); } var items = GetItemsForSync(job.RequestedItemIds, user, job.UnwatchedOnly) .ToList(); var jobItems = _syncRepo.GetJobItems(new SyncJobItemQuery { JobId = job.Id }).Items.ToList(); foreach (var item in items) { // Respect ItemLimit, if set if (job.ItemLimit.HasValue) { if (jobItems.Count >= job.ItemLimit.Value) { break; } } var itemId = item.Id.ToString("N"); var jobItem = jobItems.FirstOrDefault(i => string.Equals(i.ItemId, itemId, StringComparison.OrdinalIgnoreCase)); if (jobItem != null) { continue; } jobItem = new SyncJobItem { Id = Guid.NewGuid().ToString("N"), ItemId = itemId, JobId = job.Id, TargetId = job.TargetId, DateCreated = DateTime.UtcNow }; await _syncRepo.Create(jobItem).ConfigureAwait(false); jobItems.Add(jobItem); } jobItems = jobItems .OrderBy(i => i.DateCreated) .ToList(); await UpdateJobStatus(job, jobItems).ConfigureAwait(false); } public Task UpdateJobStatus(string id) { var job = _syncRepo.GetJob(id); return UpdateJobStatus(job); } private Task UpdateJobStatus(SyncJob job) { if (job == null) { throw new ArgumentNullException("job"); } var result = _syncRepo.GetJobItems(new SyncJobItemQuery { JobId = job.Id }); return UpdateJobStatus(job, result.Items.ToList()); } private Task UpdateJobStatus(SyncJob job, List jobItems) { job.ItemCount = jobItems.Count; double pct = 0; foreach (var item in jobItems) { if (item.Status == SyncJobItemStatus.Failed || item.Status == SyncJobItemStatus.Completed) { pct += 100; } else { pct += item.Progress ?? 0; } } if (job.ItemCount > 0) { pct /= job.ItemCount; job.Progress = pct; } else { job.Progress = null; } if (pct >= 100) { if (jobItems.Any(i => i.Status == SyncJobItemStatus.Failed)) { job.Status = SyncJobStatus.CompletedWithError; } else { job.Status = SyncJobStatus.Completed; } } else if (pct.Equals(0)) { job.Status = SyncJobStatus.Queued; } else { job.Status = SyncJobStatus.InProgress; } return _syncRepo.Update(job); } public IEnumerable GetItemsForSync(IEnumerable itemIds, User user, bool unwatchedOnly) { var items = itemIds .SelectMany(i => GetItemsForSync(i, user)) .Where(_syncManager.SupportsSync); if (unwatchedOnly) { // Avoid implicitly captured closure var currentUser = user; items = items.Where(i => { var video = i as Video; if (video != null) { return !video.IsPlayed(currentUser); } return true; }); } return items.DistinctBy(i => i.Id); } private IEnumerable GetItemsForSync(string id, User user) { var item = _libraryManager.GetItemById(id); if (item == null) { return new List(); } return GetItemsForSync(item, user); } private IEnumerable GetItemsForSync(BaseItem item, User user) { var itemByName = item as IItemByName; if (itemByName != null) { var items = user.RootFolder .GetRecursiveChildren(user); return itemByName.GetTaggedItems(items); } if (item.IsFolder) { var folder = (Folder)item; var items = folder.GetRecursiveChildren(user); items = items.Where(i => !i.IsFolder); if (!folder.IsPreSorted) { items = items.OrderBy(i => i.SortName); } return items; } return new[] { item }; } public async Task EnsureSyncJobs(CancellationToken cancellationToken) { var jobResult = _syncRepo.GetJobs(new SyncJobQuery { IsCompleted = false }); foreach (var job in jobResult.Items) { cancellationToken.ThrowIfCancellationRequested(); if (job.SyncNewContent) { await EnsureJobItems(job).ConfigureAwait(false); } } } public async Task Sync(IProgress progress, CancellationToken cancellationToken) { await EnsureSyncJobs(cancellationToken).ConfigureAwait(false); var result = _syncRepo.GetJobItems(new SyncJobItemQuery { IsCompleted = false }); var jobItems = result.Items; var index = 0; foreach (var item in jobItems) { double percent = index; percent /= result.TotalRecordCount; progress.Report(100 * percent); cancellationToken.ThrowIfCancellationRequested(); if (item.Status == SyncJobItemStatus.Queued) { await ProcessJobItem(item, cancellationToken).ConfigureAwait(false); } var job = _syncRepo.GetJob(item.JobId); await UpdateJobStatus(job).ConfigureAwait(false); index++; } } private async Task ProcessJobItem(SyncJobItem jobItem, CancellationToken cancellationToken) { var item = _libraryManager.GetItemById(jobItem.ItemId); if (item == null) { jobItem.Status = SyncJobItemStatus.Failed; _logger.Error("Unable to locate library item for JobItem {0}, ItemId {1}", jobItem.Id, jobItem.ItemId); await _syncRepo.Update(jobItem).ConfigureAwait(false); return; } var deviceProfile = _syncManager.GetDeviceProfile(jobItem.TargetId); if (deviceProfile == null) { jobItem.Status = SyncJobItemStatus.Failed; _logger.Error("Unable to locate SyncTarget for JobItem {0}, SyncTargetId {1}", jobItem.Id, jobItem.TargetId); await _syncRepo.Update(jobItem).ConfigureAwait(false); return; } jobItem.Progress = 0; jobItem.Status = SyncJobItemStatus.Converting; var video = item as Video; if (video != null) { await Sync(jobItem, video, deviceProfile, cancellationToken).ConfigureAwait(false); } else if (item is Audio) { await Sync(jobItem, (Audio)item, deviceProfile, cancellationToken).ConfigureAwait(false); } else if (item is Photo) { await Sync(jobItem, (Photo)item, deviceProfile, cancellationToken).ConfigureAwait(false); } else { await SyncGeneric(jobItem, item, deviceProfile, cancellationToken).ConfigureAwait(false); } } private async Task Sync(SyncJobItem jobItem, Video item, DeviceProfile profile, CancellationToken cancellationToken) { var options = new VideoOptions { Context = EncodingContext.Streaming, ItemId = item.Id.ToString("N"), DeviceId = jobItem.TargetId, Profile = profile, MediaSources = item.GetMediaSources(false).ToList() }; var streamInfo = new StreamBuilder().BuildVideoItem(options); var mediaSource = streamInfo.MediaSource; jobItem.MediaSourceId = streamInfo.MediaSourceId; if (streamInfo.PlayMethod == PlayMethod.Transcode) { await _syncRepo.Update(jobItem).ConfigureAwait(false); } else { if (mediaSource.Protocol == MediaProtocol.File) { jobItem.OutputPath = mediaSource.Path; } if (mediaSource.Protocol == MediaProtocol.Http) { jobItem.OutputPath = await DownloadFile(jobItem, mediaSource, cancellationToken).ConfigureAwait(false); } throw new InvalidOperationException(string.Format("Cannot direct stream {0} protocol", mediaSource.Protocol)); } // TODO: Transcode jobItem.OutputPath = mediaSource.Path; jobItem.Progress = 50; jobItem.Status = SyncJobItemStatus.Transferring; await _syncRepo.Update(jobItem).ConfigureAwait(false); } private async Task Sync(SyncJobItem jobItem, Audio item, DeviceProfile profile, CancellationToken cancellationToken) { var options = new AudioOptions { Context = EncodingContext.Streaming, ItemId = item.Id.ToString("N"), DeviceId = jobItem.TargetId, Profile = profile, MediaSources = item.GetMediaSources(false).ToList() }; var streamInfo = new StreamBuilder().BuildAudioItem(options); var mediaSource = streamInfo.MediaSource; jobItem.MediaSourceId = streamInfo.MediaSourceId; if (streamInfo.PlayMethod == PlayMethod.Transcode) { await _syncRepo.Update(jobItem).ConfigureAwait(false); } else { if (mediaSource.Protocol == MediaProtocol.File) { jobItem.OutputPath = mediaSource.Path; } if (mediaSource.Protocol == MediaProtocol.Http) { jobItem.OutputPath = await DownloadFile(jobItem, mediaSource, cancellationToken).ConfigureAwait(false); } throw new InvalidOperationException(string.Format("Cannot direct stream {0} protocol", mediaSource.Protocol)); } // TODO: Transcode jobItem.OutputPath = mediaSource.Path; jobItem.Progress = 50; jobItem.Status = SyncJobItemStatus.Transferring; await _syncRepo.Update(jobItem).ConfigureAwait(false); } private async Task Sync(SyncJobItem jobItem, Photo item, DeviceProfile profile, CancellationToken cancellationToken) { jobItem.OutputPath = item.Path; jobItem.Progress = 50; jobItem.Status = SyncJobItemStatus.Transferring; await _syncRepo.Update(jobItem).ConfigureAwait(false); } private async Task SyncGeneric(SyncJobItem jobItem, BaseItem item, DeviceProfile profile, CancellationToken cancellationToken) { jobItem.OutputPath = item.Path; jobItem.Progress = 50; jobItem.Status = SyncJobItemStatus.Transferring; await _syncRepo.Update(jobItem).ConfigureAwait(false); } private async Task DownloadFile(SyncJobItem jobItem, MediaSourceInfo mediaSource, CancellationToken cancellationToken) { // TODO: Download return mediaSource.Path; } } }