Class: Procrastinator::Scheduler
- Inherits:
-
Object
- Object
- Procrastinator::Scheduler
- Defined in:
- lib/procrastinator/scheduler.rb
Overview
A Scheduler object provides the API for client applications to manage delayed tasks.
Use #delay to schedule new tasks, #reschedule to alter existing tasks, and #cancel to remove unwanted tasks.
Defined Under Namespace
Modules: DaemonWorking, SerialWorking, ThreadedWorking Classes: UpdateProxy, WorkProxy
Instance Method Summary collapse
-
#cancel(queue, identifier) ⇒ Object
Removes an existing task, as located by the given identifying information.
-
#defer(queue_name = nil, data: nil, run_at: Time.now, expire_at: nil) ⇒ Object
(also: #delay)
Records a new task to be executed at the given time.
-
#initialize(config) ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#reschedule(queue, identifier) ⇒ Object
Alters an existing task to run at a new time, expire at a new time, or both.
-
#work(*queue_names) ⇒ Object
Spawns a new worker thread for each queue defined in the config.
Constructor Details
#initialize(config) ⇒ Scheduler
Returns a new instance of Scheduler.
12 13 14 |
# File 'lib/procrastinator/scheduler.rb', line 12 def initialize(config) @config = config end |
Instance Method Details
#cancel(queue, identifier) ⇒ Object
Removes an existing task, as located by the given identifying information.
The identifier can include any data field stored in the task loader. Often this is the information in :data.
60 61 62 63 64 65 66 67 68 69 |
# File 'lib/procrastinator/scheduler.rb', line 60 def cancel(queue, identifier) queue = @config.queue(name: queue) tasks = queue.read(identifier.merge(queue: queue.name.to_s)) raise "no task matches search: #{ identifier }" if tasks.empty? raise "multiple tasks match search: #{ identifier }" if tasks.size > 1 queue.delete(tasks.first[:id]) end |
#defer(queue_name = nil, data: nil, run_at: Time.now, expire_at: nil) ⇒ Object Also known as: delay
Records a new task to be executed at the given time.
22 23 24 25 26 27 28 29 30 |
# File 'lib/procrastinator/scheduler.rb', line 22 def defer(queue_name = nil, data: nil, run_at: Time.now, expire_at: nil) raise ArgumentError, " must provide a queue name as the first argument. Received: \#{ queue_name }\n ERR\n\n queue = @config.queue(name: queue_name)\n\n queue.create(run_at: run_at, expire_at: expire_at, data: data)\nend\n" unless queue_name.nil? || queue_name.is_a?(Symbol) |
#reschedule(queue, identifier) ⇒ Object
Alters an existing task to run at a new time, expire at a new time, or both.
Call #to on the result and pass in the new :run_at and/or :expire_at.
Example:
scheduler.reschedule(:alerts, data: {user_id: 5}).to(run_at: Time.now, expire_at: Time.now + 10)
The identifier can include any data field stored in the task loader. Often this is the information in :data.
48 49 50 |
# File 'lib/procrastinator/scheduler.rb', line 48 def reschedule(queue, identifier) UpdateProxy.new(@config, identifier: identifier.merge(queue: queue.to_s)) end |
#work(*queue_names) ⇒ Object
Spawns a new worker thread for each queue defined in the config
75 76 77 78 79 80 81 82 83 |
# File 'lib/procrastinator/scheduler.rb', line 75 def work(*queue_names) queue_names = @config.queues if queue_names.empty? workers = queue_names.collect do |queue_name| QueueWorker.new(queue: queue_name, config: @config) end WorkProxy.new(workers, @config) end |