Class: BdrbJobQueue

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/backgroundrb/bdrb_job_queue.rb

Overview

Model for storing jobs/tasks persisted to the database

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.find_next(worker_name, worker_key = nil) ⇒ Object

Find the next runnable task for a worker in the jobs table, mark it as taken, and return it (nil when nothing is due).



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 6

# Claims the next due, untaken job for a worker and hands it back.
#
# The lookup and the "taken" update run inside one transaction, and the
# finder is called with :lock => true.
#
# worker_name - value matched against the worker_name column.
# worker_key  - optional; when truthy the search is also restricted to
#               rows with this worker_key.
#
# Returns the claimed job record, or nil when no matching job is due.
def self.find_next(worker_name, worker_key = nil)
  ActiveRecord::Base.verify_active_connections!
  claimed = nil
  transaction do
    # The cutoff comes from Ruby's clock rather than the database's, because
    # the database server may disagree with the time Ruby/Rails computes.
    finder_options =
      if worker_key
        # NOTE(review): unlike the unkeyed lookup, this one has no :order
        # clause, so priority is not considered here - confirm that is intended.
        {
          :conditions => [
            " worker_name = ? AND taken = ? AND worker_key = ? AND scheduled_at <= ? ",
            worker_name, 0, worker_key, Time.now.utc
          ],
          :lock => true
        }
      else
        {
          :conditions => [
            " worker_name = ? AND taken = ? AND scheduled_at <= ? ",
            worker_name, 0, Time.now.utc
          ],
          :lock => true,
          :order => 'priority desc'
        }
      end

    candidate = find(:first, finder_options)
    next unless candidate

    # Flag the row as taken and stamp the start time before releasing it
    # to the caller.
    candidate.taken = 1
    candidate.started_at = Time.now.utc
    candidate.save
    claimed = candidate
  end
  claimed
end

.insert_job(options = { }) ⇒ Object

Insert a new job for processing. Jobs added this way are picked up automatically by the appropriate worker.



47
48
49
50
51
52
53
54
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 47

# Persists a new job row so that a worker can pick it up later.
#
# options - attribute hash for the new record. :submitted_at (current UTC
#           time), :finished (0) and :taken (0) are always set and override
#           any values supplied for those keys. The hash passed in is left
#           untouched.
#
# Returns whatever the transaction block yields back, i.e. the result of
# saving the new record.
def self.insert_job(options = { })
  ActiveRecord::Base.verify_active_connections!
  transaction do
    # merge (not merge!) so the caller's hash is not modified and a frozen
    # hash can be passed in safely.
    attributes = options.merge(:submitted_at => Time.now.utc, :finished => 0, :taken => 0)
    new(attributes).save
  end
end

.remove_job(options = { }) ⇒ Object

Remove a pending job (one that is neither taken nor finished) from the table.



57
58
59
60
61
62
63
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 57

# Deletes one pending job that matches the given attributes.
#
# options - hash of column conditions identifying the job. :finished => 0
#           and :taken => 0 are merged in on top, so only a job that is
#           neither taken nor finished is looked up.
#
# The first match is fetched with :lock => true and then handed to delete,
# all inside one transaction. When nothing matches, nil is what gets passed
# to delete.
def self.remove_job(options = { })
  ActiveRecord::Base.verify_active_connections!
  transaction do
    pending_conditions = options.merge(:finished => 0, :taken => 0)
    pending_job = find(:first, :conditions => pending_conditions, :lock => true)
    delete(pending_job)
  end
end

Instance Method Details

#args ⇒ Object



31
32
33
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 31

# Reads the stored :args attribute and returns it Base64-decoded
# (the counterpart of the args= writer, which stores it encoded).
def args
  encoded = read_attribute(:args)
  Base64.decode64(encoded)
end

#args=(args) ⇒ Object

These accessors Base64-encode the job arguments on write and decode them on read, which gets around any possible character-encoding issues with the database.



27
28
29
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 27

# Stores the given value in the :args attribute as Base64 text, so the
# database only ever sees plain ASCII regardless of the payload's encoding.
def args=(args)
  encoded = Base64.encode64(args)
  write_attribute(:args, encoded)
end

#finish! ⇒ Object

Mark a job as finished



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 66

# Marks this job as finished and clears the per-thread job markers.
#
# Inside a transaction it sets finished to 1, stamps finished_at with the
# current UTC time, rewrites job_key to "finished_<epoch seconds>_<old key>"
# and saves the record. Afterwards the :persistent_job_id and :job_key
# thread-local entries are reset to nil.
#
# Always returns nil.
def finish!
  ActiveRecord::Base.verify_active_connections!
  self.class.transaction do
    self.finished = 1
    self.finished_at = Time.now.utc
    epoch_seconds = Time.now.utc.to_i
    self.job_key = "finished_#{epoch_seconds}_#{job_key}"
    save
  end
  [:persistent_job_id, :job_key].each { |slot| Thread.current[slot] = nil }
  nil
end

#release_job ⇒ Object

Release a job, marking it as not taken so that it is free to be picked up again. Useful when processing of the job fails inside a worker and you want it to be processed later.



37
38
39
40
41
42
43
44
# File 'lib/backgroundrb/bdrb_job_queue.rb', line 37

# Puts this job back into the pool of untaken jobs.
#
# Within a transaction it clears started_at, resets taken to 0 and saves the
# record, so that a later lookup for untaken jobs can match it again.
#
# Returns whatever the transaction call returns.
def release_job
  ActiveRecord::Base.verify_active_connections!
  job_model = self.class
  job_model.transaction do
    self.started_at = nil
    self.taken = 0
    save
  end
end