Class: Cyclop::Job
- Inherits: Object
  - Object
  - Cyclop::Job
- Defined in: lib/cyclop/job.rb
Instance Attribute Summary
- #_id ⇒ Object
  Unique identifier.
- #attempts ⇒ Object
  Number of attempts.
- #created_at ⇒ Object
  Time it was created.
- #created_by ⇒ Object
  Host it’s added under.
- #delay ⇒ Object
  Delay in seconds.
- #delayed_until ⇒ Object
  Time before which the job will not be started.
- #errors ⇒ Object
  Backtraces of unsuccessful attempts.
- #failed ⇒ Object
  Mark as failed.
- #job_params ⇒ Object
  Parameters sent to `#perform`.
- #locked_at ⇒ Object
  Time when worker started.
- #locked_by ⇒ Object
  Worker unique identifier.
- #queue ⇒ Object
  Queue name.
- #retries ⇒ Object
  Number of retries before being marked as failed.
- #splay ⇒ Object
  Time in seconds between retries.
- #updated_at ⇒ Object
  Time it was last updated.
Class Method Summary
- .create(opts = {}) ⇒ Object
  Create a new job and save it to the queue specified in `opts`.
- .failed(opts = {}) ⇒ Object
  Get failed jobs matching `opts`.
- .find(id) ⇒ Object
- .next(opts = {}) ⇒ Object
  Get the next job matching `opts` and mark it as locked.
Instance Method Summary
- #==(other) ⇒ Object
- #complete! ⇒ Object
  Remove successfully processed job from the queue.
- #initialize(attrs = {}) ⇒ Job (constructor)
  A new instance of Job.
- #persisted? ⇒ Boolean
  If we have an id the object is persisted.
- #release!(exception = nil) ⇒ Object
  Release job for further processing.
- #reload ⇒ Object
- #requeue ⇒ Object
- #save ⇒ Object
  Save to queue.
Constructor Details
#initialize(attrs = {}) ⇒ Job
Returns a new instance of Job.
# File 'lib/cyclop/job.rb', line 34
def initialize(attrs={})
  raise ArgumentError, ":queue is required" unless attrs["queue"] || attrs[:queue]
  self.attributes = attrs
end
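The guard clause in the constructor accepts the queue name under either a String or a Symbol key and raises otherwise. The sketch below reproduces only that check; `SketchJob` is a hypothetical stand-in, not part of Cyclop, and it stores the attributes in a plain instance variable instead of calling `attributes=`.

```ruby
# Hypothetical stand-in that mirrors only the guard clause of Cyclop::Job#initialize.
class SketchJob
  attr_reader :attrs

  def initialize(attrs = {})
    # Accept the queue name under either a String or a Symbol key.
    raise ArgumentError, ":queue is required" unless attrs["queue"] || attrs[:queue]
    @attrs = attrs
  end
end

SketchJob.new({ queue: "email" })      # Symbol key is accepted
SketchJob.new({ "queue" => "email" })  # String key is accepted

begin
  SketchJob.new({ job_params: [1, 2] })
rescue ArgumentError => e
  puts e.message # prints ":queue is required"
end
```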
Instance Attribute Details
#_id ⇒ Object
Unique identifier
# File 'lib/cyclop/job.rb', line 4
def _id
  @_id
end
#attempts ⇒ Object
Number of attempts
# File 'lib/cyclop/job.rb', line 30
def attempts
  @attempts
end
#created_at ⇒ Object
Time it was created
# File 'lib/cyclop/job.rb', line 20
def created_at
  @created_at
end
#created_by ⇒ Object
Host it’s added under
# File 'lib/cyclop/job.rb', line 18
def created_by
  @created_by
end
#delay ⇒ Object
Delay in seconds
# File 'lib/cyclop/job.rb', line 10
def delay
  @delay
end
#delayed_until ⇒ Object
Time before which the job will not be started
# File 'lib/cyclop/job.rb', line 12
def delayed_until
  @delayed_until
end
#errors ⇒ Object
Backtraces of unsuccessful attempts
# File 'lib/cyclop/job.rb', line 32
def errors
  @errors
end
#failed ⇒ Object
Mark as failed
# File 'lib/cyclop/job.rb', line 28
def failed
  @failed
end
#job_params ⇒ Object
Parameters sent to `#perform`
# File 'lib/cyclop/job.rb', line 8
def job_params
  @job_params
end
#locked_at ⇒ Object
Time when worker started
# File 'lib/cyclop/job.rb', line 26
def locked_at
  @locked_at
end
#locked_by ⇒ Object
Worker unique identifier
# File 'lib/cyclop/job.rb', line 24
def locked_by
  @locked_by
end
#queue ⇒ Object
Queue name
# File 'lib/cyclop/job.rb', line 6
def queue
  @queue
end
#retries ⇒ Object
Number of retries before being marked as failed
# File 'lib/cyclop/job.rb', line 14
def retries
  @retries
end
#splay ⇒ Object
Time in seconds between retries
# File 'lib/cyclop/job.rb', line 16
def splay
  @splay
end
#updated_at ⇒ Object
Time it was last updated
# File 'lib/cyclop/job.rb', line 22
def updated_at
  @updated_at
end
Class Method Details
.create(opts = {}) ⇒ Object
Create a new job and save it to the queue specified in `opts`
# File 'lib/cyclop/job.rb', line 40
def self.create(opts={})
  job = new opts
  job.save
  job
end
.failed(opts = {}) ⇒ Object
Get failed jobs matching `opts`
# File 'lib/cyclop/job.rb', line 89
def self.failed(opts={})
  selector = {}
  # Failed or dead jobs only
  selector["$or"] = [
    {failed: true},
    {"$where" => "this.attempts > this.retries"},
  ]
  # Filter by queue if present
  selector[:queue] = {"$in" => opts[:queues]} if opts[:queues] && !opts[:queues].empty?
  options = {}
  options[:skip] = opts[:skip] if opts[:skip]
  options[:limit] = opts[:limit] if opts[:limit]
  collection.find(selector, options).collect{|attrs| new attrs}
end
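To see what `.failed` hands to the driver without a database, the selector and paging options can be built on their own. This is a sketch under assumptions: `failed_query` is a hypothetical helper, not a Cyclop method, and it only mirrors the hash construction in the listing above.

```ruby
# Hypothetical helper: returns the selector and options built by the logic in .failed.
def failed_query(opts = {})
  # A job counts as failed if flagged, or dead if it used more attempts than retries allow.
  selector = {
    "$or" => [
      { failed: true },
      { "$where" => "this.attempts > this.retries" },
    ],
  }

  # Restrict to the given queues only when a non-empty list is passed.
  queues = opts[:queues]
  selector[:queue] = { "$in" => queues } if queues && !queues.empty?

  # Paging options are copied only when present.
  options = {}
  options[:skip] = opts[:skip] if opts[:skip]
  options[:limit] = opts[:limit] if opts[:limit]

  [selector, options]
end

selector, options = failed_query({ queues: ["email"], limit: 10 })
p selector[:queue] # {"$in"=>["email"]}
p options          # limit only, since no :skip was given
```

An empty `:queues` array is treated the same as no filter, so the query then spans every queue.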
.find(id) ⇒ Object
# File 'lib/cyclop/job.rb', line 106
def self.find(id)
  if doc = collection.find_one(id)
    new doc
  end
end
.next(opts = {}) ⇒ Object
Get the next job matching `opts` and mark it as locked
# File 'lib/cyclop/job.rb', line 47
def self.next(opts={})
  raise ArgumentError, "locked_by is required" unless opts[:locked_by]

  time_now = Time.now.utc

  conditions = {query: {}}

  # Not failed jobs only
  conditions[:query][:failed] = false

  # Only jobs generated by the specified host if present
  conditions[:query][:created_by] = opts[:host] if opts[:host]

  # Skip delayed jobs
  conditions[:query][:delayed_until] = {"$lte" => time_now}

  # Filter by queue if present
  conditions[:query][:queue] = {"$in" => opts[:queues]} if opts[:queues] && !opts[:queues].empty?

  # Skip locked jobs
  conditions[:query]["$or"] = [{locked_at: {"$lte" => time_now - 1800}}, {locked_at: nil}]

  # Last chance to skip dead jobs
  conditions[:query]["$where"] = "this.attempts <= this.retries"

  # Set `locked_by` with worker id and increment the number of attempts
  conditions[:update] = {
    "$set" => {
      locked_by: opts[:locked_by],
      locked_at: time_now,
    },
    "$inc" => {
      attempts: 1
    }
  }

  # Sort by `created_at`
  conditions[:sort] = [:created_at, :asc]

  # Returns the modified job
  conditions[:new] = true

  new collection.find_and_modify conditions
rescue Mongo::OperationFailure
  nil
end
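The selection rules in `.next` can be followed more easily over plain hashes. The sketch below is an in-memory approximation, not Cyclop code: `eligible?`, `take_next`, and `LOCK_TIMEOUT` are hypothetical names, and unlike `find_and_modify` the sketch is not atomic, so it only illustrates which job would be picked and how it is marked.

```ruby
LOCK_TIMEOUT = 1800 # seconds; the literal used in the locked_at condition

# Hypothetical predicate mirroring the query conditions of .next.
def eligible?(job, now, opts)
  return false if job[:failed]
  return false if opts[:host] && job[:created_by] != opts[:host]
  # Delayed jobs are skipped until their start time has passed.
  return false unless job[:delayed_until] && job[:delayed_until] <= now
  queues = opts[:queues]
  return false if queues && !queues.empty? && !queues.include?(job[:queue])
  # A job is free if it was never locked or its lock is older than the timeout.
  return false unless job[:locked_at].nil? || job[:locked_at] <= now - LOCK_TIMEOUT
  # Dead jobs (attempts beyond retries) are never handed out.
  job[:attempts] <= job[:retries]
end

# Hypothetical, non-atomic version of "oldest eligible job, locked, one more attempt".
def take_next(jobs, now, opts)
  raise ArgumentError, "locked_by is required" unless opts[:locked_by]
  job = jobs.select { |j| eligible?(j, now, opts) }.min_by { |j| j[:created_at] }
  return nil unless job
  job[:locked_by] = opts[:locked_by]
  job[:locked_at] = now
  job[:attempts] += 1
  job
end

now = Time.now.utc
jobs = [
  { queue: "email", failed: false, created_at: now - 60, delayed_until: now - 60,
    locked_at: nil, attempts: 0, retries: 1 },
  { queue: "email", failed: false, created_at: now - 120, delayed_until: now - 120,
    locked_at: now - 10, attempts: 1, retries: 1 }, # locked 10 seconds ago, so skipped
]
job = take_next(jobs, now, { locked_by: "worker-1" })
```

Here the older job is passed over because its lock is still fresh, so the first job is returned with `attempts` raised to 1. A second call at the same instant returns `nil`, since both jobs are now locked.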
Instance Method Details
#==(other) ⇒ Object
# File 'lib/cyclop/job.rb', line 137
def ==(other)
  other._id == _id
end
#complete! ⇒ Object
Remove successfully processed job from the queue
# File 'lib/cyclop/job.rb', line 142
def complete!
  collection.remove _id: _id, locked_by: Cyclop.master_id
end
#persisted? ⇒ Boolean
If we have an id the object is persisted
# File 'lib/cyclop/job.rb', line 133
def persisted?
  !!_id
end
#release!(exception = nil) ⇒ Object
Release job for further processing
# File 'lib/cyclop/job.rb', line 147
def release!(exception = nil)
  now = ::Time.at(Time.now.to_i).utc
  selector = {_id: _id, locked_by: Cyclop.master_id}
  set = if attempts<=retries
    {locked_by: nil, locked_at: nil, delayed_until: now+splay}
  else
    {failed: true}
  end
  update = {"$set" => set}
  update["$push"] = {
    :errors => {
      :locked_by => locked_by,
      :locked_at => locked_at,
      :class => exception.class.name,
      :message => exception.message,
      :backtrace => exception.backtrace,
      :created_at => now,
    },
  } if exception
  collection.update selector, update, :safe => true
end
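The retry-or-fail decision in `#release!` depends only on `attempts`, `retries`, and `splay`, so it can be tried out without a collection. The sketch below assumes a hypothetical helper, `release_update`, which is not part of Cyclop; it builds a reduced update document and leaves out the `locked_by`, `locked_at`, and `backtrace` fields of the pushed error for brevity.

```ruby
# Hypothetical helper: builds a reduced version of the update document from #release!.
def release_update(attempts:, retries:, splay:, now:, exception: nil)
  set = if attempts <= retries
    # Retries remain: unlock the job and postpone it by `splay` seconds.
    { locked_by: nil, locked_at: nil, delayed_until: now + splay }
  else
    # Retries exhausted: flag the job as failed.
    { failed: true }
  end

  update = { "$set" => set }
  if exception
    # Record the failure alongside the state change (fields trimmed in this sketch).
    update["$push"] = {
      errors: { class: exception.class.name, message: exception.message, created_at: now },
    }
  end
  update
end

now = Time.at(Time.now.to_i).utc # whole-second precision, as in the source
retry_update = release_update(attempts: 1, retries: 3, splay: 60, now: now)
dead_update  = release_update(attempts: 4, retries: 3, splay: 60, now: now,
                              exception: RuntimeError.new("boom"))
```

With these inputs, `retry_update` reschedules the job 60 seconds ahead, while `dead_update` sets `failed: true` and pushes the error record.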
#reload ⇒ Object
# File 'lib/cyclop/job.rb', line 127
def reload
  self.attributes = collection.find_one _id
  self
end
#requeue ⇒ Object
# File 'lib/cyclop/job.rb', line 169
def requeue
  self.attempts, self.failed, self.locked_at = 0, false, nil
  update = {attempts: attempts, failed: failed, locked_at: locked_at}
  collection.update({_id: _id}, {"$set" => update}, :safe => true)
end
#save ⇒ Object
Save to queue
# File 'lib/cyclop/job.rb', line 113
def save
  self.updated_at = Time.now.utc
  if persisted?
    raise NotImplementedError
  else
    self.created_at = updated_at
    self.delayed_until = ::Time.at(created_at.to_i + delay).utc
    self._id = collection.insert attributes, safe: true
  end
  true
rescue Mongo::OperationFailure
  false
end
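The `delayed_until` expression in `#save` converts the creation time to whole seconds before adding the delay, so any sub-second part is dropped. A small sketch of that arithmetic, assuming `delay` is an Integer number of seconds; `delayed_until_for` is a hypothetical helper name.

```ruby
# Mirrors the expression used in #save: whole-second precision, expressed in UTC.
def delayed_until_for(created_at, delay)
  Time.at(created_at.to_i + delay).utc
end

created_at = Time.utc(2024, 1, 1, 12, 0, 0, 250_000) # 12:00:00.25 UTC
run_at = delayed_until_for(created_at, 90)
puts run_at # prints "2024-01-01 12:01:30 UTC"
```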