Class: Resque::Plugins::State::Hash

Inherits:
Hash
  • Object
show all
Defined in:
lib/resque/plugins/state/hash.rb

Overview

Resque::Plugins::State::Hash is a Hash object that has helper methods for dealing with the common status attributes. It also has a number of class methods for creating/updating/retrieving status objects from Redis

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Hash

Create a new Resque::Plugins::State::Hash object. If multiple arguments are passed it is assumed the first argument is the UUID and the rest are status objects. All arguments are subsequentily merged in order. Strings are assumed to be messages.


251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/resque/plugins/state/hash.rb', line 251

def initialize(*args)
  super nil
  base_status = {
    'time' => Time.now.to_i,
    'status' => Resque::Plugins::State::STATUS_QUEUED
  }
  base_status['uuid'] = args.shift if args.length > 1
  status_hash = args.inject(base_status) do |final, m|
    m = { 'message' => m } if m.is_a?(String)
    final.merge(m || {})
  end
  replace(status_hash)
end

Class Attribute Details

.expire_inObject

Returns the value of attribute expire_in


182
183
184
# File 'lib/resque/plugins/state/hash.rb', line 182

def expire_in
  @expire_in
end

Class Method Details

.clear(range_start = nil, range_end = nil) ⇒ Object

clear statuses from redis passing an optional range. See `statuses` for info about ranges


47
48
49
50
51
# File 'lib/resque/plugins/state/hash.rb', line 47

def self.clear(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).each do |id|
    remove(id)
  end
end

.clear_completed(range_start = nil, range_end = nil) ⇒ Object


53
54
55
56
57
58
59
60
61
62
# File 'lib/resque/plugins/state/hash.rb', line 53

def self.clear_completed(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).select do |id|
    if get(id).completed?
      remove(id)
      true
    else
      false
    end
  end
end

.clear_failed(range_start = nil, range_end = nil) ⇒ Object


64
65
66
67
68
69
70
71
72
73
# File 'lib/resque/plugins/state/hash.rb', line 64

def self.clear_failed(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).select do |id|
    if get(id).failed?
      remove(id)
      true
    else
      false
    end
  end
end

.create(uuid, *messages) ⇒ Object

Create a status, generating a new UUID, passing the message to the status Returns the UUID of the new status.


12
13
14
15
16
17
# File 'lib/resque/plugins/state/hash.rb', line 12

def self.create(uuid, *messages)
  set(uuid, *messages)
  redis.zadd(set_key, Time.now.to_i, uuid)
  redis.zremrangebyscore(set_key, 0, Time.now.to_i - @expire_in) if @expire_in
  uuid
end

.generate_uuidObject


206
207
208
# File 'lib/resque/plugins/state/hash.rb', line 206

def self.generate_uuid
  SecureRandom.hex.to_s
end

.get(uuid) ⇒ Object

Get a status by UUID. Returns a Resque::Plugins::State::Hash


20
21
22
23
# File 'lib/resque/plugins/state/hash.rb', line 20

def self.get(uuid)
  val = redis.get(status_key(uuid))
  val ? Resque::Plugins::State::Hash.new(uuid, decode(val)) : nil
end

.hash_accessor(name, options = {}) ⇒ Object


210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/resque/plugins/state/hash.rb', line 210

def self.hash_accessor(name, options = {})
  options[:default] ||= nil
  coerce = options[:coerce] ? ".#{options[:coerce]}" : ''
  module_eval <<-EOT
  def #{name}
    value = (self['#{name}'] ? self['#{name}']#{coerce} : #{options[:default].inspect})
    yield value if block_given?
    value
  end

  def #{name}=(value)
    self['#{name}'] = value
  end

  def #{name}?
    !!self['#{name}']
  end
  EOT
end

.kill(uuid) ⇒ Object

Kill the job at UUID on its next iteration this works by adding the UUID to a kill list (a.k.a. a list of jobs to be killed. Each iteration the job checks if it should be killed by calling tick or at. If so, it raises a Resque::Plugins::State::Killed error and sets the status to 'killed'.


109
110
111
# File 'lib/resque/plugins/state/hash.rb', line 109

def self.kill(uuid)
  redis.sadd(kill_key, uuid)
end

.kill_idsObject

Return the UUIDs of the jobs on the kill list


119
120
121
# File 'lib/resque/plugins/state/hash.rb', line 119

def self.kill_ids
  redis.smembers(kill_key)
end

.kill_keyObject


198
199
200
# File 'lib/resque/plugins/state/hash.rb', line 198

def self.kill_key
  '_kill'
end

.killall(range_start = nil, range_end = nil) ⇒ Object

Kills num jobs within range starting with the most recent first. By default kills all jobs. Note that the same conditions apply as kill, i.e. only jobs that check on each iteration by calling tick or at are eligible to killed.

Examples:

killing the last 20 submitted jobs

Resque::Plugins::State::Hash.killall(0, 20)

Parameters:

  • range_start (Numeric) (defaults to: nil)

    The optional starting range

  • range_end (Numeric) (defaults to: nil)

    The optional ending range


131
132
133
134
135
# File 'lib/resque/plugins/state/hash.rb', line 131

def self.killall(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).collect do |id|
    kill(id)
  end
end

.killed(uuid) ⇒ Object

Remove the job at UUID from the kill list


114
115
116
# File 'lib/resque/plugins/state/hash.rb', line 114

def self.killed(uuid)
  redis.srem(kill_key, uuid)
end

.mget(uuids) ⇒ Object

Get multiple statuses by UUID. Returns array of Resque::Plugins::State::Hash


26
27
28
29
30
31
32
33
34
# File 'lib/resque/plugins/state/hash.rb', line 26

def self.mget(uuids)
  return [] if uuids.empty?
  status_keys = uuids.map { |u| status_key(u) }
  vals = redis.mget(*status_keys)

  uuids.zip(vals).map do |uuid, val|
    val ? Resque::Plugins::State::Hash.new(uuid, decode(val)) : nil
  end
end

.pause(uuid) ⇒ Object

pause the job at UUID on its next iteration this works by adding the UUID to a pause list (a.k.a. a list of jobs to be pauseed. Each iteration the job checks if it should be pauseed by calling tick or at. If so, it sleeps for 10 seconds before checking again if it should continue sleeping


146
147
148
# File 'lib/resque/plugins/state/hash.rb', line 146

def self.pause(uuid)
  redis.sadd(pause_key, uuid)
end

.pause_idsObject

Return the UUIDs of the jobs on the pause list


156
157
158
# File 'lib/resque/plugins/state/hash.rb', line 156

def self.pause_ids
  redis.smembers(pause_key)
end

.pause_keyObject


202
203
204
# File 'lib/resque/plugins/state/hash.rb', line 202

def self.pause_key
  '_pause'
end

.pauseall(range_start = nil, range_end = nil) ⇒ Object

pauses num jobs within range starting with the most recent first. By default pauses all jobs. Note that the same conditions apply as pause, i.e. only jobs that check on each iteration by calling tick or at are eligible to pauseed.

Examples:

pauseing the last 20 submitted jobs

Resque::Plugins::State::Hash.pauseall(0, 20)

Parameters:

  • range_start (Numeric) (defaults to: nil)

    The optional starting range

  • range_end (Numeric) (defaults to: nil)

    The optional ending range


168
169
170
171
172
# File 'lib/resque/plugins/state/hash.rb', line 168

def self.pauseall(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).collect do |id|
    pause(id)
  end
end

.remove(uuid) ⇒ Object


75
76
77
78
# File 'lib/resque/plugins/state/hash.rb', line 75

def self.remove(uuid)
  redis.del(status_key(uuid))
  redis.zrem(set_key, uuid)
end

.set(uuid, *messages) ⇒ Object

set a status by UUID. messages can be any number of strings or hashes that are merged in order to create a single status.


38
39
40
41
42
43
# File 'lib/resque/plugins/state/hash.rb', line 38

def self.set(uuid, *messages)
  val = Resque::Plugins::State::Hash.new(uuid, *messages)
  redis.set(status_key(uuid), encode(val))
  redis.expire(status_key(uuid), expire_in) if expire_in
  val
end

.set_keyObject


194
195
196
# File 'lib/resque/plugins/state/hash.rb', line 194

def self.set_key
  '_statuses'
end

.should_kill?(uuid) ⇒ Boolean

Check whether a job with UUID is on the kill list

Returns:

  • (Boolean)

138
139
140
# File 'lib/resque/plugins/state/hash.rb', line 138

def self.should_kill?(uuid)
  redis.sismember(kill_key, uuid)
end

.should_pause?(uuid) ⇒ Boolean

Check whether a job with UUID is on the pause list

Returns:

  • (Boolean)

175
176
177
# File 'lib/resque/plugins/state/hash.rb', line 175

def self.should_pause?(uuid)
  redis.sismember(pause_key, uuid)
end

.status_ids(range_start = nil, range_end = nil) ⇒ Object

Return the num most recent status/job UUIDs in reverse chronological order.


92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/resque/plugins/state/hash.rb', line 92

def self.status_ids(range_start = nil, range_end = nil)
  if range_end && range_start
    # Because we want a reverse chronological order, we need to get a range starting
    # by the higest negative number. The ordering is transparent from the API user's
    # perspective so we need to convert the passed params
    (redis.zrevrange(set_key, range_start.abs, (range_end || 1).abs) || [])
  else
    # Because we want a reverse chronological order, we need to get a range starting
    # by the higest negative number.
    redis.zrevrange(set_key, 0, -1) || []
  end
end

.status_key(uuid) ⇒ Object


190
191
192
# File 'lib/resque/plugins/state/hash.rb', line 190

def self.status_key(uuid)
  "status:#{uuid}"
end

.statuses(range_start = nil, range_end = nil) ⇒ Object

Return num Resque::Plugins::State::Hash objects in reverse chronological order. By default returns the entire set.

Examples:

retuning the last 20 statuses

Resque::Plugins::State::Hash.statuses(0, 20)

Parameters:

  • range_start (Numeric) (defaults to: nil)

    The optional starting range

  • range_end (Numeric) (defaults to: nil)

    The optional ending range


86
87
88
89
# File 'lib/resque/plugins/state/hash.rb', line 86

def self.statuses(range_start = nil, range_end = nil)
  ids = status_ids(range_start, range_end)
  mget(ids).compact || []
end

.unpause(uuid) ⇒ Object

Remove the job at UUID from the pause list


151
152
153
# File 'lib/resque/plugins/state/hash.rb', line 151

def self.unpause(uuid)
  redis.srem(pause_key, uuid)
end

Instance Method Details

#inspectObject


320
321
322
# File 'lib/resque/plugins/state/hash.rb', line 320

def inspect
  "#<Resque::Plugins::State::Hash #{super}>"
end

#jsonObject

Return a JSON representation of the current object.


314
315
316
317
318
# File 'lib/resque/plugins/state/hash.rb', line 314

def json
  h = dup
  h['pct_complete'] = pct_complete
  self.class.encode(h)
end

#killable?Boolean

Can the job be killed? failed, completed, and killed jobs can't be killed, for obvious reasons

Returns:

  • (Boolean)

297
298
299
# File 'lib/resque/plugins/state/hash.rb', line 297

def killable?
  !failed? && !completed? && !killed?
end

#pausable?Boolean

Can the job be paused? failed, completed, paused, and killed jobs can't be paused, for obvious reasons

Returns:

  • (Boolean)

303
304
305
# File 'lib/resque/plugins/state/hash.rb', line 303

def pausable?
  !failed? && !completed? && !killed? && !paused?
end

#pct_completeObject

calculate the % completion of the job based on status, num and total


267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/resque/plugins/state/hash.rb', line 267

def pct_complete
  if completed?
    100
  elsif queued?
    0
  elsif failed?
    0
  else
    if total.nil?
      t = 1
    else t = total
    end
    (((num || 0).to_f / t.to_f) * 100).to_i
  end
end

#timeObject

Return the time of the status initialization. If set returns a Time object, otherwise returns nil


285
286
287
# File 'lib/resque/plugins/state/hash.rb', line 285

def time
  time? ? Time.at(self['time']) : nil
end

#to_json(*_args) ⇒ Object


308
309
310
# File 'lib/resque/plugins/state/hash.rb', line 308

def to_json(*_args)
  json
end