Class: Messagebus::Swarm::Controller

Inherits:
Object
Extended by:
DroneFactoryConcerns, ProcessManagementConcerns
Defined in:
lib/messagebus/swarm/controller.rb

Overview

The controller for a set of drone workers.

Defined Under Namespace

Modules: DroneFactoryConcerns, ProcessManagementConcerns
Classes: BadConfigurationError, ConfigurationSource

Constant Summary

Constants included from ProcessManagementConcerns

ProcessManagementConcerns::STOP_PARENT_PROCESSING_SIGNALS, ProcessManagementConcerns::STOP_SUBPROCESS_SIGNAL

Class Method Summary

Methods included from ProcessManagementConcerns

start_drones, stop_drones

Methods included from DroneFactoryConcerns

build_drones

Class Method Details

.after_fork(&block) ⇒ Object



# File 'lib/messagebus/swarm/controller.rb', line 72

def self.after_fork(&block)
  after_fork_procs << block
end

.after_fork_procs ⇒ Object



# File 'lib/messagebus/swarm/controller.rb', line 76

def self.after_fork_procs
  @after_fork_procs ||= []
end
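
A minimal sketch of how the two methods above fit together. `HookRegistry` is a hypothetical stand-in that copies the two method bodies so the example runs without the messagebus gem. The explicit `each` loop at the end is an assumption about how the registered blocks are invoked after a fork; that code is not shown on this page.

```ruby
# Hypothetical stand-in mirroring Controller.after_fork / .after_fork_procs.
module HookRegistry
  def self.after_fork(&block)
    after_fork_procs << block
  end

  def self.after_fork_procs
    @after_fork_procs ||= []
  end
end

# Records which callbacks ran, purely for illustration.
EVENTS = []

# Register callbacks, for example to re-open resources that should not be
# shared between a parent process and its forked children.
HookRegistry.after_fork { EVENTS << :reconnect_database }
HookRegistry.after_fork { EVENTS << :reopen_log_files }

# Assumption: the registered blocks are called in registration order.
HookRegistry.after_fork_procs.each(&:call)
```

Because `after_fork_procs` memoizes a single array, every call to `after_fork` appends to the same list, so callbacks accumulate for the lifetime of the process.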

.delete_pid(pid_file) ⇒ Object



# File 'lib/messagebus/swarm/controller.rb', line 127

def self.delete_pid(pid_file)
  File.delete(pid_file) if File.exist?(pid_file)
end

.require_files(files) ⇒ Object



# File 'lib/messagebus/swarm/controller.rb', line 119

def self.require_files(files)
  files.each { |file_to_require| require file_to_require }
end

.start(configuration_source, drone_logger, destination_name = nil, drone_count = nil) ⇒ Object

Starts up the swarm based on the given config. This method does not return until the swarm is shut down.

If the config sets fork to true (the code reads it from config.swarm_config.fork), it will boot the drones in subprocesses; otherwise it will use threads.

destination_name

limit booting drones to only the ones acting on the given destination

drone_count

override the number of drones to run



# File 'lib/messagebus/swarm/controller.rb', line 90

def self.start(configuration_source, drone_logger, destination_name=nil, drone_count=nil)
  config = if configuration_source.is_a?(ConfigurationSource)
    configuration_source.configuration_hash
  else
    configuration_source
  end
  raise BadConfigurationError.new("#{configuration_source.inspect} didn't evaluate to a configuration") if config.nil?
  config = Messagebus::DottableHash.new(config)
  relevant_worker_configs = config.workers

  # apply any applicable destination_name or drone_count settings
  relevant_worker_configs = relevant_worker_configs.select { |worker_config| worker_config[:destination] == destination_name } if destination_name
  relevant_worker_configs = relevant_worker_configs.map { |worker_config| worker_config.merge(:drones => drone_count) } if drone_count

  # The || is for backwards compatibility
  default_cluster_config = config.cluster_defaults || config

  drones = build_drones(relevant_worker_configs, default_cluster_config, config.clusters, swarm_control_logger, drone_logger)
  booter = start_drones(swarm_control_logger, config.swarm_config && config.swarm_config.fork, drones)

  booter.wait
end
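
The two optional filtering steps in `start` can be sketched on plain hashes as follows. This is an illustration only: the helper name `relevant_worker_configs`, the `:worker` key, and all sample values are made up; only the `:destination` and `:drones` keys come from the method above.

```ruby
# Sample worker configs. The :worker key and every value are hypothetical.
WORKERS = [
  { :destination => "jms.queue.orders", :worker => "OrderWorker", :drones => 4 },
  { :destination => "jms.queue.emails", :worker => "EmailWorker", :drones => 2 },
].freeze

# Mirrors the destination_name / drone_count handling in Controller.start.
def relevant_worker_configs(workers, destination_name = nil, drone_count = nil)
  configs = workers
  # Keep only workers bound to the requested destination, if one was given.
  configs = configs.select { |c| c[:destination] == destination_name } if destination_name
  # Override the per-worker drone count, if one was given.
  configs = configs.map { |c| c.merge(:drones => drone_count) } if drone_count
  configs
end

ONLY_ORDERS    = relevant_worker_configs(WORKERS, "jms.queue.orders")
ONE_DRONE_EACH = relevant_worker_configs(WORKERS, nil, 1)
```

Since `merge` returns a new hash, the override does not modify the original worker configs. Omitting both optional arguments leaves the worker list untouched.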

.stop(pid) ⇒ Object

Shuts down a previously started swarm.



# File 'lib/messagebus/swarm/controller.rb', line 115

def self.stop(pid)
  stop_drones(pid)
end

.swarm_control_logger ⇒ Object



# File 'lib/messagebus/swarm/controller.rb', line 68

def self.swarm_control_logger
  @swarm_control_logger ||= Logger.new($stdout)
end

.swarm_control_logger=(swarm_control_logger) ⇒ Object

It’s important that this be a different logger instance from the one used for the drones, consumers, and other components, to avoid deadlocks. It’s fine for it to write to the same file, as long as it is not the same logger instance.

This logger is used in a signal handler, and logging involves mutexes, so the logger must not be used by any code outside the signal handler.



# File 'lib/messagebus/swarm/controller.rb', line 65

def self.swarm_control_logger=(swarm_control_logger)
  @swarm_control_logger = swarm_control_logger
end
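
A minimal sketch of the "same file, different instance" arrangement described above, using only Ruby's standard `Logger`. The file name and log messages are made up, and the assignment to the controller is left as a comment so the example runs without the messagebus gem.

```ruby
require "logger"
require "tmpdir"

LOG_PATH = File.join(Dir.mktmpdir, "swarm.log") # hypothetical file name

# Two separate Logger instances that write to the same file.
DRONE_LOGGER   = Logger.new(LOG_PATH)
CONTROL_LOGGER = Logger.new(LOG_PATH)

DRONE_LOGGER.info("drone: processing message")
CONTROL_LOGGER.info("swarm control: received stop signal")

# With the gem loaded, the dedicated instance would be assigned like this:
# Messagebus::Swarm::Controller.swarm_control_logger = CONTROL_LOGGER
```

Each `Logger` instance has its own lock, so a signal handler that logs through `CONTROL_LOGGER` never waits on a lock held by code that was interrupted while logging through `DRONE_LOGGER`.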

.write_pid(pid_file) ⇒ Object



# File 'lib/messagebus/swarm/controller.rb', line 123

def self.write_pid(pid_file)
  File.open(pid_file, "w") { |f| f.print(Process.pid) }
end
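
A small sketch of the PID-file round trip that `write_pid` and `delete_pid` allow. `PidFile` is a hypothetical stand-in that copies the two method bodies from this page, and the file location is made up. Reading the file back to obtain a pid for `stop` is an assumption about typical usage, not something this page documents.

```ruby
require "tmpdir"

# Hypothetical stand-in copying the bodies of write_pid and delete_pid.
module PidFile
  def self.write_pid(pid_file)
    File.open(pid_file, "w") { |f| f.print(Process.pid) }
  end

  def self.delete_pid(pid_file)
    File.delete(pid_file) if File.exist?(pid_file)
  end
end

PID_PATH = File.join(Dir.mktmpdir, "swarm.pid") # hypothetical location

# Record the current process id.
PidFile.write_pid(PID_PATH)

# Assumption: a later command reads the pid back to pass it to stop(pid).
RECORDED_PID = Integer(File.read(PID_PATH))

# Clean up. The second call is a no-op because the file is already gone.
PidFile.delete_pid(PID_PATH)
PidFile.delete_pid(PID_PATH)
```

The `File.exist?` guard is what makes `delete_pid` safe to call more than once.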