Class: Delayed::Backend::DataMapper::Job

Inherits:
Object
  • Object
show all
Includes:
DataMapper::Resource, Base
Defined in:
lib/delayed/backend/data_mapper.rb

Constant Summary

Constants included from Base

Base::ParseObjectFromYaml

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Base

#failed?, included, #invoke_job, #max_attempts, #name, #payload_object, #payload_object=, #reschedule_at, #unlock

Class Method Details

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.

[View source]

65
66
67
# File 'lib/delayed/backend/data_mapper.rb', line 65

def self.clear_locks!(worker_name)
  all(:locked_by => worker_name).update(:locked_at => nil, :locked_by => nil)
end

.db_time_nowObject

[View source]

40
41
42
# File 'lib/delayed/backend/data_mapper.rb', line 40

def self.db_time_now
  Time.now
end

.delete_allObject

these are common to the other backends, so we provide an implementation

[View source]

96
97
98
# File 'lib/delayed/backend/data_mapper.rb', line 96

def self.delete_all
  Delayed::Job.auto_migrate!
end

.find(id) ⇒ Object

[View source]

100
101
102
# File 'lib/delayed/backend/data_mapper.rb', line 100

def self.find id
  get id
end

.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object

[View source]

44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/delayed/backend/data_mapper.rb', line 44

def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  
  simple_conditions = { :run_at.lte => db_time_now, :limit => limit, :failed_at => nil, :order => [:priority.asc, :run_at.asc]  }

  # respect priorities
  simple_conditions[:priority.gte] = Worker.min_priority if Worker.min_priority
  simple_conditions[:priority.lte] = Worker.max_priority if Worker.max_priority
  
  # lockable 
  lockable = (
    # not locked or past the max time
    ( all(:locked_at => nil ) | all(:locked_at.lt => db_time_now - max_run_time)) |

    # OR locked by our worker
    all(:locked_by => worker_name))
    
  # plus some other boring junk 
  (lockable).all( simple_conditions )
end

Instance Method Details

#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object

Lock this job for this worker. Returns true if we have the lock, false otherwise.

[View source]

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/delayed/backend/data_mapper.rb', line 71

def lock_exclusively!(max_run_time, worker = worker_name)

  now = self.class.db_time_now
  overtime = now - max_run_time
  
  # FIXME - this is a bit gross
  # DM doesn't give us the number of rows affected by a collection update
  # so we have to circumvent some niceness in DM::Collection here
  collection = locked_by != worker ?
    (self.class.all(:id => id, :run_at.lte => now) & ( self.class.all(:locked_at => nil) | self.class.all(:locked_at.lt => overtime) ) ) :
    self.class.all(:id => id, :locked_by => worker)
  
  attributes = collection.model.new(:locked_at => now, :locked_by => worker).dirty_attributes
  affected_rows = self.repository.update(attributes, collection)
    
  if affected_rows == 1
    self.locked_at = now
    self.locked_by = worker
    return true
  else
    return false
  end
end

#update_attributes(attributes) ⇒ Object

[View source]

104
105
106
107
108
109
# File 'lib/delayed/backend/data_mapper.rb', line 104

def update_attributes(attributes)
  attributes.each do |k,v|
    self[k] = v
  end
  self.save
end