Class: Rails::Queue::ThreadedQueueConsumer
- Inherits: Object
  - Object
  - Rails::Queue::ThreadedQueueConsumer
- Defined in: lib/rails/queue/queue.rb
Overview
The threaded consumer runs jobs in a background thread. It is intended for development mode, or for production mode in a VM where running jobs on a thread makes sense.
When the process exits, the consumer pushes nil onto the queue and joins the thread, which ensures that all queued jobs are executed before the process finally dies.
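The nil-sentinel shutdown described above can be sketched with Ruby's standard `Queue`. This is a minimal illustration, not the Rails implementation: `RecordingJob`, `processed`, and `worker` are hypothetical names, and the only assumption taken from this page is that a job responds to `run`.

```ruby
# Hypothetical job class for illustration; the consumer only needs #run.
class RecordingJob
  def initialize(id, sink)
    @id = id
    @sink = sink
  end

  def run
    @sink << @id
  end
end

processed = []
queue = Queue.new

# Queue#pop blocks until an item is available. A nil item is falsy,
# so it ends the loop and lets the thread finish.
worker = Thread.new do
  while (job = queue.pop)
    job.run
  end
end

3.times { |id| queue.push(RecordingJob.new(id, processed)) }

queue.push(nil) # sentinel: queued behind the jobs, so they run first
worker.join     # wait until the worker has processed everything
```

Because the sentinel is pushed after the jobs and the queue is first-in, first-out, the worker reaches nil only once every earlier job has run.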
Instance Attribute Summary
- #logger ⇒ Object: Returns the value of attribute logger.
Instance Method Summary
- #consume ⇒ Object
- #drain ⇒ Object
- #handle_exception(job, exception) ⇒ Object
- #initialize(queue, options = {}) ⇒ ThreadedQueueConsumer (constructor): A new instance of ThreadedQueueConsumer.
- #run(job) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(queue, options = {}) ⇒ ThreadedQueueConsumer
Returns a new instance of ThreadedQueueConsumer.
    # File 'lib/rails/queue/queue.rb', line 72
    def initialize(queue, options = {})
      @queue = queue
      @logger = options[:logger]
      @fallback_logger = Logger.new($stderr)
    end
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
    # File 'lib/rails/queue/queue.rb', line 70
    def logger
      @logger
    end
Instance Method Details
#consume ⇒ Object
    # File 'lib/rails/queue/queue.rb', line 92
    def consume
      while job = @queue.pop
        run job
      end
    end
#drain ⇒ Object
    # File 'lib/rails/queue/queue.rb', line 88
    def drain
      @queue.pop.run until @queue.empty?
    end
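As a rough illustration of what the `drain` loop does, the sketch below runs the same expression against Ruby's standard `Queue` in the calling thread. `DrainJob` and `ran` are hypothetical names introduced for the example. Note that the loop calls `run` on each job directly rather than going through the consumer's `#run(job)`, so an exception raised by a job would propagate to the caller instead of being passed to `#handle_exception`.

```ruby
# Hypothetical job type for illustration; it records its id when run.
DrainJob = Struct.new(:id, :sink) do
  def run
    sink << id
  end
end

ran = []
queue = Queue.new
2.times { |id| queue.push(DrainJob.new(id, ran)) }

# Same shape as the body of #drain: pop and run synchronously
# until nothing is left. No background thread is involved.
queue.pop.run until queue.empty?
```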
#handle_exception(job, exception) ⇒ Object
    # File 'lib/rails/queue/queue.rb', line 104
    def handle_exception(job, exception)
      (logger || @fallback_logger).error "Job Error: #{job.inspect}\n#{exception.message}\n#{exception.backtrace.join("\n")}"
    end
#run(job) ⇒ Object
    # File 'lib/rails/queue/queue.rb', line 98
    def run(job)
      job.run
    rescue Exception => exception
      handle_exception job, exception
    end
#shutdown ⇒ Object
    # File 'lib/rails/queue/queue.rb', line 83
    def shutdown
      @queue.push nil
      @thread.join
    end
#start ⇒ Object
    # File 'lib/rails/queue/queue.rb', line 78
    def start
      @thread = Thread.new { consume }
      self
    end
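Putting the methods on this page together, the following sketch shows one possible start, enqueue, shutdown cycle, including what happens when a job raises. To run without Rails it uses a stand-in class, `SketchConsumer`, that copies the method bodies listed above. The stand-in, the `FailingJob` and `OkJob` structs, and the use of a standard `Queue` and a `StringIO`-backed `Logger` are all assumptions made for illustration.

```ruby
require "logger"
require "stringio"

# Stand-in mirroring the methods documented on this page.
# It is not the Rails class itself.
class SketchConsumer
  attr_reader :logger

  def initialize(queue, options = {})
    @queue = queue
    @logger = options[:logger]
    @fallback_logger = Logger.new($stderr)
  end

  def start
    @thread = Thread.new { consume }
    self
  end

  def shutdown
    @queue.push nil
    @thread.join
  end

  def consume
    while (job = @queue.pop)
      run job
    end
  end

  def run(job)
    job.run
  rescue Exception => exception
    handle_exception job, exception
  end

  def handle_exception(job, exception)
    (logger || @fallback_logger).error "Job Error: #{job.inspect}\n#{exception.message}\n#{exception.backtrace.join("\n")}"
  end
end

# Hypothetical jobs: one always raises, one records its name.
FailingJob = Struct.new(:name) do
  def run
    raise ArgumentError, "boom from #{name}"
  end
end

OkJob = Struct.new(:name, :sink) do
  def run
    sink << name
  end
end

completed = []
log_output = StringIO.new
queue = Queue.new

# Passing :logger routes job errors there; without it the
# fallback logger writes to $stderr.
consumer = SketchConsumer.new(queue, logger: Logger.new(log_output)).start

queue.push FailingJob.new("first")
queue.push OkJob.new("second", completed)
consumer.shutdown # pushes nil, then waits for the thread to finish
```

In this sketch the failing job is logged and the worker thread keeps going, so the second job still runs before `shutdown` returns.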