Class: Cyclop::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/cyclop/job.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attrs = {}) ⇒ Job

Returns a new instance of Job.

Raises:

  • (ArgumentError)


34
35
36
37
# File 'lib/cyclop/job.rb', line 34

# Builds a job from a hash of attributes.
#
# The queue may be given under either the String key "queue" or the Symbol
# key :queue. Once that check passes, the whole hash is handed to
# `attributes=`.
#
# @param attrs [Hash] job attributes
# @raise [ArgumentError] when neither key holds a truthy queue
def initialize(attrs={})
  queue_name = attrs["queue"] || attrs[:queue]
  raise ArgumentError, ":queue is required" unless queue_name
  self.attributes = attrs
end

Instance Attribute Details

#_idObject

Unique identifier



4
5
6
# File 'lib/cyclop/job.rb', line 4

# Unique identifier of the job.
# `#save` assigns it from the result of the insert, and `#persisted?` treats
# a truthy value as "already stored".
def _id
  @_id
end

#attemptsObject

Number of attempts



30
31
32
# File 'lib/cyclop/job.rb', line 30

# Number of attempts made so far.
# `.next` increments the stored value by one each time it locks the job, and
# `#release!` compares it with `retries` to decide whether the job has failed.
def attempts
  @attempts
end

#created_atObject

Time it was created



20
21
22
# File 'lib/cyclop/job.rb', line 20

# Time the job was created.
# `#save` sets it to the same value as `updated_at` when the job is first stored.
def created_at
  @created_at
end

#created_byObject

Host that created the job



18
19
20
# File 'lib/cyclop/job.rb', line 18

# Host that created the job.
# `.next` can restrict its search to a single host through `opts[:host]`.
def created_by
  @created_by
end

#delayObject

Delay in seconds



10
11
12
# File 'lib/cyclop/job.rb', line 10

# Delay in seconds.
# `#save` adds it to the creation time to compute `delayed_until`.
def delay
  @delay
end

#delayed_untilObject

Time before which the job will not be started



12
13
14
# File 'lib/cyclop/job.rb', line 12

# Earliest time at which the job may be started.
# `.next` only selects jobs whose `delayed_until` is not in the future.
def delayed_until
  @delayed_until
end

#errorsObject

Backtraces of unsuccessful attempts



32
33
34
# File 'lib/cyclop/job.rb', line 32

# Backtraces of unsuccessful attempts.
# `#release!` pushes one entry (worker, lock time, exception class, message,
# backtrace, timestamp) whenever it is given an exception.
def errors
  @errors
end

#failedObject

Mark as failed



28
29
30
# File 'lib/cyclop/job.rb', line 28

# Failure flag.
# `#release!` sets it to true once `attempts` exceeds `retries`; `.next` only
# selects jobs where it is false, and `#requeue` resets it to false.
def failed
  @failed
end

#job_paramsObject

Parameters sent to `#perform`



8
9
10
# File 'lib/cyclop/job.rb', line 8

# Parameters sent to `#perform`.
def job_params
  @job_params
end

#locked_atObject

Time when worker started



26
27
28
# File 'lib/cyclop/job.rb', line 26

# Time when a worker started on the job.
# `.next` sets it when it locks the job; `#release!` clears it when the job
# is released for another attempt, and `#requeue` resets it to nil.
def locked_at
  @locked_at
end

#locked_byObject

Worker unique identifier



24
25
26
# File 'lib/cyclop/job.rb', line 24

# Unique identifier of the worker holding the job.
# `.next` sets it from `opts[:locked_by]`; `#release!` clears it when the job
# is released for another attempt.
def locked_by
  @locked_by
end

#queueObject

Queue name



6
7
8
# File 'lib/cyclop/job.rb', line 6

# Queue name.
# The constructor refuses to build a job without one.
def queue
  @queue
end

#retriesObject

Number of retries before being marked as failed



14
15
16
# File 'lib/cyclop/job.rb', line 14

# Number of retries before the job is marked as failed.
# `#release!` marks the job failed once `attempts` is greater than this value.
def retries
  @retries
end

#splayObject

Time in seconds between retries



16
17
18
# File 'lib/cyclop/job.rb', line 16

# Time in seconds between retries.
# `#release!` adds it to the current time to compute the next `delayed_until`.
def splay
  @splay
end

#updated_atObject

Time it was last updated



22
23
24
# File 'lib/cyclop/job.rb', line 22

# Time the job was last updated.
# `#save` sets it to the current UTC time on every call.
def updated_at
  @updated_at
end

Class Method Details

.create(opts = {}) ⇒ Object

Create a new job and save it to the queue specified in `opts`



40
41
42
43
44
# File 'lib/cyclop/job.rb', line 40

# Builds a job from `opts`, saves it, and returns the job.
#
# The return value of `save` is discarded, so the job is returned even when
# saving reported failure.
#
# @param opts [Hash] attributes forwarded to `new`
# @return [Job] the job that was built
def self.create(opts={})
  new(opts).tap(&:save)
end

.failed(opts = {}) ⇒ Object

Get failed jobs, optionally restricted to the queues given in `opts`



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/cyclop/job.rb', line 89

# Returns the failed jobs, optionally restricted to some queues and paged.
#
# A job counts as failed when its `failed` flag is true or when it has used
# more attempts than it has retries.
#
# @param opts [Hash] supported keys:
#   :queues - list of queue names to restrict to (ignored when empty)
#   :skip   - forwarded to the collection's `find` when present
#   :limit  - forwarded to the collection's `find` when present
# @return [Array] one job per matching document
def self.failed(opts={})
  # Failed or dead jobs only
  selector = {
    "$or" => [
      {failed: true},
      {"$where" => "this.attempts > this.retries"},
    ],
  }

  # Filter by queue if present
  queues = opts[:queues]
  selector[:queue] = {"$in" => queues} if queues && !queues.empty?

  # Forward only the paging options that were actually supplied
  options = [:skip, :limit].each_with_object({}) do |key, acc|
    acc[key] = opts[key] if opts[key]
  end

  collection.find(selector, options).map { |doc| new(doc) }
end

.find(id) ⇒ Object



106
107
108
109
110
# File 'lib/cyclop/job.rb', line 106

# Looks up a single job by id.
#
# @param id the value passed to the collection's `find_one`
# @return [Job, nil] the job built from the document, or nil when no
#   document was found
def self.find(id)
  doc = collection.find_one(id)
  return unless doc

  new(doc)
end

.next(opts = {}) ⇒ Object

Get the next job, optionally restricted to the host and queues given in `opts`, and mark it as locked



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/cyclop/job.rb', line 47

# Atomically claims the next runnable job and returns it.
#
# A job is runnable when it is not failed, its `delayed_until` is not in the
# future, it is unlocked (or its lock is at least 1800 seconds old), and it
# has not used more attempts than it has retries. The oldest such job by
# `created_at` is locked for `opts[:locked_by]` and its attempt counter is
# incremented in the same operation.
#
# @param opts [Hash] supported keys:
#   :locked_by - worker identifier (required)
#   :host      - only consider jobs created by this host
#   :queues    - only consider jobs from these queues (ignored when empty)
# @return [Job, nil] the locked job, or nil when no job is available or the
#   operation failed
# @raise [ArgumentError] when :locked_by is missing
def self.next(opts={})
  raise ArgumentError, "locked_by is required" unless opts[:locked_by]

  time_now = Time.now.utc

  conditions = {query: {}}
  # Not failed jobs only
  conditions[:query][:failed] = false
  # Only jobs generated by the specified host if present
  conditions[:query][:created_by] = opts[:host] if opts[:host]
  # Skip delayed jobs
  conditions[:query][:delayed_until] = {"$lte" => time_now}
  # Filter by queue if present
  conditions[:query][:queue] = {"$in" => opts[:queues]} if opts[:queues] && !opts[:queues].empty?
  # Skip locked jobs, except those whose lock is at least 1800 seconds old
  conditions[:query]["$or"] = [{locked_at: {"$lte" => time_now - 1800}}, {locked_at: nil}]
  # Last chance to skip dead jobs
  conditions[:query]["$where"] = "this.attempts <= this.retries"

  # Set `locked_by` with worker id and increment the number of attempts
  conditions[:update] = {
    "$set" => {
      locked_by: opts[:locked_by],
      locked_at: time_now,
    },
    "$inc" => {
      attempts: 1
    }
  }

  # Sort by `created_at`
  conditions[:sort] = [:created_at, :asc]

  # Returns the modified job
  conditions[:new] = true

  doc = collection.find_and_modify conditions
  # When nothing matches, `find_and_modify` may hand back nil instead of
  # raising; passing nil to `new` would blow up with NoMethodError, so an
  # empty queue is reported as nil here.
  doc ? new(doc) : nil
rescue Mongo::OperationFailure
  nil
end

Instance Method Details

#==(other) ⇒ Object



137
138
139
# File 'lib/cyclop/job.rb', line 137

def ==(other)
  other._id == _id
end

#complete!Object

Remove successfully processed job from the queue



142
143
144
# File 'lib/cyclop/job.rb', line 142

# Removes a successfully processed job from the queue.
#
# The removal only matches a document that has this job's `_id` and is
# locked by `Cyclop.master_id`.
#
# @return whatever the collection's `remove` returns
def complete!
  selector = {_id: _id, locked_by: Cyclop.master_id}
  collection.remove(selector)
end

#persisted?Boolean

If we have an id the object is persisted

Returns:

  • (Boolean)


133
134
135
# File 'lib/cyclop/job.rb', line 133

# Tells whether the job has been stored: it is considered persisted as soon
# as it has a truthy `_id`.
#
# @return [Boolean]
def persisted?
  _id ? true : false
end

#release!(exception = nil) ⇒ Object

Release job for further processing



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/cyclop/job.rb', line 147

# Releases the job after an attempt so it can be processed again, or marks
# it as failed when it has used more attempts than it has retries.
#
# The update only matches a document that has this job's `_id` and is locked
# by `Cyclop.master_id`. When an exception is given, a record describing it
# is appended to the stored `errors` list.
#
# @param exception [Exception, nil] error raised by the attempt, if any
# @return whatever the collection's `update` returns
def release!(exception = nil)
  # Current time truncated to whole seconds, in UTC
  now = ::Time.at(Time.now.to_i).utc
  selector = {_id: _id, locked_by: Cyclop.master_id}

  if attempts <= retries
    # Unlock and push the next start `splay` seconds into the future
    changes = {locked_by: nil, locked_at: nil, delayed_until: now + splay}
  else
    changes = {failed: true}
  end
  update = {"$set" => changes}

  if exception
    failure = {
      locked_by: locked_by,
      locked_at: locked_at,
      class: exception.class.name,
      message: exception.message,
      backtrace: exception.backtrace,
      created_at: now,
    }
    update["$push"] = {errors: failure}
  end

  collection.update selector, update, safe: true
end

#reloadObject



127
128
129
130
# File 'lib/cyclop/job.rb', line 127

# Re-reads the job's document from the collection by `_id` and assigns it
# through `attributes=`.
#
# @return [Job] self
def reload
  fresh = collection.find_one(_id)
  self.attributes = fresh
  self
end

#requeueObject



169
170
171
172
173
# File 'lib/cyclop/job.rb', line 169

# Puts the job back in line: resets the attempt counter, clears the failed
# flag and the lock time on this object, then writes those three values to
# the stored document.
#
# @return whatever the collection's `update` returns
def requeue
  self.attempts = 0
  self.failed = false
  self.locked_at = nil

  reset = {attempts: attempts, failed: failed, locked_at: locked_at}
  collection.update({_id: _id}, {"$set" => reset}, safe: true)
end

#saveObject

Save to queue



113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/cyclop/job.rb', line 113

# Saves the job to the queue.
#
# Only the first save is supported: the job gets its timestamps, its
# `delayed_until` (creation time plus `delay` seconds, truncated to whole
# seconds, UTC) and the `_id` returned by the insert. `updated_at` is
# refreshed even when the call ends up raising.
#
# @return [Boolean] true on success, false when the insert raised
#   Mongo::OperationFailure
# @raise [NotImplementedError] when the job is already persisted
def save
  self.updated_at = Time.now.utc
  raise NotImplementedError if persisted?

  self.created_at = updated_at
  start_epoch = created_at.to_i + delay
  self.delayed_until = ::Time.at(start_epoch).utc
  self._id = collection.insert attributes, safe: true
  true
rescue Mongo::OperationFailure
  false
end