Class: DistributedJob::Job
- Inherits:
- Object
- Defined in:
- lib/distributed_job/job.rb
Overview
A `DistributedJob::Job` instance keeps track of a distributed job using redis, i.e. a job which is split into multiple units that run in parallel on multiple workers.
Instance Attribute Summary
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#token ⇒ Object
readonly
Returns the value of attribute token.
-
#ttl ⇒ Object
readonly
Returns the value of attribute ttl.
Instance Method Summary
-
#count ⇒ Object
Returns the number of pushed parts which are not finished.
-
#done(part) ⇒ Object
Removes the specified part from the distributed job, i.e. from the set of unfinished parts.
-
#finished? ⇒ Boolean
Returns true if there are no more unfinished parts.
-
#initialize(client:, token:, ttl: 86_400) ⇒ Job
constructor
Initializes a new distributed job.
-
#open_part?(part) ⇒ Boolean
Returns whether or not the part is in the list of open parts of the distributed job.
-
#open_parts ⇒ Enumerator
Returns all parts of the distributed job which are not yet finished.
-
#push_all(enum) ⇒ Object
Pass an enum to be used to iterate all the units of work of the distributed job.
-
#push_each(enum) {|previous_object, previous_index.to_s| ... } ⇒ Object
Pass an enum to be used to iterate all the units of work of the distributed job.
-
#stop ⇒ Object
Stops a distributed job.
-
#stopped? ⇒ Boolean
Returns true when the distributed job was stopped or false otherwise.
-
#total ⇒ Object
Returns the total number of pushed parts, no matter if finished or not.
Constructor Details
#initialize(client:, token:, ttl: 86_400) ⇒ Job
Initializes a new distributed job.
# File 'lib/distributed_job/job.rb', line 61

def initialize(client:, token:, ttl: 86_400)
  @client = client
  @token = token
  @ttl = ttl
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/distributed_job/job.rb', line 40

def client
  @client
end
#token ⇒ Object (readonly)
Returns the value of attribute token.
# File 'lib/distributed_job/job.rb', line 40

def token
  @token
end
#ttl ⇒ Object (readonly)
Returns the value of attribute ttl.
# File 'lib/distributed_job/job.rb', line 40

def ttl
  @ttl
end
Instance Method Details
#count ⇒ Object
Returns the number of pushed parts which are not finished.
# File 'lib/distributed_job/job.rb', line 203

def count
  redis.scard("#{redis_key}:parts")
end
#done(part) ⇒ Object
Removes the specified part from the distributed job, i.e. from the set of unfinished parts. Use this method when the respective job part has been successfully processed, i.e. finished.
# File 'lib/distributed_job/job.rb', line 174

def done(part)
  @done_script ||= <<~SCRIPT
    local key, part, ttl = ARGV[1], ARGV[2], tonumber(ARGV[3])

    if redis.call('srem', key .. ':parts', part) == 0 then return end

    redis.call('expire', key .. ':parts', ttl)
    redis.call('expire', key .. ':state', ttl)

    return redis.call('scard', key .. ':parts')
  SCRIPT

  redis.eval(@done_script, argv: [redis_key, part.to_s, ttl]) == 0 && closed?
end
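The return value of `done` is only true for the call that removes the last open part of a closed job. The following is a minimal in-memory sketch of that rule, assuming a hypothetical `FakeJob` class that replaces redis with a Ruby `Set`. It is not the gem's implementation and leaves out the TTL handling and the atomicity that the Lua script provides.

```ruby
require 'set'

# Hypothetical in-memory stand-in for DistributedJob::Job (no redis, no TTL).
class FakeJob
  def initialize
    @parts = Set.new
    @closed = false
  end

  def push(part)
    @parts << part.to_s
  end

  def close
    @closed = true
  end

  # Follows the logic of the listing above: false when the part was not open
  # (e.g. a retried job), otherwise true only if no open parts remain and the
  # job has been closed.
  def done(part)
    return false unless @parts.delete?(part.to_s)

    @parts.empty? && @closed
  end
end

job = FakeJob.new
job.push(0)
job.push(1)
job.close

# Part 0 finishes, part 0 is reported again by a retry, then part 1 finishes.
results = [job.done(0), job.done(0), job.done(1)]
p results # => [false, false, true]
```

Because only one call can return true, that call is a natural place to trigger follow-up work once the whole distributed job has been processed.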
#finished? ⇒ Boolean
Returns true if there are no more unfinished parts.
# File 'lib/distributed_job/job.rb', line 212

def finished?
  closed? && count.zero?
end
#open_part?(part) ⇒ Boolean
Returns whether or not the part is in the list of open parts of the distributed job.
# File 'lib/distributed_job/job.rb', line 151

def open_part?(part)
  redis.sismember("#{redis_key}:parts", part.to_s)
end
#open_parts ⇒ Enumerator
Returns all parts of the distributed job which are not yet finished.
# File 'lib/distributed_job/job.rb', line 142

def open_parts
  redis.sscan_each("#{redis_key}:parts")
end
#push_all(enum) ⇒ Object
Pass an enum to be used to iterate all the units of work of the distributed job. The values of the enum are used as the names of the parts, so values listed multiple times (duplicates) are only added once to the distributed job. The distributed job needs to know all of them to keep track of the overall number and status of the parts. Passing an enum is preferable to pushing the parts manually, because the distributed job needs to be closed before the last part of the distributed job is enqueued into some job queue. Otherwise the last part could already be processed in the job queue before it is pushed to redis, such that the last job doesn't know that the distributed job is finished.
# File 'lib/distributed_job/job.rb', line 128

def push_all(enum)
  raise(AlreadyClosed, 'The distributed job is already closed') if closed?

  enum.each do |part|
    push(part)
  end

  close
end
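To illustrate the de-duplication and the closing behaviour of `push_all`, here is a small sketch that assumes a hypothetical in-memory `FakeJob` class. It keeps the parts in a Ruby `Set` instead of redis and is only meant to show the observable behaviour, not how the gem is implemented.

```ruby
require 'set'

# Hypothetical in-memory stand-in; the real class keeps its state in redis.
class FakeJob
  class AlreadyClosed < StandardError; end

  def initialize
    @parts = Set.new
    @closed = false
  end

  def push_all(enum)
    raise(AlreadyClosed, 'The distributed job is already closed') if @closed

    enum.each { |part| @parts << part.to_s } # a set keeps duplicates out
    @closed = true
  end

  def count
    @parts.size
  end
end

job = FakeJob.new
job.push_all(%w[users orders users])
open_count = job.count # "users" is only counted once

# A closed job rejects further parts.
second_call_rejected =
  begin
    job.push_all(%w[invoices])
    false
  rescue FakeJob::AlreadyClosed
    true
  end

p [open_count, second_call_rejected] # => [2, true]
```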
#push_each(enum) {|previous_object, previous_index.to_s| ... } ⇒ Object
Pass an enum to be used to iterate all the units of work of the distributed job. The block receives each object together with the name of its part, which is the object's index in the enum as a string. The distributed job needs to know all of them to keep track of the overall number and status of the parts. Passing an enum is preferable to pushing the parts manually, because the distributed job needs to be closed before the last part of the distributed job is enqueued into some job queue. Otherwise the last part could already be processed in the job queue before it is pushed to redis, such that the last job doesn't know that the distributed job is finished.
# File 'lib/distributed_job/job.rb', line 89

def push_each(enum)
  raise(AlreadyClosed, 'The distributed job is already closed') if closed?

  previous_object = nil
  previous_index = nil

  enum.each_with_index do |current_object, current_index|
    push(current_index)

    yield(previous_object, previous_index.to_s) if previous_index

    previous_object = current_object
    previous_index = current_index
  end

  close

  yield(previous_object, previous_index.to_s) if previous_index
end
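The ordering guarantee of `push_each` is easiest to see when the individual steps are recorded. The sketch below assumes a hypothetical `FakeJob` class that copies the control flow of the listing above but appends to an `events` array instead of talking to redis. The `enqueue` entries stand for whatever job queue call the block would make.

```ruby
# Hypothetical stand-in that records the order of operations.
class FakeJob
  attr_reader :events

  def initialize
    @events = []
  end

  # Same control flow as the listing above: an object is yielded only after
  # the next part has been pushed, and the last one only after closing.
  def push_each(enum)
    previous_object = nil
    previous_index = nil

    enum.each_with_index do |current_object, current_index|
      @events << "push #{current_index}"

      yield(previous_object, previous_index.to_s) if previous_index

      previous_object = current_object
      previous_index = current_index
    end

    @events << 'close'

    yield(previous_object, previous_index.to_s) if previous_index
  end
end

job = FakeJob.new
job.push_each(%w[a b c]) do |object, part|
  job.events << "enqueue #{object} as part #{part}"
end

puts job.events
```

In the recorded sequence every part is pushed before the previous one is enqueued, and `close` comes before the last `enqueue`, so a worker that finishes the last part always sees a closed job.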
#stop ⇒ Object
Stops a distributed job. This is useful if an error occurred in some part, i.e. background job, of the distributed job and you want to stop all other parts that are not yet finished. Note that only jobs which actively ask the distributed job whether or not it was stopped can be stopped.
# File 'lib/distributed_job/job.rb', line 241

def stop
  redis.multi do |transaction|
    transaction.hset("#{redis_key}:state", 'stopped', 1)
    transaction.expire("#{redis_key}:state", ttl)
    transaction.expire("#{redis_key}:parts", ttl)
  end

  true
end
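Since stopping is cooperative, a worker has to check `stopped?` itself. The following sketch shows one possible shape of such a check. The `FakeJob` class and the `process` method are hypothetical and keep the stop flag in memory instead of redis.

```ruby
# Hypothetical in-memory stand-in for the stop flag kept in redis.
class FakeJob
  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
  end

  def stopped?
    @stopped
  end
end

# Hypothetical worker: processes items until it notices the stop flag.
def process(job, items)
  processed = []

  items.each do |item|
    break if job.stopped? # stopping only works because the worker asks

    processed << item
    job.stop if item == :broken # simulate an error that stops the whole job
  end

  processed
end

job = FakeJob.new
processed = process(job, %i[a broken c])
p processed # => [:a, :broken]
```

How often a worker checks the flag is a trade-off: checking before every item stops quickly but, with the real class, costs one redis round trip per check.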
#stopped? ⇒ Boolean
Returns true when the distributed job was stopped or false otherwise.
# File 'lib/distributed_job/job.rb', line 273

def stopped?
  redis.hget("#{redis_key}:state", 'stopped') == '1'
end
#total ⇒ Object
Returns the total number of pushed parts, no matter if finished or not.
# File 'lib/distributed_job/job.rb', line 194

def total
  redis.hget("#{redis_key}:state", 'total').to_i
end
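Together, `total` and `count` are enough to derive a progress figure. The helper below is a hypothetical sketch and not part of the class. With a real job it would be called as `progress_percent(job.total, job.count)`, keeping in mind that the two values are read from redis separately and may therefore be slightly out of sync.

```ruby
# Hypothetical helper, not part of the gem: share of finished parts in percent.
def progress_percent(total, count)
  return 0.0 if total.zero? # nothing pushed yet

  ((total - count) * 100.0 / total).round(1)
end

percent = progress_percent(8, 2) # 8 parts pushed, 2 still open
p percent # => 75.0
```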