Module: Resque::Plugins::Groups

Included in:
Resque
Defined in:
lib/resque_groups.rb,
lib/resque_groups/version.rb,
lib/resque_groups/tracked_job.rb

Defined Under Namespace

Modules: TrackedJob

Constant Summary collapse

Version =
'0.6.3'

Instance Method Summary collapse

Instance Method Details

#delete_delayed_job(gid) ⇒ Object



76
77
78
79
80
81
82
83
# File 'lib/resque_groups.rb', line 76

def delete_delayed_job(gid)
  job_groups.update( {'_id' => gid},
  {
    '$inc' => {'delayed' => -1},
    '$set' => {'updated_at' => Time.now}
  }, {:upsert => true})
  #Resque.print_groups
end

#group_stats(gid) ⇒ Object

Returns the stats for a given job group



96
97
98
# File 'lib/resque_groups.rb', line 96

def group_stats(gid)
  job_groups.find_one({'_id' => gid})
end

#groups_countObject



104
105
106
# File 'lib/resque_groups.rb', line 104

def groups_count
  job_groups.count
end

#groups_stats(start, count) ⇒ Object



100
101
102
# File 'lib/resque_groups.rb', line 100

def groups_stats(start, count)
  job_groups.find({}, :skip => start, :limit => count, :sort => ['started_at', -1]).to_a
end

#job_groupsObject



16
17
18
19
# File 'lib/resque_groups.rb', line 16

def job_groups
  self.mongo ||= ENV['MONGO'] || 'localhost:27017'
  @job_groups ||= @db.collection('job_groups')
end

#parse_gid(*args) ⇒ Object

Given a set of args, looks through all the args that are Hashes for a :group_id key (the inject line replaces string keys with their symbols)



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/resque_groups.rb', line 109

def parse_gid(*args)
  gid = nil
  args.flatten.each do |arg|
    next unless arg.kind_of? Hash
    sym_arg = arg.inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo}
    if !sym_arg[:group_id].nil?
      gid = sym_arg[:group_id]
      break
    end
  end
  gid
end


122
123
124
125
126
127
128
# File 'lib/resque_groups.rb', line 122

def print_groups
  puts "--------------------"
  job_groups.find().each do |row|
    puts row.inspect
  end
  puts "--------------------"
end

#record_delayed_job(gid) ⇒ Object



67
68
69
70
71
72
73
74
# File 'lib/resque_groups.rb', line 67

def record_delayed_job(gid)
  job_groups.update( {'_id' => gid},
  {
    '$inc' => {'delayed' => 1},
    '$set' => {'updated_at' => Time.now}
  }, {:upsert => true})
  #Resque.print_groups
end

#record_failed_job(gid, exception, *payload) ⇒ Object

Increments the ‘failed’ count of the group with id ‘gid’. Store the exception details, and the job payload.



57
58
59
60
61
62
63
64
65
# File 'lib/resque_groups.rb', line 57

def record_failed_job(gid, exception, *payload)
  job_groups.update( {'_id' => gid},
  {
    '$inc' => {'failed' => 1},
    '$push' => {'exceptions' => [exception.inspect, exception.backtrace, payload]},
    '$set' => {'updated_at' => Time.now}
  }, {:upsert => true})
  #Resque.print_groups
end

#record_performed_job(gid, atomic_op = nil) ⇒ Object

Increments the ‘completed’ count of the group with id ‘gid’. Also merge the increment operation with optional custom atomic operations.



44
45
46
47
48
49
50
51
52
53
# File 'lib/resque_groups.rb', line 44

def record_performed_job(gid, atomic_op = nil)
  op = {'$inc' => {'completed' => 1}, '$set' => {'updated_at' => Time.now}}
  if atomic_op
    op['$inc'].merge!(atomic_op.delete('$inc')) if atomic_op['$inc']
    op['$set'].merge!(atomic_op.delete('$set')) if atomic_op['$set']
    op.merge!(atomic_op)
  end
  job_groups.update( {'_id' => gid}, op , {:upsert => true})
  #Resque.print_groups
end

#remove_group(gid) ⇒ Object

Deletes a group from the job_groups collection



91
92
93
# File 'lib/resque_groups.rb', line 91

def remove_group(gid)
  job_groups.remove({'_id' => gid})
end

#reset_groups_queueObject

Clears all the groups that are being stored in the job_groups collection



86
87
88
# File 'lib/resque_groups.rb', line 86

def reset_groups_queue
  job_groups.remove
end

#upsert_group(gid) ⇒ Object

Triggered whenever a job belonging to group ‘gid’ has been enqueued. If there’s no entry for that group in the ‘job_groups’ collection, create one and record a timestamp. Otherwise, just increment a total count of enqueued jobs for that group



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/resque_groups.rb', line 23

def upsert_group(gid)
  if job_groups.find_one({'_id' => gid}).nil?
    job_groups.update( {'_id' => gid},
    {
      '$inc' => {'total' => 1},
      '$set' => {'started_at' => Time.now, 'updated_at' => Time.now}
    }, {:upsert => true})
  else
    job_groups.update( {'_id' => gid},
    {
      '$inc' => {'total' => 1},
      '$set' => {'updated_at' => Time.now}
    }, {:upsert => true})
  end
  #Resque.print_groups
  job_groups.count
end