Module: QueueProcessor::RootCalculation::QueueControl
Defined Under Namespace
Classes: CalculationRequest
Instance Method Summary
collapse
Instance Method Details
#add_to_queue ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 17
# Stamps this record with a fresh queue time and enqueues a job for it.
#
# can_start? compares the record's and the job's queue times rounded to two
# decimal places, so the new queue time must differ from the previous
# queued_at at that precision. When it does not, we wait in 1ms steps until
# the clock has moved far enough.
#
# The wait is a loop rather than a recursive call so that the method always
# returns the result of queue_work; the recursive form returned nil whenever
# a retry had been needed.
#
# @return [Object] whatever queue_work returns for the new queue time
def add_to_queue
  queue_time = Time.now.utc
  while queued_at.to_f.round(2) == queue_time.to_f.round(2)
    Rails.logger.warn("#{describe}: queued again too quickly")
    sleep(0.001)
    queue_time = Time.now.utc
  end

  Rails.logger.info("#{describe}: added to the queue at #{queue_time} (#{queue_time.to_f})")
  before_enqueue
  touch
  # Persist the new stamp before expiring and enqueueing, so the job created
  # below carries the same queue time that is stored on the record.
  update_column(:queued_at, queue_time)
  do_expire_calculation
  queue_work(queue_time)
end
|
#before_enqueue ⇒ Object
8
9
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 8
# Hook called by add_to_queue after the new queue time has been chosen and
# before the record is touched. The default implementation does nothing.
# NOTE(review): presumably intended to be overridden by including classes - confirm.
#
# @return [nil]
def before_enqueue
  nil
end
|
#can_start? ⇒ Boolean
Called when the delayed job is deserialized and ready to run. The calculation can be performed only if the time the delayed job was queued matches the time the calculation was queued. We round both times to two decimal places because there seem to be float/serialization differences between Postgres, YAML, Time, and TimeStamp objects. We did not quite identify the root cause, but the rounding fixes the error where jobs were not running when they should.
65
66
67
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 65
# Tells whether the job that was just picked up is the one matching this
# record's current queue time.
#
# Both timestamps are compared as floats rounded to two decimal places.
#
# @return [Boolean] false when queued_at is blank; otherwise whether
#   queued_at and delayed_job_queued_at agree after rounding
def can_start?
  record_stamp = queued_at
  return false unless record_stamp.present?

  job_stamp = delayed_job_queued_at
  record_stamp.to_f.round(2) == job_stamp.to_f.round(2)
end
|
#delayed_job_options ⇒ Object
78
79
80
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 78
# Builds the options hash handed to Delayed::Job.enqueue by queue_work.
#
# Reads priority, run_at and queue from this object and leaves out every
# entry whose value is nil (false and other non-nil values are kept).
#
# @return [Hash{Symbol => Object}] subset of :priority, :run_at, :queue
def delayed_job_options
  candidates = { priority: priority, run_at: run_at, queue: queue }
  candidates.select { |_name, value| !value.nil? }
end
|
#delayed_jobs ⇒ Object
13
14
15
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 13
# Finds the delayed jobs whose serialized handler mentions both the
# CalculationRequest class and this record's id.
#
# NOTE(review): the `id: N` filter is a substring match, so id 1 would also
# match a handler containing `id: 10` - confirm against the serialized
# handler format whether that can produce false positives.
#
# @return [Object] result of Delayed::Backend::ActiveRecord::Job.where
def delayed_jobs
  handler_filter = "handler ilike '%RootCalculation::QueueControl::CalculationRequest%' and handler like '%id: #{id}%'"
  Delayed::Backend::ActiveRecord::Job.where(handler_filter)
end
|
#dependencies_in_queue? ⇒ Boolean
97
98
99
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 97
# Tells whether any dependent calculation group reports queued work.
#
# Uses any? so the scan stops at the first group that answers truthily and
# the result is always a strict boolean (never nil), as the trailing "?"
# promises.
#
# @return [Boolean] true if at least one entry of dependent_calculation_groups
#   responds truthily to something_queued?; false otherwise, including when
#   there are no groups
def dependencies_in_queue?
  dependent_calculation_groups.any? { |group| group.something_queued? }
end
|
#describe ⇒ Object
69
70
71
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 69
# Short human-readable label for this record, used as the prefix of the log
# messages in this module and passed to CalculationRequest by queue_work.
#
# @return [String] "Calculate <class name>: <id>"
def describe
  owner_name = self.class.to_s
  "Calculate #{owner_name}: #{id}"
end
|
#do_expire_calculation ⇒ Object
expire our GMI - make it stale
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 48
# Fires the :expire_calculation event while holding the record lock
# (with_lock).
#
# @return [Object] whatever the with_lock block evaluates to
# @raise [StateMachine::Error] re-raised after logging a warning that
#   includes the current attributes and, for each changed attribute, its
#   previous and current value
def do_expire_calculation
  with_lock { fire_events!(:expire_calculation) }
rescue StateMachine::Error
  # Pair every changed attribute's old value with what it reads as now.
  changes = changed_attributes.map { |name, was| { name => { was => send(name) } } }
  Rails.logger.warn("#{describe}: invalid state transition expiring calculation: #{attributes}, changed: #{changes}")
  raise
end
|
#in_queue? ⇒ Boolean
93
94
95
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 93
# Tells whether at least one delayed job exists for this record, based on
# the count of delayed_jobs.
#
# @return [Boolean]
def in_queue?
  pending = delayed_jobs.count
  pending > 0
end
|
#queue_work(queued_at) ⇒ Object
73
74
75
76
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 73
# Enqueues a CalculationRequest for this record with Delayed::Job.
#
# @param queued_at [Object] queue time stored in the request (logged via to_f)
# @return [Object] whatever Delayed::Job.enqueue returns
def queue_work(queued_at)
  Rails.logger.debug { "#{describe}: queued at #{queued_at.to_f}" }

  # The request carries id, queue time, label and class of this record.
  request = CalculationRequest.new(id, queued_at, describe, self.class)
  Delayed::Job.enqueue(request, delayed_job_options)
end
|
#requeue_work(queued_at) ⇒ Object
83
84
85
86
87
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 83
# Expires the calculation and then enqueues it again for the given time.
#
# Expiring and enqueueing are two separate steps, in that order (the same
# order add_to_queue uses). Written on one line, Ruby would parse them as
# passing queue_work's result to do_expire_calculation, which takes no
# arguments.
#
# @param queued_at [Object] queue time handed to queue_work (logged via to_f)
# @return [Object] whatever queue_work returns
def requeue_work(queued_at)
  Rails.logger.debug { "#{describe}: re-queued at #{queued_at.to_f}" }
  do_expire_calculation
  queue_work(queued_at)
end
|
#something_queued? ⇒ Boolean
89
90
91
|
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 89
# Tells whether this record or any of its dependencies has queued work.
#
# Dependencies are only consulted when this record itself is not in the
# queue.
#
# @return [Object] the truthy in_queue? answer, otherwise whatever
#   dependencies_in_queue? returns
def something_queued?
  queued_here = in_queue?
  return queued_here if queued_here

  dependencies_in_queue?
end
|