Class: RocketJob::Supervisor
- Inherits:
-
Object
- Object
- RocketJob::Supervisor
- Includes:
- Shutdown, SemanticLogger::Loggable
- Defined in:
- lib/rocket_job/supervisor.rb,
lib/rocket_job/supervisor/shutdown.rb
Overview
Starts a server instance, along with the workers and ensures workers remain running until they need to shutdown.
Defined Under Namespace
Modules: Shutdown
Instance Attribute Summary collapse
-
#server ⇒ Object
readonly
Returns the value of attribute server.
-
#worker_id ⇒ Object
Returns the value of attribute worker_id.
-
#worker_pool ⇒ Object
readonly
Returns the value of attribute worker_pool.
Class Method Summary collapse
-
.run ⇒ Object
Start the Supervisor, using the supplied attributes to create a new Server instance.
Instance Method Summary collapse
-
#initialize(server) ⇒ Supervisor
constructor
A new instance of Supervisor.
- #run ⇒ Object
- #stop! ⇒ Object
- #supervise_pool ⇒ Object
- #synchronize(&block) ⇒ Object
Constructor Details
#initialize(server) ⇒ Supervisor
Returns a new instance of Supervisor.
24 25 26 27 28 |
# File 'lib/rocket_job/supervisor.rb', line 24 def initialize(server) @server = server @worker_pool = WorkerPool.new(server.name) @mutex = Mutex.new end |
Instance Attribute Details
#server ⇒ Object (readonly)
Returns the value of attribute server.
9 10 11 |
# File 'lib/rocket_job/supervisor.rb', line 9 def server @server end |
#worker_id ⇒ Object
Returns the value of attribute worker_id.
10 11 12 |
# File 'lib/rocket_job/supervisor.rb', line 10 def worker_id @worker_id end |
#worker_pool ⇒ Object (readonly)
Returns the value of attribute worker_pool.
9 10 11 |
# File 'lib/rocket_job/supervisor.rb', line 9 def worker_pool @worker_pool end |
Class Method Details
.run ⇒ Object
Start the Supervisor, using the supplied attributes to create a new Server instance.
13 14 15 16 17 18 19 20 21 22 |
# File 'lib/rocket_job/supervisor.rb', line 13 def self.run Thread.current.name = "rocketjob main" RocketJob.create_indexes register_signal_handlers server = Server.create! new(server).run ensure server&.destroy end |
Instance Method Details
#run ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/rocket_job/supervisor.rb', line 30 def run logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}" logger.info("Running with filter", Config.filter) if Config.filter server.started! logger.info "Rocket Job Server started" event_listener = Thread.new { Event.listener } Subscribers::SecretConfig.subscribe if defined?(SecretConfig) Subscribers::Server.subscribe(self) do Subscribers::Worker.subscribe(self) do Subscribers::Logger.subscribe do supervise_pool stop! end end end rescue ::Mongoid::Errors::DocumentNotFound logger.info("Server has been destroyed. Going down hard!") rescue Exception => e logger.error("RocketJob::Server is stopping due to an exception", e) ensure event_listener&.kill # Logs the backtrace for each running worker worker_pool.log_backtraces logger.info("Shutdown Complete") end |
#stop! ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/rocket_job/supervisor.rb', line 57 def stop! server.stop! if server.may_stop? synchronize do worker_pool.stop end until worker_pool.join logger.info "Waiting for workers to finish processing ..." # One or more workers still running so update heartbeat so that server reports "alive". server.refresh(worker_pool.living_count) end end |
#supervise_pool ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/rocket_job/supervisor.rb', line 69 def supervise_pool stagger = true until self.class.shutdown? synchronize do if server.running? worker_pool.prune worker_pool.rebalance(server.max_workers, stagger) stagger = false elsif server.paused? worker_pool.stop sleep(0.1) worker_pool.prune stagger = true else break end end synchronize { server.refresh(worker_pool.living_count) } self.class.wait_for_event(Config.heartbeat_seconds) break if self.class.shutdown? end end |
#synchronize(&block) ⇒ Object
95 96 97 |
# File 'lib/rocket_job/supervisor.rb', line 95 def synchronize(&block) @mutex.synchronize(&block) end |