Module: Delayed::Backend::Base::ClassMethods
- Defined in:
- lib/delayed/backend/base.rb
Instance Attribute Summary collapse
-
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
-
#batches ⇒ Object
Returns the value of attribute batches.
-
#default_priority ⇒ Object
Returns the value of attribute default_priority.
Instance Method Summary collapse
- #check_priorities(min_priority, max_priority) ⇒ Object
- #check_queue(queue) ⇒ Object
-
#db_time_now ⇒ Object
Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.
-
#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue).
- #in_delayed_job=(val) ⇒ Object
- #in_delayed_job? ⇒ Boolean
-
#n_strand_options(strand_name, num_strands) ⇒ Object
by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time.
- #processes_locked_locally(name: nil) ⇒ Object
- #unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
- #unlock_orphaned_prefetched_jobs ⇒ Object
Instance Attribute Details
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
26 27 28 |
# File 'lib/delayed/backend/base.rb', line 26 def batch_enqueue_args @batch_enqueue_args end |
#batches ⇒ Object
Returns the value of attribute batches.
25 26 27 |
# File 'lib/delayed/backend/base.rb', line 25 def batches @batches end |
#default_priority ⇒ Object
Returns the value of attribute default_priority.
27 28 29 |
# File 'lib/delayed/backend/base.rb', line 27 def default_priority @default_priority end |
Instance Method Details
#check_priorities(min_priority, max_priority) ⇒ Object
119 120 121 122 123 124 125 126 |
# File 'lib/delayed/backend/base.rb', line 119 def check_priorities(min_priority, max_priority) if min_priority && min_priority < Delayed::MIN_PRIORITY raise(ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}") end if max_priority && max_priority > Delayed::MAX_PRIORITY raise(ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}") end end |
#check_queue(queue) ⇒ Object
115 116 117 |
# File 'lib/delayed/backend/base.rb', line 115 def check_queue(queue) raise(ArgumentError, "queue name can't be blank") if queue.blank? end |
#db_time_now ⇒ Object
Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.
131 132 133 |
# File 'lib/delayed/backend/base.rb', line 131 def db_time_now Time.now.utc end |
#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue)
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/delayed/backend/base.rb', line 34 def enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) unless object.respond_to?(:perform) raise ArgumentError, 'Cannot enqueue items which do not respond to perform' end kwargs = Settings..merge(kwargs) kwargs[:payload_object] = object kwargs[:priority] = priority kwargs[:run_at] = run_at if run_at kwargs[:strand] = strand kwargs[:max_attempts] = max_attempts kwargs[:source] = Marginalia::Comment.construct_comment if defined?(Marginalia) && Marginalia::Comment.components kwargs[:expires_at] = expires_at kwargs[:queue] = queue # If two parameters are given to n_strand, the first param is used # as the strand name for looking up the Setting, while the second # param is appended to make a unique set of strands. # # For instance, you can pass ["my_job_type", # root_account.global_id] # to get a set of n strands per root account, and you can apply the # same default to all. if n_strand strand_name, ext = n_strand if ext full_strand_name = "#{strand_name}/#{ext}" num_strands = Delayed::Settings.num_strands.call(full_strand_name) else full_strand_name = strand_name end num_strands ||= Delayed::Settings.num_strands.call(strand_name) num_strands = num_strands ? num_strands.to_i : 1 kwargs.merge!((full_strand_name, num_strands)) end if singleton kwargs[:strand] = singleton job = self.create_singleton(**kwargs) elsif batches && strand.nil? && run_at.nil? batch_enqueue_args = kwargs.slice(*self.batch_enqueue_args) batches[batch_enqueue_args] << kwargs return true else job = self.create(**kwargs) end JobTracking.job_created(job) job end |
#in_delayed_job=(val) ⇒ Object
111 112 113 |
# File 'lib/delayed/backend/base.rb', line 111 def in_delayed_job=(val) Thread.current[:in_delayed_job] = val end |
#in_delayed_job? ⇒ Boolean
107 108 109 |
# File 'lib/delayed/backend/base.rb', line 107 def in_delayed_job? !!Thread.current[:in_delayed_job] end |
#n_strand_options(strand_name, num_strands) ⇒ Object
by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time
101 102 103 104 105 |
# File 'lib/delayed/backend/base.rb', line 101 def (strand_name, num_strands) strand_num = num_strands > 1 ? rand(num_strands) + 1 : 1 strand_name += ":#{strand_num}" if strand_num > 1 { strand: strand_name } end |
#processes_locked_locally(name: nil) ⇒ Object
135 136 137 138 |
# File 'lib/delayed/backend/base.rb', line 135 def processes_locked_locally(name: nil) name ||= Socket.gethostname rescue x running_jobs.select{|job| job.locked_by.start_with?("#{name}:")}.map{|job| job.locked_by.split(':').last.to_i} end |
#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/delayed/backend/base.rb', line 147 def unlock_orphaned_jobs(pid = nil, name = nil) begin name ||= Socket.gethostname rescue return 0 end pid_regex = pid || '(\d+)' regex = Regexp.new("^#{Regexp.escape(name)}:#{pid_regex}$") unlocked_jobs = 0 running = false if pid self.running_jobs.each do |job| next unless job.locked_by =~ regex unless pid job_pid = $1.to_i running = Process.kill(0, job_pid) rescue false end if !running unlocked_jobs += 1 job.reschedule("process died") end end unlocked_jobs end |
#unlock_orphaned_prefetched_jobs ⇒ Object
140 141 142 143 144 145 |
# File 'lib/delayed/backend/base.rb', line 140 def unlock_orphaned_prefetched_jobs horizon = db_time_now - Settings.parent_process[:prefetched_jobs_timeout] * 4 orphaned_jobs = running_jobs.select { |job| job.locked_by.start_with?('prefetch:') && job.locked_at < horizon } return 0 if orphaned_jobs.empty? unlock(orphaned_jobs) end |