Class: RelaxedJob::Queue
- Inherits:
-
Object
- Object
- RelaxedJob::Queue
- Defined in:
- lib/relaxed_job/queue.rb
Instance Attribute Summary collapse
-
#couchdb_url ⇒ Object
readonly
Returns the value of attribute couchdb_url.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
Instance Method Summary collapse
- #clear_locks! ⇒ Object
-
#completed_jobs ⇒ Object
job types #################################################################.
- #enqueue(object, *args) ⇒ Object
- #enqueue_with_method(object, method, *args) ⇒ Object
- #errored_jobs ⇒ Object
-
#initialize(couchdb_url) ⇒ Queue
constructor
A new instance of Queue.
- #job(id) ⇒ Object
- #lock(count) ⇒ Object
- #locked_jobs ⇒ Object
- #pending_jobs(count) ⇒ Object
- #work(worker_name = worker_name) ⇒ Object
- #worker_name ⇒ Object
Constructor Details
#initialize(couchdb_url) ⇒ Queue
Returns a new instance of Queue.
8 9 10 11 |
# File 'lib/relaxed_job/queue.rb', line 8 def initialize(couchdb_url) @couchdb_url = couchdb_url @options = end |
Instance Attribute Details
#couchdb_url ⇒ Object (readonly)
Returns the value of attribute couchdb_url.
5 6 7 |
# File 'lib/relaxed_job/queue.rb', line 5 def couchdb_url @couchdb_url end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
6 7 8 |
# File 'lib/relaxed_job/queue.rb', line 6 def @options end |
Instance Method Details
#clear_locks! ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/relaxed_job/queue.rb', line 40 def clear_locks! locked_jobs.each do |job| job['state'] = 'pending' job.delete('locked_by') couchdb.save_doc(job) end end |
#completed_jobs ⇒ Object
job types #################################################################
85 86 87 |
# File 'lib/relaxed_job/queue.rb', line 85 def completed_jobs jobs_by_type(:completed) end |
#enqueue(object, *args) ⇒ Object
17 18 19 |
# File 'lib/relaxed_job/queue.rb', line 17 def enqueue(object, *args) enqueue_with_method object, :perform, *args end |
#enqueue_with_method(object, method, *args) ⇒ Object
21 22 23 24 25 26 27 28 29 30 |
# File 'lib/relaxed_job/queue.rb', line 21 def enqueue_with_method(object, method, *args) couchdb.save_doc({ 'class' => 'job', 'state' => 'pending', 'method' => method.to_s, 'arguments' => args.to_a, 'object' => Marshal.dump(object), 'queued_at' => Time.now.utc }) end |
#errored_jobs ⇒ Object
89 90 91 |
# File 'lib/relaxed_job/queue.rb', line 89 def errored_jobs jobs_by_type(:errored) end |
#job(id) ⇒ Object
13 14 15 |
# File 'lib/relaxed_job/queue.rb', line 13 def job(id) couchdb.get(id) end |
#lock(count) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/relaxed_job/queue.rb', line 32 def lock(count) pending_jobs(count).each do |job| job['state'] = 'locked' job['locked_by'] = worker_name couchdb.save_doc(job) end end |
#locked_jobs ⇒ Object
93 94 95 |
# File 'lib/relaxed_job/queue.rb', line 93 def locked_jobs jobs_by_type(:locked, :key => worker_name) end |
#pending_jobs(count) ⇒ Object
97 98 99 |
# File 'lib/relaxed_job/queue.rb', line 97 def pending_jobs(count) jobs_by_type(:pending, :limit => count) end |
#work(worker_name = worker_name) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/relaxed_job/queue.rb', line 48 def work(worker_name=worker_name) lock 3 counts = { :complete => 0, :error => 0 } locked_jobs.each do |job| begin object = Marshal.load(job['object']) retval = object.send(job['method'], *(job['arguments'])) counts[:complete] += 1 job['retval'] = retval job['state'] = 'complete' job['completed_at'] = Time.now.utc couchdb.save_doc(job) rescue StandardError => ex counts[:error] += 1 job['state'] = 'error' job['exception'] = Marshal.dump(ex) job['errored_at'] = Time.now.utc couchdb.save_doc(job) end end counts end |
#worker_name ⇒ Object
77 78 79 80 81 |
# File 'lib/relaxed_job/queue.rb', line 77 def worker_name "host:#{Socket.gethostname} pid:#{$$}" rescue "pid:#{$$}" end |