Class: BdrbJobQueue
- Inherits:
- ActiveRecord::Base
- Object
- ActiveRecord::Base
- BdrbJobQueue
- Defined in:
- lib/backgroundrb/bdrb_job_queue.rb
Overview
Model for storing jobs/tasks persisted to the database
Class Method Summary
- .find_next(worker_name, worker_key = nil) ⇒ Object
Find the next task from the table.
- .insert_job(options = { }) ⇒ Object
Insert a new job for processing.
- .remove_job(options = { }) ⇒ Object
Remove a job from the table.
Instance Method Summary
- #args ⇒ Object
- #args=(args) ⇒ Object
These accessors get around possible character-encoding issues with the database.
- #finish! ⇒ Object
Mark a job as finished.
- #release_job ⇒ Object
Release a job, marking it as free so it can be processed again.
Class Method Details
.find_next(worker_name, worker_key = nil) ⇒ Object
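Find the next pending job for worker_name whose scheduled_at time has passed, mark it as taken, and return it. Returns nil if no job is available. Without a worker_key, the highest-priority job is chosen; with a worker_key, only jobs carrying that key are considered.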
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 6
def self.find_next(worker_name, worker_key = nil)
  returned_job = nil
  ActiveRecord::Base.verify_active_connections!
  transaction do
    unless worker_key
      # use ruby time stamps for time calculations as db might have
      # different times than what is calculated by ruby/rails
      t_job = find(:first,
                   :conditions => [" worker_name = ? AND taken = ? AND scheduled_at <= ? ",
                                   worker_name, 0, Time.now.utc],
                   :lock => true,
                   :order => 'priority desc')
    else
      t_job = find(:first,
                   :conditions => [" worker_name = ? AND taken = ? AND worker_key = ? AND scheduled_at <= ? ",
                                   worker_name, 0, worker_key, Time.now.utc],
                   :lock => true)
    end
    if t_job
      t_job.taken = 1
      t_job.started_at = Time.now.utc
      t_job.save
      returned_job = t_job
    end
  end
  returned_job
end
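The selection rule in find_next can be tried out without a database. The following is only an in-memory sketch of that rule, not the library's implementation: SketchJob, InMemoryQueue, and the now: keyword are hypothetical names introduced for illustration, and a Mutex stands in for the row lock that the real method takes inside its transaction.

```ruby
# Hypothetical in-memory stand-ins; none of these names exist in BackgrounDRb.
SketchJob = Struct.new(:worker_name, :worker_key, :priority,
                       :scheduled_at, :taken, :started_at,
                       keyword_init: true)

class InMemoryQueue
  def initialize(jobs)
    @jobs = jobs
    @lock = Mutex.new # plays the role of the :lock => true row lock
  end

  # now: is an added parameter so the example is deterministic.
  def find_next(worker_name, worker_key = nil, now: Time.now.utc)
    @lock.synchronize do
      runnable = @jobs.select do |job|
        job.worker_name == worker_name &&
          job.taken == 0 &&
          job.scheduled_at <= now &&
          (worker_key.nil? || job.worker_key == worker_key)
      end

      # Without a key: highest priority wins. With a key: first match.
      job = worker_key ? runnable.first : runnable.max_by(&:priority)
      if job
        job.taken = 1
        job.started_at = now
      end
      job
    end
  end
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
queue = InMemoryQueue.new([
  SketchJob.new(worker_name: "mailer", priority: 1, scheduled_at: now - 60, taken: 0),
  SketchJob.new(worker_name: "mailer", priority: 5, scheduled_at: now - 60, taken: 0),
  SketchJob.new(worker_name: "mailer", priority: 9, scheduled_at: now + 60, taken: 0)
])

claimed = queue.find_next("mailer", now: now)
puts claimed.priority # 5: the priority-9 job is not due yet
```

Calling find_next again on the same queue returns the priority-1 job, and a third call returns nil because the remaining job is scheduled in the future.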
.insert_job(options = { }) ⇒ Object
Insert a new job for processing. Jobs added are picked up automatically by the appropriate worker.
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 47
def self.insert_job(options = { })
  ActiveRecord::Base.verify_active_connections!
  transaction do
    # stamp the submission time and mark the job as pending and free
    options.merge!(:submitted_at => Time.now.utc, :finished => 0, :taken => 0)
    t_job = new(options)
    t_job.save
  end
end
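Because insert_job uses merge!, two things follow: any :finished, :taken, or :submitted_at values supplied by the caller are overwritten, and the caller's own hash is modified. The sketch below isolates just that merge step; stamp_new_job! and its now parameter are hypothetical helpers for illustration, not part of the library.

```ruby
# Hypothetical helper mirroring the merge! call in insert_job.
# The now parameter is added so the result is reproducible.
def stamp_new_job!(options = {}, now = Time.now.utc)
  options.merge!(:submitted_at => now, :finished => 0, :taken => 0)
end

options = { :worker_name => "mailer", :job_key => "welcome_42", :taken => 1 }
stamp_new_job!(options, Time.utc(2024, 1, 1))

p options[:taken]              # 0: the caller-supplied value was overwritten
p options.key?(:submitted_at)  # true: the caller's hash itself was modified
```

If the caller needs to keep its hash unchanged, passing a copy (for example options.dup) avoids the side effect.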
.remove_job(options = { }) ⇒ Object
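Remove a job from the table. Only jobs that are neither taken nor finished are matched, so a job that a worker has already claimed is left alone.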
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 57
def self.remove_job(options = { })
  ActiveRecord::Base.verify_active_connections!
  transaction do
    # only match jobs that are still pending and not claimed by a worker
    t_job_id = find(:first, :conditions => options.merge(:finished => 0, :taken => 0), :lock => true)
    delete(t_job_id)
  end
end
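The matching rule used by remove_job (caller conditions plus finished = 0 and taken = 0) can be illustrated on an array of hashes. This is a sketch under stated assumptions: remove_first_pending is a hypothetical helper, the rows are plain hashes rather than records, and the explicit nil return for "no match" is a choice made here, not behavior documented by the source.

```ruby
# Hypothetical helper: removes the first row that matches the caller's
# conditions AND is still pending (finished = 0) and free (taken = 0).
def remove_first_pending(jobs, options = {})
  conditions = options.merge(:finished => 0, :taken => 0)
  index = jobs.index do |job|
    conditions.all? { |column, value| job[column] == value }
  end
  index ? jobs.delete_at(index) : nil
end

jobs = [
  { :job_key => "a", :worker_name => "mailer", :finished => 0, :taken => 1 },
  { :job_key => "b", :worker_name => "mailer", :finished => 0, :taken => 0 }
]

removed = remove_first_pending(jobs, :worker_name => "mailer")
p removed[:job_key] # "b": job "a" is skipped because a worker has taken it
p jobs.size         # 1
```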
Instance Method Details
#args ⇒ Object
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 31
def args
  Base64.decode64(read_attribute(:args))
end
#args=(args) ⇒ Object
These accessors Base64-encode args on write and decode them on read, which gets around possible character-encoding issues with the database.
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 27
def args=(args)
  write_attribute(:args, Base64.encode64(args))
end
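The effect of the two accessors can be seen without ActiveRecord. In the sketch below, FakeJobRow is a hypothetical stand-in whose hash plays the role of the database row; only the args and args= bodies are taken from the source above. The use of Marshal to produce a binary payload is an assumption made for the example.

```ruby
require "base64"

# Hypothetical stand-in for the model: a hash replaces the database row.
class FakeJobRow
  def initialize
    @attributes = {}
  end

  def write_attribute(name, value)
    @attributes[name] = value
  end

  def read_attribute(name)
    @attributes[name]
  end

  # Same bodies as the accessors documented above.
  def args=(args)
    write_attribute(:args, Base64.encode64(args))
  end

  def args
    Base64.decode64(read_attribute(:args))
  end
end

row = FakeJobRow.new
row.args = Marshal.dump({ :user_id => 42, :note => "caf\u00e9" })

p row.read_attribute(:args).ascii_only? # true: only ASCII reaches the column
p Marshal.load(row.args)                # the original hash, round-tripped
```

The stored value is plain ASCII text, so it survives a text column regardless of the connection's character set, while the reader still gets the original bytes back.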
#finish! ⇒ Object
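Mark a job as finished. Sets finished and finished_at, prefixes job_key with finished_ and the current UTC timestamp, and clears the thread-local :persistent_job_id and :job_key entries. Always returns nil.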
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 66
def finish!
  ActiveRecord::Base.verify_active_connections!
  self.class.transaction do
    self.finished = 1
    self.finished_at = Time.now.utc
    self.job_key = "finished_#{Time.now.utc.to_i}_#{job_key}"
    self.save
  end
  Thread.current[:persistent_job_id] = nil
  Thread.current[:job_key] = nil
  nil
end
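The job_key rewrite in finish! is easiest to see in isolation. The helper below, finished_job_key, is a hypothetical name used only to show the resulting format; the now parameter is added so the output is reproducible. One plausible reason for the rename (an assumption, not stated in the source) is that it frees the original key so a job with the same key can be submitted again.

```ruby
# Hypothetical helper reproducing the key format built inside finish!.
def finished_job_key(job_key, now = Time.now.utc)
  "finished_#{now.to_i}_#{job_key}"
end

puts finished_job_key("report_7", Time.utc(2024, 1, 1))
# finished_1704067200_report_7
```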
#release_job ⇒ Object
Release a job: reset taken to 0 and clear started_at so the job is free to be picked up again. Useful when processing fails inside a worker and you want the job to be processed later.
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 37
def release_job
  ActiveRecord::Base.verify_active_connections!
  self.class.transaction do
    self.taken = 0
    self.started_at = nil
    self.save
  end
end
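A worker would typically pair release_job with finish!: finish the job when processing succeeds, release it when processing raises. The following is a minimal sketch of that pattern, assuming a job object that responds to finish! and release_job. StubJob and process_job are hypothetical names for illustration; StubJob only imitates the attribute changes shown in the listings above and does not touch a database.

```ruby
# Hypothetical stand-in that imitates the attribute changes made by
# finish! and release_job, without any database access.
class StubJob
  attr_reader :taken, :finished, :started_at

  def initialize
    @taken = 1 # as if find_next had already claimed this job
    @finished = 0
    @started_at = Time.now.utc
  end

  def finish!
    @finished = 1
    nil
  end

  def release_job
    @taken = 0
    @started_at = nil
  end
end

# Hypothetical helper: run the block, then finish or release the job.
def process_job(job)
  yield job
  job.finish!
  :finished
rescue StandardError
  job.release_job # make the job available for a later attempt
  :released
end

p process_job(StubJob.new) { |job| job }               # :finished
p process_job(StubJob.new) { |job| raise "smtp down" } # :released
```

Rescuing StandardError rather than Exception is a choice made for this sketch, so that signals and interpreter shutdown are not swallowed.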