Module: QueueDispatcher::ActsAsTaskQueue::InstanceMethods
- Defined in:
- lib/queue_dispatcher/acts_as_task_queue.rb
Instance Method Summary collapse
- #acts_as_task_queue_tasks ⇒ Object
-
#all_done? ⇒ Boolean
Are all tasks executed?.
-
#brand_new? ⇒ Boolean
Return true, if the task_queue is in state new and is not older than 30 seconds.
-
#destroy_if_all_done! ⇒ Object
Destroy the queue if it has no pending jobs.
-
#empty? ⇒ Boolean
Return true if there are no tasks in this taskqueue.
-
#kill ⇒ Object
Kill a task_queue.
-
#pending? ⇒ Boolean
Return true, if the task_queue has pending jobs and is running but no job is running.
-
#pending_tasks? ⇒ Boolean
Are there any running or pending tasks in the queue?.
-
#pid_running? ⇒ Boolean
Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’.
-
#pop(args = {}) ⇒ Object
Get the next ready to run task out of the queue.
-
#push(task) ⇒ Object
Put a new task into the queue.
-
#reloading_config? ⇒ Boolean
Return true, if the task_queue is in state ‘reloading_config’.
-
#remove_finished_tasks! ⇒ Object
Remove finished tasks from queue.
-
#run!(args = {}) ⇒ Object
Execute all tasks in the queue.
-
#running? ⇒ Boolean
Return true, if the task_queue is still running.
-
#task_states ⇒ Object
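Returns the aggregated state of this task list: nil when the list is reported as empty, otherwise the first match of :running, :init_queue, :pending, :acquire_lock, :error, :new, :successful, or :unknown if none applies.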
-
#working? ⇒ Boolean
Return true, if the task state of the task_queue is :running and the task_queue itself is still running.
Instance Method Details
#acts_as_task_queue_tasks ⇒ Object
88 89 90 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 88
# Return the tasks of this queue by calling the method whose name is the
# pluralized task class name taken from acts_as_task_queue_config.
def acts_as_task_queue_tasks
  association_name = acts_as_task_queue_config.task_class_name.pluralize
  send(association_name)
end
#all_done? ⇒ Boolean
Are all tasks executed?
200 201 202 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 200
# True when the queue has no running/pending tasks, or holds no tasks at all.
# pending_tasks? is evaluated first; empty? is only consulted when tasks are
# still pending.
def all_done?
  return true unless pending_tasks?

  empty?
end
#brand_new? ⇒ Boolean
Return true, if the task_queue is in state new and is not older than 30 seconds
178 179 180 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 178
# True when the queue is in state 'new' and was created less than
# 30 seconds ago.
def brand_new?
  return false unless state == 'new'

  age = Time.now - created_at
  age < 30.seconds
end
#destroy_if_all_done! ⇒ Object
Destroy the queue if it has no pending jobs
231 232 233 234 235 236 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 231
# Destroy this queue when all of its tasks are done.
# The row is re-read with a lock inside a transaction; nothing happens if the
# row can no longer be found or the queue still has work to do.
def destroy_if_all_done!
  transaction do
    locked_queue = TaskQueue.where(:id => id).lock(true).first
    if locked_queue && locked_queue.all_done?
      locked_queue.destroy
    end
  end
end
#empty? ⇒ Boolean
Return true if there are no tasks in this taskqueue
184 185 186 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 184
# True when this task queue holds no tasks.
def empty?
  queued_tasks = acts_as_task_queue_tasks
  queued_tasks.empty?
end
#kill ⇒ Object
Kill a task_queue
225 226 227 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 225
# Send the HUP signal to the process stored in pid.
# Does nothing (and returns nil) when no pid is set.
def kill
  return unless pid

  Process.kill('HUP', pid)
end
#pending? ⇒ Boolean
Return true, if the task_queue has pending jobs and is running but no job is running
212 213 214 215 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 212
# True when the aggregated task state is :new, :pending or :acquire_lock
# and the queue itself is still running.
def pending?
  waiting_states = [:new, :pending, :acquire_lock]
  waiting_states.include?(task_states) && running?
end
#pending_tasks? ⇒ Boolean
Are there any running or pending tasks in the queue?
190 191 192 193 194 195 196 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 190
# Are there any running or pending tasks in the queue?
#
# The queue row and its tasks are read with a lock inside a transaction.
# Returns a truthy value when any task is flagged as :running, :pending,
# :acquire_lock or :init_queue by determine_state_of_task_array.
# Returns false when the queue row can no longer be found (e.g. it was
# destroyed in the meantime) instead of failing on nil.
def pending_tasks?
  transaction do
    queue = TaskQueue.where(:id => id).lock(true).first
    if queue
      states = determine_state_of_task_array queue.acts_as_task_queue_tasks.lock(true)
      states[:running] || states[:pending] || states[:acquire_lock] || states[:init_queue]
    else
      # The row vanished between loading this object and locking it: nothing can be pending.
      false
    end
  end
end
#pid_running? ⇒ Boolean
Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’
160 161 162 163 164 165 166 167 168 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 160
# True when a process table entry exists for pid and its command is 'ruby'.
# Returns false when no pid is set or no entry is found.
def pid_running?
  return false unless pid

  process_info = Sys::ProcTable.ps(pid)
  return false unless process_info

  # Assume that the process is still running if the command of the process table entry is 'ruby'
  process_info.comm == 'ruby'
end
#pop(args = {}) ⇒ Object
Get the next ready to run task out of the queue. Consider the priority and the dependent tasks, which is defined in the association defined on top of this model.
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 101
# Fetch the next task that is ready to run.
#
# Tasks are examined in the order the (locked) association returns them; the
# first one whose dependent tasks are executed and whose state is 'new' wins.
# NOTE(review): any priority ordering presumably comes from the association
# definition, which is not visible here - confirm.
#
# args[:remove_task] - when nil or truthy the task is detached from the queue
#                      (task_queue_id set to nil); when false the task stays
#                      in the queue and its state becomes 'new_popped'.
#
# Returns the selected task or nil if none is ready.
def pop(args = {})
  next_task     = nil
  debug_logging = acts_as_task_queue_config.debug

  transaction do
    # Walk the locked task list until the first runnable task shows up
    candidates = acts_as_task_queue_tasks.lock(true).all
    candidates.each do |candidate|
      if candidate.dependent_tasks_executed?
        next_task = candidate if candidate.state == 'new'
      elsif debug_logging
        log :msg => "Task #{candidate.id}: Waiting for dependent tasks #{candidate.dependent_tasks.map(&:id).join ','}...", :sev => :debug
      end
      break if next_task
    end

    # Either detach the task from the queue or just mark it as popped
    if next_task
      detach = args[:remove_task]
      detach = true if detach.nil?
      if detach
        next_task.update_attribute :task_queue_id, nil
      else
        next_task.update_attribute :state, 'new_popped'
      end
    end
  end

  next_task
end
#push(task) ⇒ Object
Put a new task into the queue
94 95 96 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 94
# Append the given task to the tasks of this queue.
def push(task)
  queued_tasks = acts_as_task_queue_tasks
  queued_tasks << task
end
#reloading_config? ⇒ Boolean
Return true, if the task_queue is in state ‘reloading_config’
219 220 221 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 219
# True when the queue's process is alive (see pid_running?) and the queue is
# in state 'reloading_config'.
def reloading_config?
  process_alive = pid_running?
  process_alive && state == 'reloading_config'
end
#remove_finished_tasks! ⇒ Object
Remove finished tasks from queue
240 241 242 243 244 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 240
# Remove finished tasks from the queue.
#
# Inside one transaction, every task that reports executed? is detached from
# the queue by setting its task_queue_id to nil. Other tasks are left alone.
# NOTE(review): the other methods reach the tasks via acts_as_task_queue_tasks;
# confirm that the plain `tasks` association is intended here.
def remove_finished_tasks!
  transaction do
    tasks.each { |t| t.update_attribute(:task_queue_id, nil) if t.executed? }
  end
end
#run!(args = {}) ⇒ Object
Execute all tasks in the queue
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 248
# Execute all tasks in the queue.
#
# args[:logger]    - logger to use; defaults to a Logger writing to
#                    log/task_queue.log below Rails.root.
# args[:print_log] - passed through to every log call.
#
# The queue is set to state 'running', then tasks are popped and executed
# until the queue disappears, leaves state 'running' or has no pending tasks
# left. Any StandardError is logged and the current task is marked as failed.
# The ensure section always runs: it destroys the queue if everything is
# done, marks a still-running task as 'aborted', stores the final queue
# state ('stopped' on a clean finish, 'aborted' otherwise), clears the pid
# and calls deinit.
def run! args = {}
  task = nil
  # File.expand_path turns Rails.root into an absolute path string for the default log file
  @logger = args[:logger] || Logger.new("#{File.expand_path(Rails.root)}/log/task_queue.log")
  finish_state = 'aborted'
  task_queue = self
  print_log = args[:print_log]

  task_queue.update_attribute :state, 'running'

  # Set logger in engine
  @engine.logger = @logger if defined? @engine
  log :msg => "#{name}: Starting TaskQueue #{task_queue.id}...", :print_log => print_log

  # Init. Pop first task from queue, to show init_queue-state
  task = task_queue.pop(:remove_task => false)
  task.update_attribute :state, 'init_queue' if task
  init

  # Put task, which was used for showing the init_queue-state, back into the task_queue
  task.update_attributes :state => 'new', :task_queue_id => task_queue.id if task
  task_queue.reload

  # Ensure, that each task_queue is executed at least once, even if there are no tasks inside at the time it is started (this
  # can happen, if there are a lot of DB activities...)
  first_run = true

  # Loop as long as the task_queue exists with states 'running' and until the task_queue has pending tasks
  while task_queue && task_queue.state == 'running' && (task_queue.pending_tasks? || first_run) do
    first_run = false

    # Pop next task from queue
    task = task_queue.pop(:remove_task => (! acts_as_task_queue_config.leave_running_tasks_in_queue))

    if task
      if task.new?
        # Start
        task.update_attributes :state => 'acquire_lock', :perc_finished => 0
        get_lock_for task
        log :msg => "#{name}: Starting task #{task.id} (#{task.target.class.name}.#{task.method_name})...", :print_log => print_log
        task.update_attributes :state => 'running'

        # Execute the method defined in task.method
        # (method_name is checked both as given and as a symbol)
        if task.target.methods.include?(task.method_name) || task.target.methods.include?(task.method_name.to_sym)
          if task.dependent_tasks_had_errors
            error_msg = "Dependent tasks had errors!"
            log :msg => error_msg, :sev => :warn, :print_log => print_log
            rc_and_msg = QueueDispatcher::RcAndMsg.bad_rc error_msg
          else
            target = task.target
            target.logger = @logger if target.methods.include?(:logger=) || target.methods.include?('logger=')
            rc_and_msg = task.execute!
          end
        else
          error_msg = "unknown method '#{task.method_name}' for #{task.target.class.name}!"
          log :msg => error_msg, :sev => :warn, :print_log => print_log
          rc_and_msg = QueueDispatcher::RcAndMsg.bad_rc error_msg
        end

        # Change task state according to the return code and remove it from the queue
        task.update_state rc_and_msg
        cleanup_locks_after_error_for task
        task.update_attribute :task_queue_id, nil unless acts_as_task_queue_config.leave_finished_tasks_in_queue
        log :msg => "#{name}: Task #{task.id} (#{task.target.class.name}.#{task.method_name}) finished with state '#{task.state}'.", :print_log => print_log
      end
    else
      # We couldn't fetch a task out of the queue but there should still exists some. Maybe some are waiting for dependent tasks.
      # Sleep some time before trying it again.
      sleep acts_as_task_queue_config.poll_time
    end

    # Reload task_queue to get all updates
    task_queue = TaskQueue.find_by_id task_queue.id

    # If all tasks are finished, a config reload will be executed at the end of this method. To avoid too much config reloads,
    # wait some time before continuing. Maybe, some more tasks will added to the queue?!
    wait_time = 0
    unless task_queue.nil? || task_queue.terminate_immediately
      until task_queue.nil? || task_queue.pending_tasks? || wait_time >= acts_as_task_queue_config.idle_wait_time || task_queue.state != 'running' do
        sleep acts_as_task_queue_config.poll_time
        wait_time += acts_as_task_queue_config.poll_time
        task_queue = TaskQueue.find_by_id task_queue.id
      end
    end

    # Reset logger since this got lost by reloading the task_queue
    task_queue.logger = @logger if task_queue
  end

  # Reload config if last task was not a config reload
  config_reload_required = cleanup_before_auto_reload
  if config_reload_required
    task_queue.update_attributes :state => 'reloading_config' if task_queue
    reload_config task, print_log: print_log
  end

  # Loop has ended
  log :msg => "#{name}: TaskQueue has ended!", :print_log => print_log
  finish_state = 'stopped'
rescue => exception
  # Error handler
  backtrace = exception.backtrace.join("\n ")
  log :msg => "Fatal error in method 'run!': #{$!}\n #{backtrace}", :sev => :error, :print_log => print_log
  puts "Fatal error in method 'run!': #{$!}\n#{backtrace}"
  task.update_state QueueDispatcher::RcAndMsg.bad_rc("Fatal error: #{$!}") if task
  cleanup_locks_after_error_for task if task
  task.update_attributes state: 'error' if task && task.state != 'finished'
ensure
  # Reload task and task_queue, to ensure the objects are up to date
  task_queue = TaskQueue.find_by_id task_queue.id if task_queue
  task = Task.find_by_id task.id if task

  # Delete task_queue
  task_queue.destroy_if_all_done! if task_queue

  # Update states of task and task_queue
  task.update_attributes :state => 'aborted' if task && task.state == 'running'
  task_queue.update_attributes :state => finish_state, :pid => nil if task_queue

  # Clean up
  deinit
end
#running? ⇒ Boolean
Return true, if the task_queue is still running
172 173 174 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 172
# True when the queue is in state 'running' and its process is still alive
# (see pid_running?).
def running?
  return false unless state == 'running'

  pid_running?
end
#task_states ⇒ Object
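Returns the aggregated state of this task list: nil when the list is reported as empty, otherwise the first match of :running, :init_queue, :pending, :acquire_lock, :error, :new, :successful, or :unknown if none applies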
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 134
# Reduce the task states of this queue to a single symbol.
#
# Returns nil when determine_state_of_task_array flags the list as :empty.
# Otherwise returns the first flagged state in this order: :running,
# :init_queue, :pending, :acquire_lock, :error, :new, :successful.
# Returns :unknown when none of them is flagged.
def task_states
  flags = determine_state_of_task_array acts_as_task_queue_tasks
  return nil if flags[:empty]

  by_precedence = [:running, :init_queue, :pending, :acquire_lock, :error, :new, :successful]
  by_precedence.find { |candidate| flags[candidate] } || :unknown
end
#working? ⇒ Boolean
Return true, if the task state of the task_queue is :running and the task_queue itself is still running
206 207 208 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 206
# True when the aggregated task state is :running and the queue itself is
# still running.
def working?
  return false unless task_states == :running

  running?
end