Module: Delayed::Backend::Base::ClassMethods

Defined in:
lib/delayed/backend/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch_enqueue_argsObject

Returns the value of attribute batch_enqueue_args.



26
27
28
# File 'lib/delayed/backend/base.rb', line 26

def batch_enqueue_args
  @batch_enqueue_args
end

#batchesObject

Returns the value of attribute batches.



26
27
28
# File 'lib/delayed/backend/base.rb', line 26

def batches
  @batches
end

#default_priorityObject

Returns the value of attribute default_priority.



26
27
28
# File 'lib/delayed/backend/base.rb', line 26

def default_priority
  @default_priority
end

Instance Method Details

#check_priorities(min_priority, max_priority) ⇒ Object



136
137
138
139
140
141
142
143
# File 'lib/delayed/backend/base.rb', line 136

def check_priorities(min_priority, max_priority)
  if min_priority && min_priority < Delayed::MIN_PRIORITY
    raise ArgumentError, "min_priority #{min_priority} can't be less than #{Delayed::MIN_PRIORITY}"
  end
  if max_priority && max_priority > Delayed::MAX_PRIORITY # rubocop:disable Style/GuardClause
    raise ArgumentError, "max_priority #{max_priority} can't be greater than #{Delayed::MAX_PRIORITY}"
  end
end

#check_queue(queue) ⇒ Object

Raises:

  • (ArgumentError)


132
133
134
# File 'lib/delayed/backend/base.rb', line 132

def check_queue(queue)
  raise(ArgumentError, "queue name can't be blank") if queue.blank?
end

#db_time_nowObject

Get the current time (UTC) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.



148
149
150
# File 'lib/delayed/backend/base.rb', line 148

def db_time_now
  Time.now.utc
end

#enqueue(object, priority: default_priority, run_at: nil, expires_at: nil, queue: Delayed::Settings.queue, strand: nil, singleton: nil, n_strand: nil, max_attempts: Delayed::Settings.max_attempts, **kwargs) ⇒ Object

Add a job to the queue The first argument should be an object that respond_to?(:perform) The rest should be named arguments, these keys are expected: :priority, :run_at, :queue, :strand, :singleton Example: Delayed::Job.enqueue(object, priority: 0, run_at: time, queue: queue)

Raises:

  • (ArgumentError)


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/delayed/backend/base.rb', line 33

def enqueue(object,
            priority: default_priority,
            run_at: nil,
            expires_at: nil,
            queue: Delayed::Settings.queue,
            strand: nil,
            singleton: nil,
            n_strand: nil,
            max_attempts: Delayed::Settings.max_attempts,
            **kwargs)

  unless object.respond_to?(:perform)
    raise ArgumentError, "Cannot enqueue items which do not respond to perform"
  end

  strand ||= singleton if Settings.infer_strand_from_singleton

  kwargs = Settings.default_job_options.merge(kwargs)
  kwargs[:payload_object] = object
  kwargs[:priority] = priority
  kwargs[:run_at] = run_at if run_at
  kwargs[:strand] = strand
  kwargs[:max_attempts] = max_attempts
  if defined?(Marginalia) && Marginalia::Comment.components
    kwargs[:source] =
      Marginalia::Comment.construct_comment
  end
  kwargs[:expires_at] = expires_at
  kwargs[:queue] = queue
  kwargs[:singleton] = singleton

  raise ArgumentError, "Only one of strand or n_strand can be used" if strand && n_strand
  if (strand || n_strand) && run_at && run_at > Job.db_time_now + Settings.stranded_run_at_grace_period
    raise ArgumentError, "Do not use run_at with strand; you may inadvertently clog the strand"
  end

  # If two parameters are given to n_strand, the first param is used
  # as the strand name for looking up the Setting, while the second
  # param is appended to make a unique set of strands.
  #
  # For instance, you can pass ["my_job_type", # root_account.global_id]
  # to get a set of n strands per root account, and you can apply the
  # same default to all.
  if n_strand
    strand_name, ext = n_strand

    if ext
      full_strand_name = "#{strand_name}/#{ext}"
      num_strands = Delayed::Settings.num_strands.call(full_strand_name)
    else
      full_strand_name = strand_name
    end

    num_strands ||= Delayed::Settings.num_strands.call(strand_name)
    num_strands = num_strands ? num_strands.to_i : 1

    kwargs.merge!(n_strand_options(full_strand_name, num_strands))
  end

  job = nil

  if singleton
    Delayed::Worker.lifecycle.run_callbacks(:create, kwargs) do
      job = create(**kwargs)
    end
  elsif batches && strand.nil? && run_at.nil?
    batch_enqueue_args = kwargs.slice(*self.batch_enqueue_args)
    batches[batch_enqueue_args] << kwargs
    return true
  else
    raise ArgumentError, "on_conflict can only be provided with singleton" if kwargs[:on_conflict]

    Delayed::Worker.lifecycle.run_callbacks(:create, kwargs) do
      job = create(**kwargs)
    end
  end

  JobTracking.job_created(job)

  job
end

#in_delayed_job=(val) ⇒ Object



128
129
130
# File 'lib/delayed/backend/base.rb', line 128

def in_delayed_job=(val)
  Thread.current[:in_delayed_job] = val
end

#in_delayed_job?Boolean

Returns:

  • (Boolean)


124
125
126
# File 'lib/delayed/backend/base.rb', line 124

def in_delayed_job?
  !!Thread.current[:in_delayed_job]
end

#n_strand_options(strand_name, num_strands) ⇒ Object

by default creates a new strand name randomly based on num_strands effectively balancing the load during queueing overwritten in ActiveRecord::Job to use triggers to balance at run time



118
119
120
121
122
# File 'lib/delayed/backend/base.rb', line 118

def n_strand_options(strand_name, num_strands)
  strand_num = (num_strands > 1) ? rand(num_strands) + 1 : 1
  strand_name += ":#{strand_num}" if strand_num > 1
  { strand: strand_name }
end

#processes_locked_locally(name: nil) ⇒ Object



152
153
154
155
156
157
158
# File 'lib/delayed/backend/base.rb', line 152

def processes_locked_locally(name: nil)
  name ||= Socket.gethostname rescue x
  local_jobs = running_jobs.select do |job|
    job.locked_by.start_with?("#{name}:")
  end
  local_jobs.map { |job| job.locked_by.split(":").last.to_i }
end

#unlock_orphaned_jobs(pid = nil, name = nil) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/delayed/backend/base.rb', line 170

def unlock_orphaned_jobs(pid = nil, name = nil)
  begin
    name ||= Socket.gethostname
  rescue
    return 0
  end
  pid_regex = pid || '(\d+)'
  regex = Regexp.new("^#{Regexp.escape(name)}:#{pid_regex}$")
  unlocked_jobs = 0
  escaped_name = name.gsub("\\", "\\\\")
                     .gsub("%", "\\%")
                     .gsub("_", "\\_")
  locked_by_like = "#{escaped_name}:%"
  running = false if pid
  jobs = running_jobs.limit(100)
  jobs = pid ? jobs.where(locked_by: "#{name}:#{pid}") : jobs.where("locked_by LIKE ?", locked_by_like)
  ignores = []
  loop do
    batch_scope = ignores.empty? ? jobs : jobs.where.not(id: ignores)
    # if we don't reload this it's possible to keep getting the
    # same array each loop even after the jobs have been deleted.
    batch = batch_scope.reload.to_a
    break if batch.empty?

    batch.each do |job|
      unless job.locked_by =~ regex
        ignores << job.id
        next
      end

      unless pid
        job_pid = $1.to_i
        running = Process.kill(0, job_pid) rescue false
      end

      if running
        ignores << job.id
      else
        unlocked_jobs += 1
        job.reschedule("process died")
      end
    end
  end
  unlocked_jobs
end

#unlock_orphaned_prefetched_jobsObject



160
161
162
163
164
165
166
167
168
# File 'lib/delayed/backend/base.rb', line 160

def unlock_orphaned_prefetched_jobs
  horizon = db_time_now - (Settings.parent_process[:prefetched_jobs_timeout] * 4)
  orphaned_jobs = running_jobs.select do |job|
    job.locked_by.start_with?("prefetch:") && job.locked_at < horizon
  end
  return 0 if orphaned_jobs.empty?

  unlock(orphaned_jobs)
end