Class: Sidekiq::Processor
- Inherits:
- Object
- Object
- Sidekiq::Processor
- Includes:
- Celluloid, Util
- Defined in:
- lib/sidekiq/processor.rb
Overview
The Processor receives a message from the Manager and actually processes it. It instantiates the worker, runs the middleware chain and then calls Sidekiq::Worker#perform.
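The flow described above can be sketched in plain Ruby, without Sidekiq or Celluloid. This is a minimal illustration, not the library's implementation: `HardWorker`, `run_job`, and the payload are hypothetical names, `Object.const_get` stands in for `constantize`, and the middleware chain is left out.

```ruby
require 'json'

# Hypothetical worker used only for illustration.
class HardWorker
  attr_accessor :jid

  def perform(name, count)
    "#{name} x#{count} (jid=#{jid})"
  end
end

# Simplified stand-in for the steps the overview lists (assumed helper name).
def run_job(msgstr)
  msg = JSON.parse(msgstr)                # decode the message
  klass = Object.const_get(msg['class'])  # look up the worker class by name
  worker = klass.new                      # instantiate the worker
  worker.jid = msg['jid']
  worker.perform(*msg['args'])            # call #perform with the job arguments
end

payload = JSON.generate('class' => 'HardWorker', 'jid' => 'abc123', 'args' => ['bob', 3])
puts run_job(payload)
```

The message carries only the class name and arguments, so the processor has to resolve the class and build a fresh worker instance for every job.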
Constant Summary
Constants included from Util
Instance Attribute Summary
- #actual_work_thread ⇒ Object
Stores the actual working thread so it can be killed later if necessary during a hard shutdown.
Class Method Summary
- .default_middleware ⇒ Object
Instance Method Summary
- #initialize(boss) ⇒ Processor (constructor)
A new instance of Processor.
- #inspect ⇒ Object
- #process(work) ⇒ Object
Methods included from Util
#hostname, #logger, #process_id, #redis, #watchdog
Methods included from ExceptionHandler
Constructor Details
#initialize(boss) ⇒ Processor
Returns a new instance of Processor.
# File 'lib/sidekiq/processor.rb', line 30
def initialize(boss)
  @boss = boss
end
Instance Attribute Details
#actual_work_thread ⇒ Object
Stores the actual working thread so it can be killed later if necessary during a hard shutdown.
# File 'lib/sidekiq/processor.rb', line 28
def actual_work_thread
  @actual_work_thread
end
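As a rough illustration of why the thread reference is kept, the sketch below interrupts a stuck job from another thread using `Thread#raise`. `ProcessorSketch`, `ShutdownSketch`, and `hard_shutdown_demo` are hypothetical names, and deriving the exception from `Interrupt` is an assumption of this sketch rather than something this page states.

```ruby
# Hypothetical stand-in for Sidekiq::Shutdown. It derives from Interrupt here
# (an assumption) so a job's own `rescue StandardError` would not swallow it.
class ShutdownSketch < Interrupt; end

class ProcessorSketch
  attr_reader :actual_work_thread, :outcome

  def process
    Thread.new do
      begin
        # Remember which thread runs the job so it can be interrupted later.
        @actual_work_thread = Thread.current
        sleep                      # simulate a job that never finishes
        @outcome = :finished
      rescue ShutdownSketch
        @outcome = :force_killed   # the job was interrupted from outside
      end
    end
  end
end

def hard_shutdown_demo
  processor = ProcessorSketch.new
  thread = processor.process
  Thread.pass until processor.actual_work_thread      # wait until the job is running
  processor.actual_work_thread.raise(ShutdownSketch)  # interrupt it from this thread
  thread.join
  processor.outcome
end

puts hard_shutdown_demo
```

Without the stored reference, the caller would have no handle on the thread that is actually executing the job.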
Class Method Details
.default_middleware ⇒ Object
# File 'lib/sidekiq/processor.rb', line 17
def self.default_middleware
  Middleware::Chain.new do |m|
    m.add Middleware::Server::Logging
    m.add Middleware::Server::RetryJobs
    m.add Middleware::Server::ActiveRecord
  end
end
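To show how a chain built this way can wrap a job, here is a minimal sketch of the pattern. `MiniChain`, `LoggingSketch`, `RetrySketch`, and `TRACE` are hypothetical; the sketch assumes each middleware responds to `call(worker, msg, queue)` and yields to continue, and it is not the actual `Middleware::Chain` implementation.

```ruby
# Hypothetical minimal chain: entries are classes whose instances respond to
# #call(worker, msg, queue) and yield to pass control down the chain.
class MiniChain
  def initialize
    @entries = []
    yield self if block_given?
  end

  def add(klass)
    @entries << klass unless @entries.include?(klass)
  end

  def invoke(*args, &final_action)
    chain = @entries.map(&:new)
    traverse = lambda do
      if chain.empty?
        final_action.call                   # innermost step: the job itself
      else
        chain.shift.call(*args, &traverse)  # next middleware wraps the rest
      end
    end
    traverse.call
  end
end

TRACE = []

class LoggingSketch
  def call(worker, msg, queue)
    TRACE << "start #{msg['class']}"
    yield
  ensure
    TRACE << "done #{msg['class']}"
  end
end

class RetrySketch
  def call(worker, msg, queue)
    TRACE << 'retry wrapper'
    yield
  end
end

chain = MiniChain.new do |m|
  m.add LoggingSketch
  m.add RetrySketch
end

chain.invoke(Object.new, { 'class' => 'HardWorker' }, 'default') do
  TRACE << 'perform'
end

puts TRACE
```

Middleware added first ends up outermost, so in this sketch the logging wrapper sees both the start and the end of the job.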
Instance Method Details
#inspect ⇒ Object
# File 'lib/sidekiq/processor.rb', line 64
def inspect
  "#<Processor #{to_s}>"
end
#process(work) ⇒ Object
# File 'lib/sidekiq/processor.rb', line 34
def process(work)
  msgstr = work.
  queue = work.queue_name
  defer do
    @actual_work_thread = Thread.current
    begin
      msg = Sidekiq.load_json(msgstr)
      klass = msg['class'].constantize
      worker = klass.new
      worker.jid = msg['jid']

      stats(worker, msg, queue) do
        Sidekiq.server_middleware.invoke(worker, msg, queue) do
          worker.perform(*cloned(msg['args']))
        end
      end
    rescue Sidekiq::Shutdown
      # Had to force kill this job because it didn't finish
      # within the timeout.
    rescue Exception => ex
      handle_exception(ex, msg || { :message => msgstr })
      raise
    ensure
      work.acknowledge
    end
  end
  @boss.async.processor_done(current_actor)
end
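The rescue/ensure layout in #process can be exercised in isolation. The sketch below mirrors it with hypothetical stand-ins: `FakeWork` for the unit of work, `FakeShutdown` for `Sidekiq::Shutdown` (its `Interrupt` superclass is an assumption), and a `reported` array in place of `handle_exception`.

```ruby
# Hypothetical unit of work that records whether it was acknowledged.
class FakeWork
  attr_reader :acknowledged

  def initialize
    @acknowledged = false
  end

  def acknowledge
    @acknowledged = true
  end
end

# Hypothetical stand-in for Sidekiq::Shutdown.
class FakeShutdown < Interrupt; end

# Mirrors the rescue/ensure layout of #process; the block stands in for the job.
def run_with_ack(work, reported)
  yield
  :ok
rescue FakeShutdown
  :shutdown                 # swallowed: the job was force-killed
rescue Exception => ex
  reported << ex.message    # stands in for handle_exception
  raise                     # re-raised after being reported
ensure
  work.acknowledge          # runs on every path
end

reported = []
ok = FakeWork.new
killed = FakeWork.new
failed = FakeWork.new

p run_with_ack(ok, reported) { 1 + 1 }
p run_with_ack(killed, reported) { raise FakeShutdown }
begin
  run_with_ack(failed, reported) { raise ArgumentError, 'bad args' }
rescue ArgumentError
  # The error reaches the caller after it has been reported.
end

p [ok, killed, failed].map(&:acknowledged)
p reported
```

The forced-shutdown case is swallowed, any other exception is reported and re-raised, and the work is acknowledged in all three cases.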