Class: RelaxedJob::Queue
- Inherits:
-
Object
- Object
- RelaxedJob::Queue
- Defined in:
- lib/relaxed_job/queue.rb
Instance Attribute Summary collapse
-
#couchdb_url ⇒ Object
readonly
Returns the value of attribute couchdb_url.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
Instance Method Summary collapse
- #clear_locks! ⇒ Object
-
#completed_jobs ⇒ Object
job types #################################################################.
- #enqueue(object) ⇒ Object
- #enqueue_with_method(object, method, *args) ⇒ Object
- #errored_jobs ⇒ Object
-
#initialize(couchdb_url) ⇒ Queue
constructor
A new instance of Queue.
- #lock(count) ⇒ Object
- #locked_jobs ⇒ Object
- #pending_jobs(count) ⇒ Object
- #work(worker_name = worker_name) ⇒ Object
- #worker_name ⇒ Object
Constructor Details
#initialize(couchdb_url) ⇒ Queue
Returns a new instance of Queue.
8 9 10 11 |
# File 'lib/relaxed_job/queue.rb', line 8 def initialize(couchdb_url) @couchdb_url = couchdb_url @options = end |
Instance Attribute Details
#couchdb_url ⇒ Object (readonly)
Returns the value of attribute couchdb_url.
5 6 7 |
# File 'lib/relaxed_job/queue.rb', line 5 def couchdb_url @couchdb_url end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
6 7 8 |
# File 'lib/relaxed_job/queue.rb', line 6 def @options end |
Instance Method Details
#clear_locks! ⇒ Object
36 37 38 39 40 41 42 |
# File 'lib/relaxed_job/queue.rb', line 36 def clear_locks! locked_jobs.each do |job| job['state'] = 'pending' job.delete('locked_by') couchdb.save_doc(job) end end |
#completed_jobs ⇒ Object
job types #################################################################
80 81 82 |
# File 'lib/relaxed_job/queue.rb', line 80 def completed_jobs jobs_by_type(:completed) end |
#enqueue(object) ⇒ Object
13 14 15 |
# File 'lib/relaxed_job/queue.rb', line 13 def enqueue(object) enqueue_with_method object, :perform end |
#enqueue_with_method(object, method, *args) ⇒ Object
17 18 19 20 21 22 23 24 25 26 |
# File 'lib/relaxed_job/queue.rb', line 17 def enqueue_with_method(object, method, *args) couchdb.save_doc({ 'class' => 'job', 'state' => 'pending', 'method' => method.to_s, 'arguments' => args.to_a, 'object' => Marshal.dump(object), 'queued_at' => Time.now.utc }) end |
#errored_jobs ⇒ Object
84 85 86 |
# File 'lib/relaxed_job/queue.rb', line 84 def errored_jobs jobs_by_type(:errored) end |
#lock(count) ⇒ Object
28 29 30 31 32 33 34 |
# File 'lib/relaxed_job/queue.rb', line 28 def lock(count) pending_jobs(count).each do |job| job['state'] = 'locked' job['locked_by'] = worker_name couchdb.save_doc(job) end end |
#locked_jobs ⇒ Object
88 89 90 |
# File 'lib/relaxed_job/queue.rb', line 88 def locked_jobs jobs_by_type(:locked, :key => worker_name) end |
#pending_jobs(count) ⇒ Object
92 93 94 |
# File 'lib/relaxed_job/queue.rb', line 92 def pending_jobs(count) jobs_by_type(:pending, :limit => count) end |
#work(worker_name = worker_name) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/relaxed_job/queue.rb', line 44 def work(worker_name=worker_name) lock 3 counts = { :complete => 0, :error => 0 } locked_jobs.each do |job| begin object = Marshal.load(job['object']) object.send(job['method'], *(job['arguments'])) counts[:complete] += 1 job['state'] = 'complete' job['completed_at'] = Time.now.utc couchdb.save_doc(job) rescue StandardError => ex counts[:error] += 1 job['state'] = 'error' job['exception'] = Marshal.dump(ex) job['errored_at'] = Time.now.utc couchdb.save_doc(job) end end counts end |
#worker_name ⇒ Object
72 73 74 75 76 |
# File 'lib/relaxed_job/queue.rb', line 72 def worker_name "host:#{Socket.gethostname} pid:#{$$}" rescue "pid:#{$$}" end |