Module: Delayed::Backend::Base::ClassMethods
- Defined in:
- lib/delayed/backend/base.rb
Instance Attribute Summary collapse
-
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
-
#batches ⇒ Object
Returns the value of attribute batches.
-
#default_priority ⇒ Object
Returns the value of attribute default_priority.
Instance Method Summary collapse
- #check_priorities(min_priority, max_priority) ⇒ Object
- #check_queue(queue) ⇒ Object
-
#db_time_now ⇒ Object
Get the current time (UTC). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
-
#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue. The first argument should be an object for which respond_to?(:perform) is true. The rest should be named arguments; these keys are expected: :priority, :run_at, :queue, :strand, :singleton. Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue).
- #in_delayed_job=(val) ⇒ Object
- #in_delayed_job? ⇒ Boolean
-
#n_strand_options(strand_name, num_strands) ⇒ Object
By default, creates a new strand name randomly based on num_strands, effectively balancing the load during queueing. Overwritten in ActiveRecord::Job to use triggers to balance at run time.
- #processes_locked_locally(name: nil) ⇒ Object
- #unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
- #unlock_orphaned_prefetched_jobs ⇒ Object
Instance Attribute Details
#batch_enqueue_args ⇒ Object
Returns the value of attribute batch_enqueue_args.
26 27 28 |
# File 'lib/delayed/backend/base.rb', line 26
#
# Reader for the +@batch_enqueue_args+ instance variable.
#
# #enqueue splats this value into +kwargs.slice+ to build the key under
# which a batched job's arguments are grouped.
#
# @return [Object] the current value of +@batch_enqueue_args+
def batch_enqueue_args
  @batch_enqueue_args
end
#batches ⇒ Object
Returns the value of attribute batches.
26 27 28 |
# File 'lib/delayed/backend/base.rb', line 26
#
# Reader for the +@batches+ instance variable.
#
# When this value is truthy, #enqueue appends the job's kwargs to
# +batches[key]+ instead of creating the job (only for jobs without a
# strand or run_at).
#
# @return [Object] the current value of +@batches+
def batches
  @batches
end
#default_priority ⇒ Object
Returns the value of attribute default_priority.
26 27 28 |
# File 'lib/delayed/backend/base.rb', line 26
#
# Reader for the +@default_priority+ instance variable.
#
# #enqueue uses this value as the default for its +priority:+ keyword.
#
# @return [Object] the current value of +@default_priority+
def default_priority
  @default_priority
end
Instance Method Details
#check_priorities(min_priority, max_priority) ⇒ Object
136 137 138 139 140 141 142 143 |
# File 'lib/delayed/backend/base.rb', line 136
#
# Validates an optional priority range against Delayed::MIN_PRIORITY and
# Delayed::MAX_PRIORITY. A bound that is nil (or false) is not checked.
#
# @param min_priority [Object, nil] lower bound; compared with +<+
# @param max_priority [Object, nil] upper bound; compared with +>+
# @return [nil] when both bounds are acceptable
# @raise [ArgumentError] if min_priority is below Delayed::MIN_PRIORITY or
#   max_priority is above Delayed::MAX_PRIORITY
def check_priorities(min_priority, max_priority)
  below_floor = min_priority && min_priority < Delayed::MIN_PRIORITY
  raise ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}" if below_floor

  above_ceiling = max_priority && max_priority > Delayed::MAX_PRIORITY
  raise ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}" if above_ceiling
end
#check_queue(queue) ⇒ Object
132 133 134 |
# File 'lib/delayed/backend/base.rb', line 132
#
# Rejects blank queue names.
#
# @param queue [#blank?] queue name to validate
# @return [nil] when the queue name is not blank
# @raise [ArgumentError] if +queue.blank?+ is true
def check_queue(queue)
  return unless queue.blank?

  raise ArgumentError, "queue name can't be blank"
end
#db_time_now ⇒ Object
Get the current time (UTC). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
148 149 150 |
# File 'lib/delayed/backend/base.rb', line 148
#
# Current time in UTC, read from this process's clock via Time.now.
# The database is not consulted, so all clients need synchronized clocks.
#
# @return [Time] the current time, converted to UTC
def db_time_now
  local_clock = Time.now
  local_clock.utc
end
#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object
Add a job to the queue. The first argument should be an object for which respond_to?(:perform) is true. The rest should be named arguments; these keys are expected: :priority, :run_at, :queue, :strand, :singleton. Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue)
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/delayed/backend/base.rb', line 33
#
# Add a job to the queue.
#
# Example:
#   Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue)
#
# @param object [#perform] the payload; stored as +kwargs[:payload_object]+
# @param priority [Object] stored as +kwargs[:priority]+
# @param run_at [Object, nil] stored as +kwargs[:run_at]+ only when truthy
# @param expires_at [Object, nil] stored as +kwargs[:expires_at]+
# @param queue [Object] stored as +kwargs[:queue]+
# @param strand [Object, nil] stored as +kwargs[:strand]+; falls back to
#   +singleton+ when Settings.infer_strand_from_singleton is truthy
# @param singleton [Object, nil] stored as +kwargs[:singleton]+
# @param n_strand [Object, Array, nil] a strand name, or a
#   +[strand_name, ext]+ pair (see the comment in the body)
# @param max_attempts [Object] stored as +kwargs[:max_attempts]+
# @param kwargs [Hash] extra attributes merged over the default job options
# @return [Object, true] the value assigned from +create+, or +true+ when
#   the job's kwargs were appended to the current batch instead
# @raise [ArgumentError] if +object+ does not respond to +perform+, if both
#   +strand+ and +n_strand+ are given, if a stranded job's +run_at+ is later
#   than Job.db_time_now + Settings.stranded_run_at_grace_period, or if
#   +on_conflict+ is given on the non-singleton, non-batched path
def enqueue(object,
            priority: default_priority,
            run_at: nil,
            expires_at: nil,
            queue: Delayed::Settings.queue,
            strand: nil,
            singleton: nil,
            n_strand: nil,
            max_attempts: Delayed::Settings.max_attempts,
            **kwargs)
  unless object.respond_to?(:perform)
    raise ArgumentError, "Cannot enqueue items which do not respond to perform"
  end

  strand ||= singleton if Settings.infer_strand_from_singleton

  # NOTE(review): the method name was missing here ("Settings..merge");
  # restored as default_job_options -- confirm against Delayed::Settings.
  kwargs = Settings.default_job_options.merge(kwargs)
  kwargs[:payload_object] = object
  kwargs[:priority] = priority
  kwargs[:run_at] = run_at if run_at
  kwargs[:strand] = strand
  kwargs[:max_attempts] = max_attempts
  if defined?(Marginalia) && Marginalia::Comment.components
    kwargs[:source] = Marginalia::Comment.construct_comment
  end
  kwargs[:expires_at] = expires_at
  kwargs[:queue] = queue
  kwargs[:singleton] = singleton

  raise ArgumentError, "Only one of strand or n_strand can be used" if strand && n_strand

  if (strand || n_strand) && run_at && run_at > Job.db_time_now + Settings.stranded_run_at_grace_period
    raise ArgumentError, "Do not use run_at with strand; you may inadvertently clog the strand"
  end

  # If two parameters are given to n_strand, the first param is used
  # as the strand name for looking up the Setting, while the second
  # param is appended to make a unique set of strands.
  #
  # For instance, you can pass ["my_job_type", root_account.global_id]
  # to get a set of n strands per root account, and you can apply the
  # same default to all.
  if n_strand
    strand_name, ext = n_strand

    if ext
      full_strand_name = "#{strand_name}/#{ext}"
      num_strands = Delayed::Settings.num_strands.call(full_strand_name)
    else
      full_strand_name = strand_name
    end

    # Fall back to the setting for the bare strand name, then to 1 strand.
    num_strands ||= Delayed::Settings.num_strands.call(strand_name)
    num_strands = num_strands ? num_strands.to_i : 1

    kwargs.merge!(n_strand_options(full_strand_name, num_strands))
  end

  job = nil

  if singleton
    Delayed::Worker.lifecycle.run_callbacks(:create, kwargs) do
      job = create(**kwargs)
    end
  elsif batches && strand.nil? && run_at.nil?
    # Batching is active: queue the kwargs under their grouping key and
    # skip creation (and job tracking) for now.
    batch_key = kwargs.slice(*batch_enqueue_args)
    batches[batch_key] << kwargs
    return true
  else
    raise ArgumentError, "on_conflict can only be provided with singleton" if kwargs[:on_conflict]

    Delayed::Worker.lifecycle.run_callbacks(:create, kwargs) do
      job = create(**kwargs)
    end
  end

  JobTracking.job_created(job)

  job
end
#in_delayed_job=(val) ⇒ Object
128 129 130 |
# File 'lib/delayed/backend/base.rb', line 128
#
# Stores +val+ on Thread.current under the +:in_delayed_job+ key, where
# #in_delayed_job? reads it back.
#
# @param val [Object] flag value to store
def in_delayed_job=(val)
  current = Thread.current
  current[:in_delayed_job] = val
end
#in_delayed_job? ⇒ Boolean
124 125 126 |
# File 'lib/delayed/backend/base.rb', line 124
#
# Reports whether the +:in_delayed_job+ value stored on Thread.current is
# truthy.
#
# @return [Boolean] +true+ for any truthy stored value, +false+ otherwise
def in_delayed_job?
  Thread.current[:in_delayed_job] ? true : false
end
#n_strand_options(strand_name, num_strands) ⇒ Object
By default, creates a new strand name randomly based on num_strands, effectively balancing the load during queueing. Overwritten in ActiveRecord::Job to use triggers to balance at run time.
118 119 120 121 122 |
# File 'lib/delayed/backend/base.rb', line 118
#
# Picks one of +num_strands+ strands at random and returns the job options
# that place a job on it.
#
# Strand 1 keeps the bare +strand_name+; strands 2..num_strands get a
# ":<n>" suffix. The caller's string is not mutated (+= builds a new one).
#
# @param strand_name [String] base strand name
# @param num_strands [Integer] number of strands to spread jobs across
# @return [Hash] +{ strand: <chosen strand name> }+
def n_strand_options(strand_name, num_strands)
  strand_num = num_strands > 1 ? rand(num_strands) + 1 : 1
  strand_name += ":#{strand_num}" if strand_num > 1
  { strand: strand_name }
end
#processes_locked_locally(name: nil) ⇒ Object
152 153 154 155 156 157 158 |
# File 'lib/delayed/backend/base.rb', line 152
#
# Lists the process ids embedded in the +locked_by+ value of every running
# job whose +locked_by+ starts with "<name>:".
#
# The pid is taken as the text after the last ":" in +locked_by+, converted
# with +to_i+.
#
# @param name [String, nil] host name prefix; defaults to Socket.gethostname
# @return [Array<Integer>] pids parsed from matching jobs; empty when the
#   host name cannot be determined
def processes_locked_locally(name: nil)
  begin
    name ||= Socket.gethostname
  rescue StandardError
    # Without a host name there is nothing to match against. The previous
    # inline `rescue x` referenced an undefined name and raised NameError.
    return []
  end

  prefix = "#{name}:"
  local_jobs = running_jobs.select { |job| job.locked_by.start_with?(prefix) }
  local_jobs.map { |job| job.locked_by.split(":").last.to_i }
end
#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/delayed/backend/base.rb', line 170
#
# Reschedules running jobs whose +locked_by+ is "<name>:<pid>" and whose
# locking process is considered gone.
#
# With an explicit +pid+, every job locked by exactly "<name>:<pid>" is
# rescheduled without probing the process. Without one, each job locked by
# "<name>:<digits>" is probed with signal 0 and rescheduled only when the
# process is not running. Jobs are fetched in batches of up to 100.
#
# @param pid [Integer, String, nil] pid whose jobs should be unlocked, or
#   nil to probe every pid found for this host
# @param name [String, nil] host name; defaults to Socket.gethostname
# @return [Integer] number of jobs rescheduled; 0 when the host name cannot
#   be determined
def unlock_orphaned_jobs(pid = nil, name = nil)
  begin
    name ||= Socket.gethostname
  rescue StandardError
    return 0
  end

  pid_regex = pid || '(\d+)'
  # \A/\z anchor the whole value; ^/$ would also match around newlines.
  regex = Regexp.new("\\A#{Regexp.escape(name)}:#{pid_regex}\\z")
  unlocked_jobs = 0

  # Escape LIKE metacharacters. The block form is required: with a string
  # replacement, "\\\\" is read as one escaped backslash, so backslashes
  # were previously left unescaped.
  escaped_name = name.gsub(/[\\%_]/) { |char| "\\#{char}" }
  locked_by_like = "#{escaped_name}:%"

  running = false if pid
  jobs = running_jobs.limit(100)
  jobs = pid ? jobs.where(locked_by: "#{name}:#{pid}") : jobs.where("locked_by LIKE ?", locked_by_like)
  ignores = []

  loop do
    batch_scope = ignores.empty? ? jobs : jobs.where.not(id: ignores)
    # if we don't reload this it's possible to keep getting the
    # same array each loop even after the jobs have been deleted.
    batch = batch_scope.reload.to_a
    break if batch.empty?

    batch.each do |job|
      unless job.locked_by =~ regex
        ignores << job.id
        next
      end

      unless pid
        job_pid = Regexp.last_match(1).to_i
        running =
          begin
            # Signal 0 performs the existence/permission check only.
            Process.kill(0, job_pid)
            true
          rescue Errno::EPERM
            # The process exists but we may not signal it: still running.
            true
          rescue StandardError
            false
          end
      end

      if running
        ignores << job.id
      else
        unlocked_jobs += 1
        job.reschedule("process died")
      end
    end
  end

  unlocked_jobs
end
#unlock_orphaned_prefetched_jobs ⇒ Object
160 161 162 163 164 165 166 167 168 |
# File 'lib/delayed/backend/base.rb', line 160
#
# Unlocks running jobs whose +locked_by+ starts with "prefetch:" and whose
# +locked_at+ is older than four times
# Settings.parent_process[:prefetched_jobs_timeout] before #db_time_now.
#
# @return [Object] 0 when no such jobs exist, otherwise whatever
#   +unlock+ returns for the selected jobs
def unlock_orphaned_prefetched_jobs
  now = db_time_now
  grace = Settings.parent_process[:prefetched_jobs_timeout] * 4
  horizon = now - grace

  stale = running_jobs.select do |job|
    job.locked_by.start_with?("prefetch:") && job.locked_at < horizon
  end

  stale.empty? ? 0 : unlock(stale)
end