Module: Delayed::Backend::Base::ClassMethods
- Defined in:
- lib/delayed/backend/base.rb
Instance Method Summary
-
#after_fork ⇒ Object
Hook method that is called after a new worker is forked.
-
#before_fork ⇒ Object
Hook method that is called before a new worker is forked.
-
#enqueue(*args) ⇒ Object
Add a job to the queue.
-
#enqueue_unique(*args) ⇒ Object
Add a unique job to the queue.
-
#reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
Reserve the first available job that the worker can lock exclusively.
-
#work_off(num = 100) ⇒ Object
Instance Method Details
#after_fork ⇒ Object
Hook method that is called after a new worker is forked
91 92 |
# File 'lib/delayed/backend/base.rb', line 91
#
# Hook method that is called after a new worker is forked.
#
# The default implementation does nothing.
#
# @return [nil]
def after_fork
end
#before_fork ⇒ Object
Hook method that is called before a new worker is forked
87 88 |
# File 'lib/delayed/backend/base.rb', line 87
#
# Hook method that is called before a new worker is forked.
#
# The default implementation does nothing.
#
# @return [nil]
def before_fork
end
#enqueue(*args) ⇒ Object
Add a job to the queue
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/delayed/backend/base.rb', line 10
#
# Add a job to the queue.
#
# Preferred form: pass the payload object followed by an options hash
# (or a hash that already contains :payload_object). :priority defaults to
# Delayed::Worker.default_priority.
#
# Deprecated form: enqueue(payload, priority, run_at). Extra positional
# arguments still work but emit a deprecation warning.
#
# NOTE(review): the local name `options` and the `extract_options!` call were
# missing from the extracted listing and are reconstructed here from the
# surrounding code -- confirm against lib/delayed/backend/base.rb.
#
# @param args [Array] payload object, then either an options hash or the
#   deprecated positional priority / run_at values
# @return [Object] the job that was built (saved when jobs are delayed,
#   invoked immediately otherwise)
# @raise [ArgumentError] if the payload object does not respond to #perform
def enqueue(*args)
  options = {
    :priority => Delayed::Worker.default_priority
  }.merge!(args.extract_options!)

  # A :payload_object given in the hash wins; otherwise take the first
  # positional argument.
  options[:payload_object] ||= args.shift

  # Anything still left in args is the deprecated positional form.
  unless args.empty?
    warn "[DEPRECATION] Passing multiple arguments to `#enqueue` is deprecated. Pass a hash with :priority and :run_at."
    options[:priority] = args.first || options[:priority]
    options[:run_at]   = args[1]
  end

  unless options[:payload_object].respond_to?(:perform)
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  if Delayed::Worker.delay_jobs
    new(options).tap do |job|
      # Persist inside the :enqueue lifecycle so registered callbacks wrap
      # both the job's own enqueue hook and the save.
      Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do
        job.hook(:enqueue)
        job.save
      end
    end
  else
    # Delaying is switched off: run the payload right away instead of saving.
    Delayed::Job.new(:payload_object => options[:payload_object]).tap do |job|
      job.invoke_job
    end
  end
end
#enqueue_unique(*args) ⇒ Object
Add a unique job to the queue
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/delayed/backend/base.rb', line 42
#
# Add a unique job to the queue.
#
# Works like #enqueue, but only creates the job when no unlocked job with the
# same :unique_id is found. When such a job is found nothing is enqueued and
# nil is returned.
#
# Preferred form: pass the payload object followed by an options hash with
# :unique_id (and optionally :priority / :run_at).
#
# Deprecated form: enqueue_unique(payload, unique_id, priority, run_at).
#
# NOTE(review): the local name `options` and the `extract_options!` call were
# missing from the extracted listing and are reconstructed here from the
# surrounding code -- confirm against lib/delayed/backend/base.rb.
#
# NOTE(review): the existence check and the save are separate steps; nothing
# visible here stops two callers from both passing the check. Confirm whether
# a database constraint on unique_id covers that case.
#
# @param args [Array] payload object, then either an options hash or the
#   deprecated positional unique_id / priority / run_at values
# @return [Object, nil] the job that was built, or nil when a matching
#   unlocked job was found
# @raise [ArgumentError] if the payload object does not respond to #perform
def enqueue_unique(*args)
  options = {
    :priority => Delayed::Worker.default_priority
  }.merge!(args.extract_options!)

  # A :payload_object given in the hash wins; otherwise take the first
  # positional argument.
  options[:payload_object] ||= args.shift

  # Anything still left in args is the deprecated positional form.
  unless args.empty?
    warn "[DEPRECATION] Passing multiple arguments to `#enqueue_unique` is deprecated. Pass a hash with :unique_id, :priority and :run_at."
    options[:unique_id] = args.first
    options[:priority]  = args[1] || options[:priority]
    options[:run_at]    = args[2]
  end

  unless options[:payload_object].respond_to?(:perform)
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  # The lookup runs inside `uncached` -- presumably so a cached query result
  # cannot hide a job that was just inserted; verify against the backend.
  uncached do
    if Job.where(:unique_id => options[:unique_id], :is_locked => false).empty?
      if Delayed::Worker.delay_jobs
        new(options).tap do |job|
          # Persist inside the :enqueue lifecycle so registered callbacks wrap
          # both the job's own enqueue hook and the save.
          Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do
            job.hook(:enqueue)
            job.save
          end
        end
      else
        # Delaying is switched off: run the payload right away.
        Delayed::Job.new(:payload_object => options[:payload_object]).tap do |job|
          job.invoke_job
        end
      end
    end
  end
end
#reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
78 79 80 81 82 83 84 |
# File 'lib/delayed/backend/base.rb', line 78
#
# Reserve a job for the given worker.
#
# Asks find_available for candidate jobs and returns the first one on which
# lock_exclusively! succeeds.
#
# @param worker [#name, #read_ahead] the worker asking for a job
# @param max_run_time [Object] passed through to find_available and
#   lock_exclusively!; defaults to Worker.max_run_time
# @return [Object, nil] the first job that could be locked, or nil if none
#   could (assuming find_available returns an Enumerable)
def reserve(worker, max_run_time = Worker.max_run_time)
  # We fetch several candidates (worker.read_ahead is presumably the limit)
  # rather than just one. In case we cannot get exclusive access to a job we
  # try the next; this leads to a more even distribution of jobs across the
  # worker processes.
  find_available(worker.name, worker.read_ahead, max_run_time).detect do |job|
    job.lock_exclusively!(max_run_time, worker.name)
  end
end