Module: Resque::Plugins::Groups
- Included in:
- Resque
- Defined in:
- lib/resque_groups.rb,
lib/resque_groups/version.rb,
lib/resque_groups/tracked_job.rb
Defined Under Namespace
Modules: TrackedJob
Constant Summary collapse
- Version =
'0.6.3'
Instance Method Summary collapse
- #delete_delayed_job(gid) ⇒ Object
-
#group_stats(gid) ⇒ Object
Returns the stats for a given job group.
- #groups_count ⇒ Object
- #groups_stats(start, count) ⇒ Object
- #job_groups ⇒ Object
-
#parse_gid(*args) ⇒ Object
Given a set of args, looks through all the args that are Hashes for a :group_id key (the inject line replaces string keys with their symbols).
- #print_groups ⇒ Object
- #record_delayed_job(gid) ⇒ Object
-
#record_failed_job(gid, exception, *payload) ⇒ Object
Increments the ‘failed’ count of the group with id ‘gid’.
-
#record_performed_job(gid, atomic_op = nil) ⇒ Object
Increments the ‘completed’ count of the group with id ‘gid’.
-
#remove_group(gid) ⇒ Object
Deletes a group from the job_groups collection.
-
#reset_groups_queue ⇒ Object
Clears all the groups that are being stored in the job_groups collection.
-
#upsert_group(gid) ⇒ Object
Triggered whenever a job belonging to group ‘gid’ has been enqueued.
Instance Method Details
#delete_delayed_job(gid) ⇒ Object
76 77 78 79 80 81 82 83 |
# File 'lib/resque_groups.rb', line 76 def delete_delayed_job(gid) job_groups.update( {'_id' => gid}, { '$inc' => {'delayed' => -1}, '$set' => {'updated_at' => Time.now} }, {:upsert => true}) #Resque.print_groups end |
#group_stats(gid) ⇒ Object
Returns the stats for a given job group
96 97 98 |
# File 'lib/resque_groups.rb', line 96 def group_stats(gid) job_groups.find_one({'_id' => gid}) end |
#groups_count ⇒ Object
104 105 106 |
# File 'lib/resque_groups.rb', line 104 def groups_count job_groups.count end |
#groups_stats(start, count) ⇒ Object
100 101 102 |
# File 'lib/resque_groups.rb', line 100 def groups_stats(start, count) job_groups.find({}, :skip => start, :limit => count, :sort => ['started_at', -1]).to_a end |
#job_groups ⇒ Object
16 17 18 19 |
# File 'lib/resque_groups.rb', line 16 def job_groups self.mongo ||= ENV['MONGO'] || 'localhost:27017' @job_groups ||= @db.collection('job_groups') end |
#parse_gid(*args) ⇒ Object
Given a set of args, looks through all the args that are Hashes for a :group_id key (the inject line replaces string keys with their symbols)
109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/resque_groups.rb', line 109 def parse_gid(*args) gid = nil args.flatten.each do |arg| next unless arg.kind_of? Hash sym_arg = arg.inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo} if !sym_arg[:group_id].nil? gid = sym_arg[:group_id] break end end gid end |
#print_groups ⇒ Object
122 123 124 125 126 127 128 |
# File 'lib/resque_groups.rb', line 122 def print_groups puts "--------------------" job_groups.find().each do |row| puts row.inspect end puts "--------------------" end |
#record_delayed_job(gid) ⇒ Object
67 68 69 70 71 72 73 74 |
# File 'lib/resque_groups.rb', line 67 def record_delayed_job(gid) job_groups.update( {'_id' => gid}, { '$inc' => {'delayed' => 1}, '$set' => {'updated_at' => Time.now} }, {:upsert => true}) #Resque.print_groups end |
#record_failed_job(gid, exception, *payload) ⇒ Object
Increments the ‘failed’ count of the group with id ‘gid’. Store the exception details, and the job payload.
57 58 59 60 61 62 63 64 65 |
# File 'lib/resque_groups.rb', line 57 def record_failed_job(gid, exception, *payload) job_groups.update( {'_id' => gid}, { '$inc' => {'failed' => 1}, '$push' => {'exceptions' => [exception.inspect, exception.backtrace, payload]}, '$set' => {'updated_at' => Time.now} }, {:upsert => true}) #Resque.print_groups end |
#record_performed_job(gid, atomic_op = nil) ⇒ Object
Increments the ‘completed’ count of the group with id ‘gid’. Also merge the increment operation with optional custom atomic operations.
44 45 46 47 48 49 50 51 52 53 |
# File 'lib/resque_groups.rb', line 44 def record_performed_job(gid, atomic_op = nil) op = {'$inc' => {'completed' => 1}, '$set' => {'updated_at' => Time.now}} if atomic_op op['$inc'].merge!(atomic_op.delete('$inc')) if atomic_op['$inc'] op['$set'].merge!(atomic_op.delete('$set')) if atomic_op['$set'] op.merge!(atomic_op) end job_groups.update( {'_id' => gid}, op , {:upsert => true}) #Resque.print_groups end |
#remove_group(gid) ⇒ Object
Deletes a group from the job_groups collection
91 92 93 |
# File 'lib/resque_groups.rb', line 91 def remove_group(gid) job_groups.remove({'_id' => gid}) end |
#reset_groups_queue ⇒ Object
Clears all the groups that are being stored in the job_groups collection
86 87 88 |
# File 'lib/resque_groups.rb', line 86 def reset_groups_queue job_groups.remove end |
#upsert_group(gid) ⇒ Object
Triggered whenever a job belonging to group ‘gid’ has been enqueued. If there’s no entry for that group in the ‘job_groups’ collection, create one and record a timestamp. Otherwise, just increment a total count of enqueued jobs for that group
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/resque_groups.rb', line 23 def upsert_group(gid) if job_groups.find_one({'_id' => gid}).nil? job_groups.update( {'_id' => gid}, { '$inc' => {'total' => 1}, '$set' => {'started_at' => Time.now, 'updated_at' => Time.now} }, {:upsert => true}) else job_groups.update( {'_id' => gid}, { '$inc' => {'total' => 1}, '$set' => {'updated_at' => Time.now} }, {:upsert => true}) end #Resque.print_groups job_groups.count end |