Class: RFlow::Master

Inherits:
DaemonProcess show all
Defined in:
lib/rflow/master.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from DaemonProcess

#daemonize!

Constructor Details

#initialize(config) ⇒ Master

Returns a new instance of Master.



11
12
13
14
15
16
17
# File 'lib/rflow/master.rb', line 11

# Builds the master process from an RFlow configuration.
#
# Passes the configured application name and the role label 'Master' to the
# superclass constructor, creates the PID file handle, builds one Shard per
# shard configuration and one Broker per broker configuration found on the
# configured connections.
#
# As a side effect, the logger's context width is set to the length of the
# longest worker name across all shards (+nil+ when there are no workers,
# because +Array#max+ of an empty array is +nil+).
#
# @param config the configuration object; read via +[]+, +#shards+ and
#   +#connections+
def initialize(config)
  super(config['rflow.application_name'], 'Master')
  @pid_file = PIDFile.new(config['rflow.pid_file_path'])
  @shards = config.shards.map { |shard_config| Shard.new(shard_config) }

  # Size the logger's context column to fit the longest worker name.
  worker_names = @shards.flat_map(&:workers).map(&:name)
  RFlow.logger.context_width = worker_names.map(&:length).max

  broker_configs = config.connections.flat_map(&:brokers)
  @brokers = broker_configs.map { |broker_config| Broker.build(broker_config) }
end

Instance Attribute Details

#brokers ⇒ Object (readonly)

Returns the value of attribute brokers.



9
10
11
# File 'lib/rflow/master.rb', line 9

# Reader for the brokers built in #initialize (one per broker configuration
# found on the configured connections).
#
# @return [Array] the value of +@brokers+
def brokers
  @brokers
end

#shards ⇒ Object (readonly)

Returns the value of attribute shards.



8
9
10
# File 'lib/rflow/master.rb', line 8

# Reader for the shards built in #initialize (one per entry in the
# configuration's shard list).
#
# @return [Array] the value of +@shards+
def shards
  @shards
end

Instance Method Details

#run! ⇒ Object



19
20
21
22
23
24
# File 'lib/rflow/master.rb', line 19

# Runs the master: writes the PID file, then delegates to the superclass
# implementation of +run!+.
#
# The PID file is removed on the way out whether the superclass call returns
# normally or raises.
#
# @return the value returned by the superclass +run!+
def run!
  begin
    write_pid_file
    super
  ensure
    # Runs on both normal return and exception.
    remove_pid_file
  end
end

#run_process ⇒ Object



38
39
40
41
42
# File 'lib/rflow/master.rb', line 38

# Body of the master process: calls +EM.run+ with an empty block.
# Nothing is scheduled inside the block yet (see the TODO).
def run_process
  # TODO: Monitor the workers
  EM.run {}
end

#shutdown!(reason) ⇒ Object



44
45
46
47
# File 'lib/rflow/master.rb', line 44

# Shuts the master down: removes the PID file first, then hands the same
# +reason+ on to the superclass implementation.
#
# @param reason passed through unchanged to the superclass +shutdown!+
# @return the value returned by the superclass +shutdown!+
def shutdown!(reason)
  remove_pid_file
  super(reason)
end

#spawn_subprocesses ⇒ Object



26
27
28
29
30
31
32
# File 'lib/rflow/master.rb', line 26

# Starts every broker, then runs every shard.
#
# Brokers are spawned first, in order, via +spawn!+; shards are then run, in
# order, via +run!+. When there is at least one broker, a debug line is
# logged before spawning and another afterwards listing each broker's name
# and pid.
#
# @return the result of +shards.each+
def spawn_subprocesses
  unless brokers.empty?
    RFlow.logger.debug "Running #{brokers.count} brokers"
  end

  brokers.each { |broker| broker.spawn! }

  unless brokers.empty?
    # The pid is read after spawn!, which is why this summary is logged last.
    started = brokers.map { |broker| "#{broker.name} (#{broker.pid})" }.join(", ")
    RFlow.logger.debug "#{brokers.count} brokers started: #{started}"
  end

  shards.each { |shard| shard.run! }
end

#subprocesses ⇒ Object



34
35
36
# File 'lib/rflow/master.rb', line 34

# Lists every subprocess the master is responsible for: all brokers followed
# by the workers of every shard, in shard order.
#
# @return [Array] a new array; +brokers+ itself is not modified
def subprocesses
  shard_workers = shards.flat_map { |shard| shard.workers }
  brokers + shard_workers
end