Class: Faktory::Job::Setter
- Inherits:
-
Object
- Object
- Faktory::Job::Setter
- Defined in:
- lib/faktory/job.rb
Overview
This helper class encapsulates the set options for set, e.g.
SomeJob.set(queue: 'foo').perform_async(....)
Instance Method Summary collapse
-
#client_push(item) ⇒ Object
:nodoc:.
-
#initialize(opts) ⇒ Setter
constructor
A new instance of Setter.
- #perform_async(*args) ⇒ Object
-
#perform_in(interval, *args) ⇒ Object
(also: #perform_at)
intervalmust be a timestamp, numeric or something that acts numeric (like an activesupport time interval). - #set(opts) ⇒ Object
Constructor Details
#initialize(opts) ⇒ Setter
Returns a new instance of Setter.
70 71 72 |
# File 'lib/faktory/job.rb', line 70 def initialize(opts) @opts = opts end |
Instance Method Details
#client_push(item) ⇒ Object
:nodoc:
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/faktory/job.rb', line 100 def client_push(item) # :nodoc: # stringify item.keys.each do |key| item[key.to_s] = item.delete(key) end item["jid"] ||= SecureRandom.hex(12) item["queue"] ||= "default" pool = Thread.current[:faktory_via_pool] || item["pool"] || Faktory.server_pool item.delete("pool") # the payload hash is shallow copied by `merge` calls BUT we don't deep clone # the 'custom' child hash which can be problematic if we mutate it within middleware. # Proactively dup it first. item["custom"] = item["custom"].dup if item["custom"] Faktory.client_middleware.invoke(item, pool) do pool.with do |c| c.push(item) end end end |
#perform_async(*args) ⇒ Object
79 80 81 |
# File 'lib/faktory/job.rb', line 79 def perform_async(*args) client_push(@opts.merge("args" => args)) end |
#perform_in(interval, *args) ⇒ Object Also known as: perform_at
interval must be a timestamp, numeric or something that acts
numeric (like an activesupport time interval).
85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/faktory/job.rb', line 85 def perform_in(interval, *args) int = interval.to_f now = Time.now.to_f ts = ((int < 1_000_000_000) ? now + int : int) at = Time.at(ts).utc.to_datetime.rfc3339(9) item = @opts.merge("args" => args, "at" => at) # Optimization to enqueue something now that is scheduled to go out now or in the past item.delete("at") if ts <= now client_push(item) end |
#set(opts) ⇒ Object
74 75 76 77 |
# File 'lib/faktory/job.rb', line 74 def set(opts) @opts = Util.deep_merge(@opts, opts) self end |