Class: QueueProcessor::RootCalculation::QueueControl::CalculationRequest
- Inherits:
-
Struct
- Object
- Struct
- QueueProcessor::RootCalculation::QueueControl::CalculationRequest
- Defined in:
- lib/queue-processor/root_calculation/queue_control.rb
Instance Attribute Summary collapse
-
#clazz ⇒ Object
Returns the value of attribute clazz.
-
#description ⇒ Object
Returns the value of attribute description.
-
#id ⇒ Object
Returns the value of attribute id.
-
#queued_at ⇒ Object
Returns the value of attribute queued_at.
Instance Method Summary collapse
Instance Attribute Details
#clazz ⇒ Object
Returns the value of attribute clazz
101 102 103 |
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 101 def clazz @clazz end |
#description ⇒ Object
Returns the value of attribute description
101 102 103 |
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 101 def description @description end |
#id ⇒ Object
Returns the value of attribute id
101 102 103 |
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 101 def id @id end |
#queued_at ⇒ Object
Returns the value of attribute queued_at
101 102 103 |
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 101 def queued_at @queued_at end |
Instance Method Details
#display_name ⇒ Object
138 139 140 |
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 138 def display_name "calculate #{description} (id=#{id}, queued_at=#{queued_at})" end |
#perform ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/queue-processor/root_calculation/queue_control.rb', line 102 def perform obj = clazz.where("id=? and date_trunc('milliseconds',queued_at)=date_trunc('milliseconds',?::timestamp)",id, queued_at).first if (obj.present?) Rails.logger.debug {"#{obj.describe}: dequeued by delayed job"} obj.delayed_job_queued_at = queued_at if obj.fire_events(:dequeue_processing) Rails.logger.info("#{obj.describe}: starting processing.") result = obj.fire_events(:start_processing) if (result) Rails.logger.info("#{obj.describe}: finished calculation and done in delayed job.") end else # somebody else is running, we have to wait for them to finish if (obj.calculating?) if obj.recover_after.present? && (Time.now - queued_at) >= obj.recover_after Rails.logger.error("#{obj.describe}: has been stuck processing for too long (#{obj.recover_after} seconds) - reseting state machines and re-queueing") obj.reset_state_machines! obj.add_to_queue else obj.requeue_work(queued_at) end end end else obj = clazz.where(:id => id).first if obj.nil? Rails.logger.warn("#{clazz}: #{id}: Skipping: #{id} - Does not exist") else Rails.logger.warn("#{obj.describe}: Skipped - Queued at: #{obj.queued_at} doesn't match delayed job queued at: #{queued_at}") end end end |