Class: RFlow::Master
- Inherits:
-
DaemonProcess
- Object
- DaemonProcess
- RFlow::Master
- Defined in:
- lib/rflow/master.rb
Instance Attribute Summary collapse
-
#brokers ⇒ Object
readonly
Returns the value of attribute brokers.
-
#shards ⇒ Object
readonly
Returns the value of attribute shards.
Instance Method Summary collapse
-
#initialize(config) ⇒ Master
constructor
A new instance of Master.
- #run! ⇒ Object
- #run_process ⇒ Object
- #shutdown!(reason) ⇒ Object
- #spawn_subprocesses ⇒ Object
- #subprocesses ⇒ Object
Methods inherited from DaemonProcess
Constructor Details
#initialize(config) ⇒ Master
Returns a new instance of Master.
11 12 13 14 15 16 17 |
# File 'lib/rflow/master.rb', line 11 def initialize(config) super(config['rflow.application_name'], 'Master') @pid_file = PIDFile.new(config['rflow.pid_file_path']) @shards = config.shards.map {|config| Shard.new(config) } RFlow.logger.context_width = @shards.flat_map(&:workers).map(&:name).map(&:length).max @brokers = config.connections.flat_map(&:brokers).map {|config| Broker.build(config) } end |
Instance Attribute Details
#brokers ⇒ Object (readonly)
Returns the value of attribute brokers.
9 10 11 |
# File 'lib/rflow/master.rb', line 9 def brokers @brokers end |
#shards ⇒ Object (readonly)
Returns the value of attribute shards.
8 9 10 |
# File 'lib/rflow/master.rb', line 8 def shards @shards end |
Instance Method Details
#run! ⇒ Object
19 20 21 22 23 24 |
# File 'lib/rflow/master.rb', line 19 def run! write_pid_file super ensure remove_pid_file end |
#run_process ⇒ Object
38 39 40 41 42 |
# File 'lib/rflow/master.rb', line 38 def run_process EM.run do # TODO: Monitor the workers end end |
#shutdown!(reason) ⇒ Object
44 45 46 47 |
# File 'lib/rflow/master.rb', line 44 def shutdown!(reason) remove_pid_file super end |
#spawn_subprocesses ⇒ Object
26 27 28 29 30 31 32 |
# File 'lib/rflow/master.rb', line 26 def spawn_subprocesses RFlow.logger.debug "Running #{brokers.count} brokers" if brokers.count > 0 brokers.each(&:spawn!) RFlow.logger.debug "#{brokers.count} brokers started: #{brokers.map { |w| "#{w.name} (#{w.pid})" }.join(", ")}" if brokers.count > 0 shards.each(&:run!) end |
#subprocesses ⇒ Object
34 35 36 |
# File 'lib/rflow/master.rb', line 34 def subprocesses brokers + shards.flat_map(&:workers) end |