using MediaBrowser.Common.Configuration; using MediaBrowser.Common.Events; using MediaBrowser.Model.Events; using MediaBrowser.Model.Logging; using MediaBrowser.Model.Serialization; using MediaBrowser.Model.Tasks; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using MediaBrowser.Model.IO; using MediaBrowser.Model.System; namespace Emby.Common.Implementations.ScheduledTasks { /// /// Class TaskManager /// public class TaskManager : ITaskManager { public event EventHandler> TaskExecuting; public event EventHandler TaskCompleted; /// /// Gets the list of Scheduled Tasks /// /// The scheduled tasks. public IScheduledTaskWorker[] ScheduledTasks { get; private set; } /// /// The _task queue /// private readonly ConcurrentQueue> _taskQueue = new ConcurrentQueue>(); /// /// Gets or sets the json serializer. /// /// The json serializer. private IJsonSerializer JsonSerializer { get; set; } /// /// Gets or sets the application paths. /// /// The application paths. private IApplicationPaths ApplicationPaths { get; set; } private readonly ISystemEvents _systemEvents; /// /// Gets the logger. /// /// The logger. private ILogger Logger { get; set; } private readonly IFileSystem _fileSystem; private bool _suspendTriggers; public bool SuspendTriggers { get { return _suspendTriggers; } set { Logger.Info("Setting SuspendTriggers to {0}", value); var executeQueued = _suspendTriggers && !value; _suspendTriggers = value; if (executeQueued) { ExecuteQueuedTasks(); } } } /// /// Initializes a new instance of the class. /// /// The application paths. /// The json serializer. /// The logger. /// kernel public TaskManager(IApplicationPaths applicationPaths, IJsonSerializer jsonSerializer, ILogger logger, IFileSystem fileSystem, ISystemEvents systemEvents) { ApplicationPaths = applicationPaths; JsonSerializer = jsonSerializer; Logger = logger; _fileSystem = fileSystem; _systemEvents = systemEvents; ScheduledTasks = new IScheduledTaskWorker[] { }; } private void BindToSystemEvent() { _systemEvents.Resume += _systemEvents_Resume; } private void _systemEvents_Resume(object sender, EventArgs e) { foreach (var task in ScheduledTasks) { task.ReloadTriggerEvents(); } } /// /// Cancels if running and queue. /// /// /// Task options. public void CancelIfRunningAndQueue(TaskExecutionOptions options) where T : IScheduledTask { var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T)); ((ScheduledTaskWorker)task).CancelIfRunning(); QueueScheduledTask(options); } public void CancelIfRunningAndQueue() where T : IScheduledTask { CancelIfRunningAndQueue(new TaskExecutionOptions()); } /// /// Cancels if running /// /// public void CancelIfRunning() where T : IScheduledTask { var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T)); ((ScheduledTaskWorker)task).CancelIfRunning(); } /// /// Queues the scheduled task. /// /// /// Task options public void QueueScheduledTask(TaskExecutionOptions options) where T : IScheduledTask { var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == typeof(T)); if (scheduledTask == null) { Logger.Error("Unable to find scheduled task of type {0} in QueueScheduledTask.", typeof(T).Name); } else { QueueScheduledTask(scheduledTask, options); } } public void QueueScheduledTask() where T : IScheduledTask { QueueScheduledTask(new TaskExecutionOptions()); } public void QueueIfNotRunning() where T : IScheduledTask { var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T)); if (task.State != TaskState.Running) { QueueScheduledTask(new TaskExecutionOptions()); } } public void Execute() where T : IScheduledTask { var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == typeof(T)); if (scheduledTask == null) { Logger.Error("Unable to find scheduled task of type {0} in Execute.", typeof(T).Name); } else { var type = scheduledTask.ScheduledTask.GetType(); Logger.Info("Queueing task {0}", type.Name); lock (_taskQueue) { if (scheduledTask.State == TaskState.Idle) { Execute(scheduledTask, new TaskExecutionOptions()); } } } } /// /// Queues the scheduled task. /// /// The task. /// The task options. public void QueueScheduledTask(IScheduledTask task, TaskExecutionOptions options) { var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == task.GetType()); if (scheduledTask == null) { Logger.Error("Unable to find scheduled task of type {0} in QueueScheduledTask.", task.GetType().Name); } else { QueueScheduledTask(scheduledTask, options); } } /// /// Queues the scheduled task. /// /// The task. /// The task options. private void QueueScheduledTask(IScheduledTaskWorker task, TaskExecutionOptions options) { var type = task.ScheduledTask.GetType(); Logger.Info("Queueing task {0}", type.Name); lock (_taskQueue) { if (task.State == TaskState.Idle && !SuspendTriggers) { Execute(task, options); return; } _taskQueue.Enqueue(new Tuple(type, options)); } } /// /// Adds the tasks. /// /// The tasks. public void AddTasks(IEnumerable tasks) { var myTasks = ScheduledTasks.ToList(); var list = tasks.ToList(); myTasks.AddRange(list.Select(t => new ScheduledTaskWorker(t, ApplicationPaths, this, JsonSerializer, Logger, _fileSystem))); ScheduledTasks = myTasks.ToArray(); BindToSystemEvent(); } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// /// Releases unmanaged and - optionally - managed resources. /// /// true to release both managed and unmanaged resources; false to release only unmanaged resources. protected virtual void Dispose(bool dispose) { foreach (var task in ScheduledTasks) { task.Dispose(); } } public void Cancel(IScheduledTaskWorker task) { ((ScheduledTaskWorker)task).Cancel(); } public Task Execute(IScheduledTaskWorker task, TaskExecutionOptions options) { return ((ScheduledTaskWorker)task).Execute(options); } /// /// Called when [task executing]. /// /// The task. internal void OnTaskExecuting(IScheduledTaskWorker task) { EventHelper.FireEventIfNotNull(TaskExecuting, this, new GenericEventArgs { Argument = task }, Logger); } /// /// Called when [task completed]. /// /// The task. /// The result. internal void OnTaskCompleted(IScheduledTaskWorker task, TaskResult result) { EventHelper.FireEventIfNotNull(TaskCompleted, task, new TaskCompletionEventArgs { Result = result, Task = task }, Logger); ExecuteQueuedTasks(); } /// /// Executes the queued tasks. /// private void ExecuteQueuedTasks() { if (SuspendTriggers) { return; } Logger.Info("ExecuteQueuedTasks"); // Execute queued tasks lock (_taskQueue) { var list = new List>(); Tuple item; while (_taskQueue.TryDequeue(out item)) { if (list.All(i => i.Item1 != item.Item1)) { list.Add(item); } } foreach (var enqueuedType in list) { var scheduledTask = ScheduledTasks.First(t => t.ScheduledTask.GetType() == enqueuedType.Item1); if (scheduledTask.State == TaskState.Idle) { Execute(scheduledTask, enqueuedType.Item2); } } } } } }