Class: Sidekiq::Job::Setter

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::JobUtil
Defined in:
lib/sidekiq/job.rb

Overview

This helper class encapsulates the options given to `set`, e.g.

SomeJob.set(queue: 'foo').perform_async(....)

Constant Summary

Constants included from Sidekiq::JobUtil

Sidekiq::JobUtil::TRANSIENT_ATTRIBUTES

Instance Method Summary collapse

Methods included from Sidekiq::JobUtil

#normalize_item, #normalized_hash, #validate, #verify_json

Constructor Details

#initialize(klass, opts) ⇒ Setter

Returns a new instance of Setter.



187
188
189
190
191
192
193
194
195
# File 'lib/sidekiq/job.rb', line 187

# Builds a Setter for +klass+ carrying the per-call options in +opts+.
#
# klass - the job class the options apply to.
# opts  - Hash of options; keys may be symbols or strings.
#
# The ActiveJob-style "wait_until" / "wait" options are removed from the
# stored options and handed to +at+ instead.
def initialize(klass, opts)
  @klass = klass
  # Stringify once up front so the internal hash only ever has string keys.
  @opts = opts.transform_keys(&:to_s)

  # ActiveJob compatibility: "wait_until" wins; "wait" is only consulted
  # (and removed) when "wait_until" is missing or falsy.
  delay = @opts.delete("wait_until")
  delay ||= @opts.delete("wait")
  at(delay) if delay
end

Instance Method Details

#perform_async(*args) ⇒ Object



205
206
207
208
209
210
211
# File 'lib/sidekiq/job.rb', line 205

# Enqueues the job with the stored options, or runs it inline when the
# "sync" option is exactly +true+.
#
# args - the arguments to pass to the job.
#
# Returns whatever +perform_inline+ or the job class's +client_push+ returns.
def perform_async(*args)
  # Only a literal true triggers inline execution; other values still push.
  return perform_inline(*args) if @opts["sync"] == true

  payload = @opts.merge("args" => args, "class" => @klass)
  @klass.client_push(payload)
end

#perform_bulk(args, batch_size: 1_000) ⇒ Object



251
252
253
254
# File 'lib/sidekiq/job.rb', line 251

# Pushes many jobs at once through a client built by the job class.
#
# args       - the collection of argument lists handed to +push_bulk+ under "args".
# batch_size - forwarded to +push_bulk+ under the :batch_size symbol key
#              (default: 1_000).
#
# Returns whatever the client's +push_bulk+ returns.
def perform_bulk(args, batch_size: 1_000)
  # Note the deliberate symbol key for :batch_size next to the string keys.
  @klass.build_client.push_bulk(
    @opts.merge("class" => @klass, "args" => args, :batch_size => batch_size)
  )
end

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

`interval` must be a timestamp, a numeric, or something that acts like a numeric (such as an ActiveSupport time interval).


258
259
260
# File 'lib/sidekiq/job.rb', line 258

# Applies +interval+ via +at+ and then enqueues the job on the result.
#
# interval - passed straight to +at+.
# args     - the arguments to pass to the job.
#
# Returns whatever +perform_async+ returns.
def perform_in(interval, *args)
  scheduled = at(interval)
  scheduled.perform_async(*args)
end

#perform_inline(*args) ⇒ Object Also known as: perform_sync

Explicit inline execution of a job. Returns nil if the job did not execute, true otherwise.



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/sidekiq/job.rb', line 215

# Runs the job immediately in the current process instead of enqueueing it.
#
# args - the arguments to pass to the job's +perform+.
#
# Returns nil when either middleware chain yields a falsy result (the job
# did not execute), true otherwise.
def perform_inline(*args)
  # Build the raw payload from the stored options, then validate/normalize it.
  item = normalize_item(@opts.merge("args" => args, "class" => @klass))

  # Client-side middleware runs first; a falsy result stops here.
  config = Sidekiq.default_configuration
  pushed = config.client_middleware.invoke(item["class"], item, item["queue"], config.redis_pool) { item }
  return nil unless pushed

  # Round-trip through JSON so the job receives serialized-then-parsed data.
  payload = Sidekiq.load_json(Sidekiq.dump_json(item))

  # Instantiate the job class named in the payload and copy its identifiers.
  instance = Object.const_get(payload["class"]).new
  instance.jid = payload["jid"]
  instance.bid = payload["bid"] if instance.respond_to?(:bid)

  # Server-side middleware wraps the actual perform call.
  executed = config.server_middleware.invoke(instance, payload, payload["queue"]) do
    instance.perform(*payload["args"])
    true
  end

  # Jobs do not return a result; they should store any modified state.
  executed ? true : nil
end

#set(options) ⇒ Object



197
198
199
200
201
202
203
# File 'lib/sidekiq/job.rb', line 197

# Merges additional options into this Setter.
#
# options - Hash of options; keys may be symbols or strings.
#
# The ActiveJob-style "wait_until" / "wait" options are pulled out of the
# incoming options and handed to +at+ rather than stored.
#
# Returns self so calls can be chained.
def set(options)
  hash = options.transform_keys(&:to_s)
  # ActiveJob compatibility: both keys must be taken from the incoming hash.
  # The constructor already removes "wait" from @opts, so looking there would
  # ignore a "wait" passed to this call and merge it into the stored options.
  interval = hash.delete("wait_until") || hash.delete("wait")
  @opts.merge!(hash)
  at(interval) if interval
  self
end