Module: SideJob::JobMethods
- Included in:
- Job
- Defined in:
- lib/sidejob/job.rb
Instance Attribute Summary
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Instance Method Summary
-
#==(other) ⇒ Boolean
True if two jobs or workers have the same id.
-
#add_alias(name) ⇒ Object
Add an alias for the job.
-
#adopt(orphan, name) ⇒ Object
Adopt a parent-less job as a child of this job.
-
#aliases ⇒ Array<String>
Returns all aliases for the job.
-
#child(name) ⇒ SideJob::Job?
Returns a child job by name.
-
#children ⇒ Hash<String => SideJob::Job>
Returns all children jobs.
-
#delete ⇒ Boolean
Deletes the job and all children jobs (recursively) if all are terminated.
-
#disown(name_or_job) ⇒ Object
Disown a child job so that it no longer has a parent.
- #eql?(other) ⇒ Boolean
-
#exists? ⇒ Boolean
Returns whether the job still exists.
-
#get(key) ⇒ Object?
Returns some data from the job’s internal state.
-
#hash ⇒ Fixnum
Hash value based on the id.
-
#info ⇒ Hash
Returns a hash of job info with the keys queue, class, args, created_by, created_at, and ran_at.
-
#inports ⇒ Array<SideJob::Port>
Gets all input ports.
-
#inports=(ports) ⇒ Object
Sets the input ports for the job.
-
#input(name) ⇒ SideJob::Port
Returns an input port.
-
#lock(ttl, retries: 3, retry_delay: 0.2) ⇒ String?
Acquire a lock on the job with a given expiration time.
-
#outports ⇒ Array<SideJob::Port>
Gets all output ports.
-
#outports=(ports) ⇒ Object
Sets the output ports for the job.
-
#output(name) ⇒ SideJob::Port
Returns an output port.
-
#parent ⇒ SideJob::Job?
Returns the parent job.
-
#publish(message) ⇒ Object
Publishes a message to the job’s channel.
-
#queue(queue, klass, **options) ⇒ Object
Queues a child job, setting parent and by to self.
-
#redis_key ⇒ String
(also: #to_s)
Prefix for all redis keys related to this job.
-
#refresh_lock(ttl) ⇒ Boolean
Refresh the lock expiration.
-
#remove_alias(name) ⇒ Object
Remove an alias for the job.
-
#run(parent: false, force: false, at: nil, wait: nil) ⇒ SideJob::Job?
Run the job.
-
#set(data) ⇒ Object
Sets values in the job’s internal state.
-
#state ⇒ Hash{String => Object}
Returns the entirety of the job’s internal state.
-
#status ⇒ String
Retrieve the job’s status.
-
#status=(status) ⇒ Object
Set the job status.
-
#terminate(recursive: false) ⇒ SideJob::Job
Prepare to terminate the job.
-
#terminated? ⇒ Boolean
Returns whether the job and all of its children are terminated.
-
#unlock(token) ⇒ Boolean
Unlock job by deleting the lock only if it equals the lock token.
-
#unset(*fields) ⇒ Object
Unsets some fields in the job’s internal state.
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
    # File 'lib/sidejob/job.rb', line 4
    def id
      @id
    end
Instance Method Details
#==(other) ⇒ Boolean
Returns true if two jobs or workers have the same id.
    # File 'lib/sidejob/job.rb', line 7
    def ==(other)
      other.respond_to?(:id) && id == other.id
    end
#add_alias(name) ⇒ Object
Add an alias for the job.
    # File 'lib/sidejob/job.rb', line 61
    def add_alias(name)
      check_exists
      raise "#{name} is not a valid alias" unless name =~ /^[[:alpha:]]/
      current = SideJob.redis.hget('jobs:aliases', name)
      if current
        raise "#{name} is already used by job #{current}" if current.to_i != id
      else
        SideJob.redis.multi do |multi|
          multi.hset 'jobs:aliases', name, id
          multi.sadd "#{redis_key}:aliases", name
        end
      end
    end
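The validity check in add_alias accepts any name whose first character is a letter. The sketch below isolates that rule; `valid_alias?` and `strict_valid_alias?` are hypothetical helpers written for illustration and are not part of SideJob. It also shows a consequence of the pattern: in Ruby, `^` anchors at the start of any line, so a multi-line string passes if one of its lines begins with a letter, whereas `\A` anchors at the start of the whole string.

```ruby
# Hypothetical helper that reuses the pattern from add_alias.
def valid_alias?(name)
  !(name =~ /^[[:alpha:]]/).nil?
end

# Hypothetical stricter variant: \A matches only at the start of the string.
def strict_valid_alias?(name)
  name.match?(/\A[[:alpha:]]/)
end

puts valid_alias?('report')            # true
puts valid_alias?('2nd')               # false
puts valid_alias?("1\nabc")            # true, because the second line starts with a letter
puts strict_valid_alias?("1\nabc")     # false
```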
#adopt(orphan, name) ⇒ Object
Adopt a parent-less job as a child of this job.
    # File 'lib/sidejob/job.rb', line 198
    def adopt(orphan, name)
      check_exists

      raise "Job #{id} cannot adopt itself as a child" if orphan == self
      raise "Job #{id} cannot adopt job #{orphan.id} as it already has a parent" unless orphan.parent.nil?
      raise "Job #{id} cannot adopt job #{orphan.id} as child name #{name} is not unique" if name.nil? || ! child(name).nil?
      SideJob.redis.multi do |multi|
        multi.set "#{orphan.redis_key}:parent", id.to_json
        multi.hset "#{redis_key}:children", name, orphan.id
      end
    end
#aliases ⇒ Array<String>
Returns all aliases for the job.
    # File 'lib/sidejob/job.rb', line 54
    def aliases
      SideJob.redis.smembers "#{redis_key}:aliases"
    end
#child(name) ⇒ SideJob::Job?
Returns a child job by name.
    # File 'lib/sidejob/job.rb', line 160
    def child(name)
      SideJob.find(SideJob.redis.hget("#{redis_key}:children", name))
    end
#children ⇒ Hash<String => SideJob::Job>
Returns all children jobs.
    # File 'lib/sidejob/job.rb', line 166
    def children
      SideJob.redis.hgetall("#{redis_key}:children").each_with_object({}) {|child, hash| hash[child[0]] = SideJob.find(child[1])}
    end
#delete ⇒ Boolean
Deletes the job and all children jobs (recursively) if all are terminated.
    # File 'lib/sidejob/job.rb', line 212
    def delete
      return false unless terminated?

      parent = self.parent
      parent.disown(self) if parent

      children = self.children
      aliases = self.aliases

      # delete all SideJob keys and disown all children
      ports = inports.map(&:redis_key) + outports.map(&:redis_key)
      SideJob.redis.multi do |multi|
        multi.srem 'jobs', id
        multi.del redis_key
        multi.del ports + %w{worker status state aliases parent children inports outports inports:default outports:default inports:channels outports:channels created_at created_by ran_at}.map {|x| "#{redis_key}:#{x}" }
        children.each_value { |child| multi.hdel child.redis_key, 'parent' }
        aliases.each { |name| multi.hdel('jobs:aliases', name) }
      end

      # recursively delete all children
      children.each_value do |child|
        child.delete
      end

      publish({deleted: true})
      return true
    end
#disown(name_or_job) ⇒ Object
Disown a child job so that it no longer has a parent.
    # File 'lib/sidejob/job.rb', line 178
    def disown(name_or_job)
      if name_or_job.is_a?(SideJob::Job)
        job = name_or_job
        name = children.rassoc(job)
        raise "Job #{id} cannot disown job #{job.id} as it is not a child" unless name
      else
        name = name_or_job
        job = child(name)
        raise "Job #{id} cannot disown non-existent child #{name}" unless job
      end

      SideJob.redis.multi do |multi|
        multi.del "#{job.redis_key}:parent"
        multi.hdel "#{redis_key}:children", name
      end
    end
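When disown receives a job rather than a name, it looks the job up among the children with `Hash#rassoc`. The sketch below shows what that call returns on a plain Hash, next to `Hash#key` for comparison. The `CHILDREN` constant and both helper methods are hypothetical, and symbols stand in for job objects. `rassoc` yields the whole `[key, value]` pair (or nil when nothing matches), while `key` yields only the key.

```ruby
# Hypothetical children hash; symbols stand in for SideJob::Job objects.
CHILDREN = { 'parser' => :job_a, 'writer' => :job_b }.freeze

# Returns the [name, job] pair for a child, or nil if the job is not a child.
def child_entry(children, job)
  children.rassoc(job)
end

# Returns only the child name, or nil if the job is not a child.
def child_name(children, job)
  children.key(job)
end

p child_entry(CHILDREN, :job_b)   # ["writer", :job_b]
p child_name(CHILDREN, :job_b)    # "writer"
p child_entry(CHILDREN, :job_c)   # nil
```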
#eql?(other) ⇒ Boolean
    # File 'lib/sidejob/job.rb', line 12
    def eql?(other)
      self == other
    end
#exists? ⇒ Boolean
Returns whether the job still exists.
    # File 'lib/sidejob/job.rb', line 29
    def exists?
      SideJob.redis.sismember 'jobs', id
    end
#get(key) ⇒ Object?
Returns some data from the job’s internal state.
    # File 'lib/sidejob/job.rb', line 312
    def get(key)
      check_exists
      val = SideJob.redis.hget("#{redis_key}:state", key)
      val ? JSON.parse("[#{val}]")[0] : nil
    end
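State values are stored as JSON text, and get decodes them by wrapping the stored text in brackets and taking the first element of the parsed array. A minimal sketch of that round trip follows, with no Redis involved. `encode_value` and `decode_value` are hypothetical names. The wrapping is presumably there so that bare scalars such as `5` or `"text"` parse even with JSON parsers that accept only arrays or objects at the top level; that motivation is an assumption, not something the source states.

```ruby
require 'json'

# Hypothetical helper: how a value would be serialized before storage.
def encode_value(value)
  value.to_json
end

# Hypothetical helper mirroring the decoding expression in get.
def decode_value(raw)
  raw ? JSON.parse("[#{raw}]")[0] : nil
end

p decode_value(encode_value(5))            # 5
p decode_value(encode_value('text'))       # "text"
p decode_value(encode_value({ a: 1 }))     # {"a"=>1}, symbol keys come back as strings
p decode_value(nil)                        # nil, the field was never set
```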
#hash ⇒ Fixnum
Returns a hash value based on the id.
    # File 'lib/sidejob/job.rb', line 17
    def hash
      id.hash
    end
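Taken together, `==`, `eql?`, and `hash` give jobs identity by id, so two distinct objects for the same job behave as one Hash key and are deduplicated by `Array#uniq`. The sketch below reproduces only those three methods on a hypothetical `FakeJob` class that has no Redis dependency.

```ruby
# Hypothetical stand-in for a job; only the id-based identity methods are reproduced.
class FakeJob
  attr_reader :id

  def initialize(id)
    @id = id
  end

  def ==(other)
    other.respond_to?(:id) && id == other.id
  end

  def eql?(other)
    self == other
  end

  def hash
    id.hash
  end
end

a = FakeJob.new(42)
b = FakeJob.new(42)

seen = { a => :first }
seen[b] = :second          # same id, so this overwrites the existing entry
p seen.size                # 1
p [a, b, FakeJob.new(7)].uniq.map(&:id)   # [42, 7]
```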
#info ⇒ Hash
Returns a hash of job info with the keys queue, class, args, created_by, created_at, and ran_at.
    # File 'lib/sidejob/job.rb', line 284
    def info
      check_exists
      data = SideJob.redis.multi do |multi|
        multi.get "#{redis_key}:worker"
        multi.get "#{redis_key}:created_by"
        multi.get "#{redis_key}:created_at"
        multi.get "#{redis_key}:ran_at"
      end

      worker = JSON.parse(data[0])
      {
        queue: worker['queue'], class: worker['class'], args: worker['args'],
        created_by: data[1], created_at: data[2], ran_at: data[3],
      }
    end
#inports ⇒ Array<SideJob::Port>
Gets all input ports.
    # File 'lib/sidejob/job.rb', line 256
    def inports
      all_ports :in
    end
#inports=(ports) ⇒ Object
Sets the input ports for the job. The ports are merged with the worker configuration. Any current ports that are not in the new port set are deleted (including any data on those ports).
    # File 'lib/sidejob/job.rb', line 264
    def inports=(ports)
      set_ports :in, ports
    end
#input(name) ⇒ SideJob::Port
Returns an input port.
    # File 'lib/sidejob/job.rb', line 243
    def input(name)
      SideJob::Port.new(self, :in, name)
    end
#lock(ttl, retries: 3, retry_delay: 0.2) ⇒ String?
Acquire a lock on the job with a given expiration time.
    # File 'lib/sidejob/job.rb', line 340
    def lock(ttl, retries: 3, retry_delay: 0.2)
      check_exists
      retries.times do
        token = SecureRandom.uuid
        if SideJob.redis.set("#{redis_key}:lock", token, {nx: true, ex: ttl})
          return token # lock acquired
        else
          sleep Random.rand(retry_delay)
        end
      end
      return nil # lock not acquired
    end
#outports ⇒ Array<SideJob::Port>
Gets all output ports.
    # File 'lib/sidejob/job.rb', line 270
    def outports
      all_ports :out
    end
#outports=(ports) ⇒ Object
Sets the output ports for the job. The ports are merged with the worker configuration. Any current ports that are not in the new port set are deleted (including any data on those ports).
    # File 'lib/sidejob/job.rb', line 278
    def outports=(ports)
      set_ports :out, ports
    end
#output(name) ⇒ SideJob::Port
Returns an output port.
    # File 'lib/sidejob/job.rb', line 250
    def output(name)
      SideJob::Port.new(self, :out, name)
    end
#parent ⇒ SideJob::Job?
Returns the parent job.
    # File 'lib/sidejob/job.rb', line 172
    def parent
      SideJob.find(SideJob.redis.get("#{redis_key}:parent"))
    end
#publish(message) ⇒ Object
Publishes a message to the job’s channel.
    # File 'lib/sidejob/job.rb', line 376
    def publish(message)
      SideJob.publish "/sidejob/job/#{id}", message
    end
#queue(queue, klass, **options) ⇒ Object
Queues a child job, setting parent and by to self.
    # File 'lib/sidejob/job.rb', line 152
    def queue(queue, klass, **options)
      check_exists
      SideJob.queue(queue, klass, options.merge({parent: self, by: "job:#{id}"}))
    end
#redis_key ⇒ String Also known as: to_s
Returns the prefix for all Redis keys related to this job.
    # File 'lib/sidejob/job.rb', line 22
    def redis_key
      "job:#{id}"
    end
#refresh_lock(ttl) ⇒ Boolean
Refresh the lock expiration.
    # File 'lib/sidejob/job.rb', line 356
    def refresh_lock(ttl)
      check_exists
      SideJob.redis.expire "#{redis_key}:lock", ttl
    end
#remove_alias(name) ⇒ Object
Remove an alias for the job.
    # File 'lib/sidejob/job.rb', line 78
    def remove_alias(name)
      check_exists
      raise "#{name} is not an alias for job #{id}" unless SideJob.redis.sismember("#{redis_key}:aliases", name)
      SideJob.redis.multi do |multi|
        multi.hdel 'jobs:aliases', name
        multi.srem "#{redis_key}:aliases", name
      end
    end
#run(parent: false, force: false, at: nil, wait: nil) ⇒ SideJob::Job?
Run the job. This method ensures that the job runs at least once from the beginning. If the job is currently running, it will run again. As with Sidekiq, there is no guarantee that the job will not run more than once. Unless force is set, the job is run only if its status is running, queued, suspended, or completed.
    # File 'lib/sidejob/job.rb', line 97
    def run(parent: false, force: false, at: nil, wait: nil)
      if parent
        pj = self.parent
        return pj ? pj.run(force: force, at: at, wait: wait) : nil
      end

      return nil unless force || %w{running queued suspended completed}.include?(status)

      self.status = 'queued'

      time = nil
      if at
        time = at
        time = time.to_f if time.is_a?(Time)
      elsif wait
        time = Time.now.to_f + wait
      end
      sidekiq_queue(time)

      self
    end
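The at and wait options of run reduce to a single timestamp (or nil for "run now") that is handed to sidekiq_queue. The sketch below isolates that computation. `scheduled_time` is a hypothetical helper, and its `now:` parameter is an assumption added so the result is deterministic; the real method reads `Time.now` directly.

```ruby
# Hypothetical helper mirroring how run derives the scheduled time.
# at:   absolute time, either a Time or a numeric epoch timestamp
# wait: delay in seconds from now; ignored when at is given
def scheduled_time(at: nil, wait: nil, now: Time.now)
  if at
    at.is_a?(Time) ? at.to_f : at
  elsif wait
    now.to_f + wait
  end
end

p scheduled_time                                  # nil, run immediately
p scheduled_time(at: Time.at(100))                # 100.0
p scheduled_time(wait: 30, now: Time.at(1000))    # 1030.0
p scheduled_time(at: 5, wait: 30)                 # 5, at takes precedence over wait
```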
#set(data) ⇒ Object
Sets values in the job’s internal state.
    # File 'lib/sidejob/job.rb', line 321
    def set(data)
      check_exists
      return unless data.size > 0
      SideJob.redis.hmset "#{redis_key}:state", *(data.map {|k,v| [k, v.to_json]}.flatten)
    end
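The set method turns the data hash into a flat list of alternating field names and JSON-encoded values before passing it to hmset. A small sketch of that transformation follows; `state_arguments` is a hypothetical helper. Because every value has already been serialized to a string, `flatten` only removes the pairing and leaves array or hash values intact inside their JSON text.

```ruby
require 'json'

# Hypothetical helper mirroring the argument construction in set.
def state_arguments(data)
  data.map { |k, v| [k, v.to_json] }.flatten
end

p state_arguments({ count: 3, tags: ['a', 'b'] })
# [:count, "3", :tags, "[\"a\",\"b\"]"]
```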
#state ⇒ Hash{String => Object}
Returns the entirety of the job’s internal state.
    # File 'lib/sidejob/job.rb', line 302
    def state
      check_exists
      state = SideJob.redis.hgetall("#{redis_key}:state")
      state.update(state) {|k,v| JSON.parse("[#{v}]")[0]}
      state
    end
#status ⇒ String
Retrieve the job’s status.
    # File 'lib/sidejob/job.rb', line 35
    def status
      check_exists
      SideJob.redis.get "#{redis_key}:status"
    end
#status=(status) ⇒ Object
Set the job status.
    # File 'lib/sidejob/job.rb', line 42
    def status=(status)
      check_exists
      oldstatus = SideJob.redis.getset("#{redis_key}:status", status)
      if oldstatus != status && worker_config['status_publish'] != false
        SideJob::Port.group(log: false) do
          publish({status: status})
        end
      end
    end
#terminate(recursive: false) ⇒ SideJob::Job
Prepare to terminate the job. Sets status to ‘terminating’, then queues the job so that its shutdown method, if one exists, can be run. After shutdown, the status will be ‘terminated’. If the job is currently running, it will finish running first. If the job is already terminated, this does nothing. To start the job after termination, call #run with force: true.
    # File 'lib/sidejob/job.rb', line 137
    def terminate(recursive: false)
      if status != 'terminated'
        self.status = 'terminating'
        sidekiq_queue
      end
      if recursive
        children.each_value do |child|
          child.terminate(recursive: true)
        end
      end
      self
    end
#terminated? ⇒ Boolean
Returns whether the job and all of its children are terminated.
    # File 'lib/sidejob/job.rb', line 121
    def terminated?
      return false if status != 'terminated'
      children.each_value do |child|
        return false unless child.terminated?
      end
      return true
    end
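The check in terminated? is recursive: a job counts as terminated only when its own status and the status of every descendant is 'terminated'. The sketch below applies the same logic to an in-memory tree. `FakeNode` is a hypothetical struct standing in for a job, with children held in a Hash keyed by name as in #children.

```ruby
# Hypothetical in-memory job node: a status string plus a Hash of named children.
FakeNode = Struct.new(:status, :children) do
  def terminated?
    return false if status != 'terminated'
    children.each_value do |child|
      return false unless child.terminated?
    end
    true
  end
end

done    = FakeNode.new('terminated', {})
running = FakeNode.new('running', {})

p FakeNode.new('terminated', { 'a' => done }).terminated?                   # true
p FakeNode.new('terminated', { 'a' => done, 'b' => running }).terminated?   # false
```

This is also why delete returns false for a terminated parent that still has a live descendant.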
#unlock(token) ⇒ Boolean
Unlock job by deleting the lock only if it equals the lock token.
    # File 'lib/sidejob/job.rb', line 364
    def unlock(token)
      check_exists
      return SideJob.redis.eval('
        if redis.call("get",KEYS[1]) == ARGV[1] then
          return redis.call("del",KEYS[1])
        else
          return 0
        end', { keys: ["#{redis_key}:lock"], argv: [token] }) == 1
    end
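The lock and unlock methods follow a token pattern: acquiring the lock stores a random token only if no lock is held, and releasing deletes the lock only when the caller presents that same token. The sketch below models both operations in memory so it runs without Redis. `FakeLockStore`, `set_if_absent`, `delete_if_equal`, and `acquire` are hypothetical names, the sleep between retries is omitted, and the store is not thread-safe; the real methods rely on Redis for atomicity.

```ruby
require 'securerandom'

# Hypothetical in-memory stand-in for the two Redis operations the lock relies on.
class FakeLockStore
  def initialize
    @locks = {}   # key => [token, expires_at]
  end

  # Mimics SET with nx and ex: succeeds only if no unexpired lock exists.
  def set_if_absent(key, token, ttl, now: Time.now.to_f)
    current = @locks[key]
    return false if current && current[1] > now
    @locks[key] = [token, now + ttl]
    true
  end

  # Mimics the compare-and-delete script: removes the lock only for the matching token.
  def delete_if_equal(key, token, now: Time.now.to_f)
    current = @locks[key]
    return false unless current && current[1] > now && current[0] == token
    @locks.delete(key)
    true
  end
end

# Tries up to `retries` times and returns the token, or nil if the lock stays held.
def acquire(store, key, ttl, retries: 3)
  retries.times do
    token = SecureRandom.uuid
    return token if store.set_if_absent(key, token, ttl)
  end
  nil
end

store = FakeLockStore.new
token = acquire(store, 'job:1:lock', 10)
p acquire(store, 'job:1:lock', 10)                       # nil, already locked
p store.delete_if_equal('job:1:lock', 'not-the-token')   # false, wrong token
p store.delete_if_equal('job:1:lock', token)             # true, released by the holder
```

Requiring the token on release means a caller whose lock has expired cannot remove a lock that another caller acquired afterwards.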