Module: QueueDispatcher::ActsAsTaskQueue::InstanceMethods
- Defined in:
- lib/queue_dispatcher/acts_as_task_queue.rb
Instance Method Summary collapse
- #acts_as_task_queue_tasks ⇒ Object
-
#all_done? ⇒ Boolean
Are all tasks executed?.
-
#brand_new? ⇒ Boolean
Return true, if the task_queue is in state new and is not older 30 seconds.
-
#destroy_if_all_done! ⇒ Object
Destroy the queue if it has no pending jobs.
-
#empty? ⇒ Boolean
Return true if there are no tasks in this taskqueue.
-
#kill ⇒ Object
Kill a task_queue.
-
#pending? ⇒ Boolean
Return true, if the task_queue has pending jobs and is running but no job is running.
-
#pending_tasks? ⇒ Boolean
Are there any running or pending tasks in the queue?.
-
#pid_running? ⇒ Boolean
Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’.
-
#pop(args = {}) ⇒ Object
Get the next ready to run task out of the queue.
-
#push(task) ⇒ Object
Put a new task into the queue.
-
#reloading_config? ⇒ Boolean
Return true, if the task_queue is in state ‘reloading_config’.
-
#remove_finished_tasks! ⇒ Object
Remove finished tasks from queue.
-
#run!(args = {}) ⇒ Object
Execute all tasks in the queue.
-
#running? ⇒ Boolean
Return true, if the task_queue is still running.
-
#task_states ⇒ Object
Returns the state of this task list (:stopped or :running).
-
#working? ⇒ Boolean
Return true, if the task_queue is working or has pending jobs.
Instance Method Details
#acts_as_task_queue_tasks ⇒ Object
109 110 111 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 109 def acts_as_task_queue_tasks self.send(acts_as_task_queue_config.task_class_name.pluralize) end |
#all_done? ⇒ Boolean
Are all tasks executed?
223 224 225 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 223 def all_done? ! pending_tasks? || empty? end |
#brand_new? ⇒ Boolean
Return true, if the task_queue is in state new and is not older 30 seconds
201 202 203 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 201 def brand_new? state == 'new' && (Time.now - created_at) < 30.seconds end |
#destroy_if_all_done! ⇒ Object
Destroy the queue if it has no pending jobs
254 255 256 257 258 259 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 254 def destroy_if_all_done! transaction do queue = TaskQueue.where(:id => self.id).lock(true).first queue.destroy if queue && queue.all_done? end end |
#empty? ⇒ Boolean
Return true if there are no tasks in this taskqueue
207 208 209 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 207 def empty? acts_as_task_queue_tasks.empty? end |
#kill ⇒ Object
Kill a task_queue
248 249 250 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 248 def kill Process.kill('HUP', pid) if pid end |
#pending? ⇒ Boolean
Return true, if the task_queue has pending jobs and is running but no job is running
235 236 237 238 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 235 def pending? ts = task_states (ts == :new || ts == :pending || ts == :acquire_lock) && self.running? end |
#pending_tasks? ⇒ Boolean
Are there any running or pending tasks in the queue?
213 214 215 216 217 218 219 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 213 def pending_tasks? transaction do queue = TaskQueue.where(:id => self.id).lock(true).first states = determine_state_of_task_array queue.acts_as_task_queue_tasks.lock(true) states[:running] || states[:pending] || states[:acquire_lock] || states[:init_queue] end end |
#pid_running? ⇒ Boolean
Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’
183 184 185 186 187 188 189 190 191 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 183 def pid_running? ps = self.pid ? Sys::ProcTable.ps(self.pid) : nil if ps # Asume, that if the command of the 'ps'-output is 'ruby', the process is still running ps.comm == 'ruby' else false end end |
#pop(args = {}) ⇒ Object
Get the next ready to run task out of the queue. Consider the priority and the dependent tasks, which is defined in the association defined on top of this model.
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 122 def pop(args = {}) task = nil log_debug = acts_as_task_queue_config.debug transaction do # Find next pending task, where all dependent tasks are executed all_tasks = acts_as_task_queue_tasks.lock(true).all pos = 0 while task.nil? && pos < all_tasks.count do t = all_tasks[pos] if t.dependent_tasks_executed? task = t if t.state == 'new' else log :msg => "Task #{t.id}: Waiting for dependent tasks #{t.dependent_tasks.map{|dt| dt.id}.join ','}...", :sev => :debug if log_debug end pos += 1 end # Remove task from current queue if task if args[:remove_task].nil? || args[:remove_task] task.update_attribute :task_queue_id, nil else task.update_attribute :state, 'new_popped' end end end task end |
#push(task) ⇒ Object
Put a new task into the queue
115 116 117 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 115 def push(task) acts_as_task_queue_tasks << task end |
#reloading_config? ⇒ Boolean
Return true, if the task_queue is in state ‘reloading_config’
242 243 244 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 242 def reloading_config? pid_running? && state == 'reloading_config' end |
#remove_finished_tasks! ⇒ Object
Remove finished tasks from queue
263 264 265 266 267 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 263 def remove_finished_tasks! trasnaction do tasks.each{ |t| t.update_attribute(:task_queue_id, nil) if t.executed? } end end |
#run!(args = {}) ⇒ Object
Execute all tasks in the queue
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 271 def run!(args = {}) task = nil @logger = args[:logger] || Logger.new("#{File.(Rails.root)}/log/task_queue.log") finish_state = 'aborted' task_queue = self print_log = args[:print_log] task_queue.update_attribute :state, 'running' # Set logger in engine @engine.logger = @logger if defined? @engine && @engine.methods.include?(:logger=) log :msg => "#{name}: Starting TaskQueue #{task_queue.id}...", :print_log => print_log # Init. Pop first task from queue, to show init_queue-state task = task_queue.pop(:remove_task => false) task.update_attribute :state, 'init_queue' if task init # Put task, which was used for showing the init_queue-state, back into the task_queue task.update_attributes :state => 'new', :task_queue_id => task_queue.id if task task_queue.reload # Ensure, that each task_queue is executed at least once, even if there are no tasks inside at the time it is started (this # can happen, if there are a lot of DB activities...) first_run = true # Loop as long as the task_queue exists with states 'running' and until the task_queue has pending tasks while task_queue && task_queue.state == 'running' && (task_queue.pending_tasks? || first_run) do first_run = false # Pop next task from queue task = task_queue.pop(:remove_task => (! acts_as_task_queue_config.leave_running_tasks_in_queue)) if task if task.new? # Start task.update_attributes :state => 'acquire_lock', :perc_finished => 0 get_lock_for task log :msg => "#{name}: Starting task #{task.id} (#{task.target.class.name}.#{task.method_name})...", :print_log => print_log task.update_attributes :state => 'running' # Execute the method defined in task.method if task.target.methods.include?(task.method_name) || task.target.methods.include?(task.method_name.to_sym) if task.dependent_tasks_had_errors error_msg = 'Dependent tasks had errors!' log :msg => error_msg, :sev => :warn, :print_log => print_log result = QueueDispatcher::RcAndMsg.bad_rc error_msg else target = task.target target.logger = @logger if target.methods.include?(:logger=) || target.methods.include?('logger=') result = task.execute! end else error_msg = "unknown method '#{task.method_name}' for #{task.target.class.name}!" log :msg => error_msg, :sev => :warn, :print_log => print_log result = QueueDispatcher::RcAndMsg.bad_rc error_msg end # Change task state according to the return code and remove it from the queue task.update_state result cleanup_locks_after_error_for task task.update_attribute :task_queue_id, nil unless acts_as_task_queue_config.leave_finished_tasks_in_queue log :msg => "#{name}: Task #{task.id} (#{task.target.class.name}.#{task.method_name}) finished with state '#{task.state}'.", :print_log => print_log # Wait between tasks sleep acts_as_task_queue_config.task_finish_wait_time end else # We couldn't fetch a task out of the queue but there should still exists some. Maybe some are waiting for dependent tasks. # Sleep some time before trying it again. sleep acts_as_task_queue_config.poll_time end # Interrupts handle_interrupts print_log: print_log # Reload task_queue to get all updates task_queue = TaskQueue.find_by_id task_queue.id # If all tasks are finished, a config reload will be executed at the end of this method. To avoid too much config reloads, # wait some time before continuing. Maybe, some more tasks will added to the queue?! wait_time = 0 unless task_queue.nil? || task_queue.terminate_immediately until task_queue.nil? || task_queue.pending_tasks? || wait_time >= acts_as_task_queue_config.idle_wait_time || task_queue.state != 'running' do sleep acts_as_task_queue_config.poll_time wait_time += acts_as_task_queue_config.poll_time task_queue = TaskQueue.find_by_id task_queue.id end end # Reset logger since this got lost by reloading the task_queue task_queue.logger = @logger if task_queue end # Reload config if last task was not a config reload config_reload_required = cleanup_before_auto_reload if config_reload_required task_queue.update_attributes :state => 'reloading_config' if task_queue reload_config task, print_log: print_log end # Loop has ended log :msg => "#{name}: TaskQueue has ended!", :print_log => print_log finish_state = 'stopped' rescue => exception # Error handler backtrace = exception.backtrace.join("\n ") log :msg => "Fatal error in method 'run!': #{$!}\n #{backtrace}", :sev => :error, :print_log => print_log puts "Fatal error in method 'run!': #{$!}\n#{backtrace}" task.update_state QueueDispatcher::RcAndMsg.bad_rc("Fatal error: #{$!}") if task cleanup_locks_after_error_for task if task task.update_attributes state: 'error' if task && task.state != 'finished' ensure # Reload task and task_queue, to ensure the objects are up to date task_queue = TaskQueue.find_by_id task_queue.id if task_queue task = Task.find_by_id task.id if task # Delete task_queue task_queue.destroy_if_all_done! if task_queue # Update states of task and task_queue task.update_attributes :state => 'aborted' if task && task.state == 'running' task_queue.update_attributes :state => finish_state, :pid => nil if task_queue # Clean up deinit end |
#running? ⇒ Boolean
Return true, if the task_queue is still running
195 196 197 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 195 def running? state == 'running' && pid_running? end |
#task_states ⇒ Object
Returns the state of this task list (:stopped or :running)
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 155 def task_states states = determine_state_of_task_array acts_as_task_queue_tasks if states[:empty] nil elsif states[:running] :running elsif states[:init_queue] :init_queue elsif states[:pending] :pending elsif states[:acquire_lock] :acquire_lock elsif states[:error] :error elsif states[:aborted] :aborted elsif states[:new] :new elsif states[:successful] :successful else :unknown end end |
#working? ⇒ Boolean
Return true, if the task_queue is working or has pending jobs
229 230 231 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 229 def working? self.task_states == :running && self.running? end |