Module: AWS::Flow::Runner
- Includes: AWS::Flow
- Defined in: lib/aws/runner.rb
Class Method Summary
- .add_dir_to_load_path(path) ⇒ Object
  Extends the load path so that the ‘require’ of workflow and activity implementation files succeeds before the implementation classes are added to the workers.
- .add_implementations(worker, json_fragment, what) ⇒ Object
  Adds implementations to workers; see get_classes.
- .all_subclasses(clazz) ⇒ Object
  Searches the object space for all subclasses of clazz.
- .create_service_client(json_config) ⇒ Object
- .expand_task_list(value) ⇒ Object
  Supports host-specific task lists: when the string “|hostname|” is found in the task list, it is replaced by the host name.
- .get_classes(json_fragment, what) ⇒ Object
  Extracts and validates the ‘activity_classes’ and ‘workflow_classes’ fields from the config, or autodiscovers subclasses in the ObjectSpace.
- .is_empty_field?(json_fragment, field_name) ⇒ Boolean
- .load_config_json(path) ⇒ Object
  Loads the configuration from a JSON file.
- .load_files(config_path, json_config, what) ⇒ Object
  Issues the necessary “require” commands to load the code needed to run a module.
- .main ⇒ Object
- .parse_command_line(argv = ARGV) ⇒ Object
- .set_process_name(name) ⇒ Object
- .setup_domains(json_config) ⇒ Object
  Registers the domains if they are not already registered.
- .setup_signal_handling(workers) ⇒ Object
  Sets up forwarding of signals to child processes to support orderly shutdown.
- .spawn_and_start_workers(json_fragment, process_name, worker) ⇒ Object
- .start_activity_workers(swf, config_path, json_config) ⇒ Object
- .start_workers(config_path, json_config) ⇒ Object
  Starts all the workers and returns an array of pids for the worker processes.
- .start_workflow_workers(swf, config_path, json_config) ⇒ Object
- .wait_for_child_processes(workers) ⇒ Object
  Waits until all the child worker processes have exited. (TODO: use a logger.)
Methods included from AWS::Flow
decision_context, version, with_retry, #workflow_client, workflow_client, #workflow_factory
Class Method Details
.add_dir_to_load_path(path) ⇒ Object
Extends the load path so that the ‘require’ of workflow and activity implementation files succeeds before the implementation classes are added to the workers.
    # File 'lib/aws/runner.rb', line 247
    def self.add_dir_to_load_path(path)
      raise ArgumentError.new("Invalid directory path: \"" + path.to_s + "\"") if not FileTest.directory? path
      $LOAD_PATH.unshift path.to_s
    end
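To illustrate why the load path matters, the following sketch writes a throwaway file into a temporary directory, prepends that directory to $LOAD_PATH, and then requires the file by its bare name. The helper is a standalone copy of the method for demonstration; the file name demo_activities.rb and the constant DEMO_LOADED are hypothetical.

```ruby
require 'tmpdir'

# Standalone stand-in for the runner's helper (assumption: same behavior as the listing).
def add_dir_to_load_path(path)
  raise ArgumentError, "Invalid directory path: \"#{path}\"" unless File.directory?(path)
  $LOAD_PATH.unshift(path.to_s)
end

# Hypothetical implementation file living next to a config file.
dir = Dir.mktmpdir
File.write(File.join(dir, 'demo_activities.rb'), "DEMO_LOADED = true\n")

add_dir_to_load_path(dir)
loaded = require 'demo_activities'   # found because dir is now on $LOAD_PATH
```

Because the directory is prepended with unshift, it takes precedence over entries already on the load path.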
.add_implementations(worker, json_fragment, what) ⇒ Object
Adds implementations to workers; see get_classes.
    # File 'lib/aws/runner.rb', line 102
    def self.add_implementations(worker, json_fragment, what)
      classes = get_classes(json_fragment, what)
      classes.each { |c| worker.add_implementation(c) }
    end
.all_subclasses(clazz) ⇒ Object
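Searches the object space for all subclasses of clazz. Note that the filter in the source is klass.is_a? clazz, which selects classes that are themselves instances of clazz (for a module, the classes that extend it).
    # File 'lib/aws/runner.rb', line 79
    def self.all_subclasses(clazz)
      ObjectSpace.each_object(Class).select { |klass| klass.is_a? clazz }
    end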
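The behavior of the is_a? filter can be seen in a small sketch. The module Marker and both classes are hypothetical and exist only for illustration; the point is that a class that extends a module is matched, while a class that merely includes it is not.

```ruby
# Hypothetical marker module and classes, for illustration only.
module Marker; end

class ExtendsMarker
  extend Marker    # the Class object itself becomes a kind of Marker
end

class IncludesMarker
  include Marker   # only instances of this class are Markers
end

# Standalone copy of the selection logic from the listing.
def all_subclasses(clazz)
  ObjectSpace.each_object(Class).select { |klass| klass.is_a? clazz }
end

found = all_subclasses(Marker)
```

Here found contains ExtendsMarker but not IncludesMarker, so autodiscovery of this kind presumably relies on implementation classes extending the module passed as clazz.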
.create_service_client(json_config) ⇒ Object
    # File 'lib/aws/runner.rb', line 201
    def self.create_service_client(json_config)
      # set the UserAgent prefix for all clients
      if json_config['user_agent_prefix'] then
        AWS.config(:user_agent_prefix => json_config['user_agent_prefix'])
      end
      swf = AWS::SimpleWorkflow.new
    end
.expand_task_list(value) ⇒ Object
Supports host-specific task lists: when the string “|hostname|” is found in the task list, it is replaced by the host name.
    # File 'lib/aws/runner.rb', line 121
    def self.expand_task_list(value)
      raise ArgumentError.new unless value
      ret = value
      ret.gsub!("|hostname|", Socket.gethostname)
      ret
    end
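A short sketch of the substitution, using a standalone copy of the method and a hypothetical task list name. It also shows a side effect worth knowing about: gsub! edits the string in place, so the string passed in is modified as well.

```ruby
require 'socket'

# Standalone copy of the method for demonstration.
def expand_task_list(value)
  raise ArgumentError.new unless value
  ret = value
  ret.gsub!("|hostname|", Socket.gethostname)
  ret
end

task_list = "activities-|hostname|".dup   # hypothetical task list name
expanded  = expand_task_list(task_list)

# The return value and the argument are the same String object.
same_object = expanded.equal?(task_list)
```

A task list without the “|hostname|” placeholder is returned unchanged.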
.get_classes(json_fragment, what) ⇒ Object
Extracts and validates the ‘activity_classes’ and ‘workflow_classes’ fields from the config, or autodiscovers subclasses in the ObjectSpace.
    # File 'lib/aws/runner.rb', line 86
    def self.get_classes(json_fragment, what)
      classes = json_fragment[what[:config_key]]
      if classes.nil? || classes.empty? then
        # discover the classes
        classes = all_subclasses( what[:clazz] )
      else
        # constantize the class names we just read from the config
        classes.map! { |c| Object.const_get(c) }
      end
      if classes.nil? || classes.empty? then
        raise ArgumentError.new "need at least one implementation class"
      end
      classes
    end
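The “constantize” step can be sketched on its own. The class names below are hypothetical; the sketch only shows how Object.const_get turns strings from a config into class objects, and what happens when a name does not resolve.

```ruby
# Hypothetical implementation classes, for illustration only.
module Demo
  class ResizeActivity; end
end
class PingActivity; end

names   = ['PingActivity', 'Demo::ResizeActivity']   # as they might appear in the config
classes = names.map { |c| Object.const_get(c) }

# An unknown name raises NameError rather than returning nil.
begin
  Object.const_get('NoSuchActivity')
rescue NameError => e
  missing = e.message
end
```

Since a misspelled class name raises NameError during the map! call, the ArgumentError about needing at least one implementation class would only be reached when both the config list and autodiscovery come up empty.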
.is_empty_field?(json_fragment, field_name) ⇒ Boolean
    # File 'lib/aws/runner.rb', line 128
    def self.is_empty_field?(json_fragment, field_name)
      field = json_fragment[field_name]
      field.nil? || field.empty?
    end
.load_config_json(path) ⇒ Object
Loads the configuration from a JSON file.
    # File 'lib/aws/runner.rb', line 255
    def self.load_config_json(path)
      raise ArgumentError.new("Invalid file path: \"" + path.to_s + "\"") if not File.file? path
      config = JSON.parse(File.open(path) { |f| f.read })
    end
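For orientation, here is a sketch of what a config file might contain, assembled only from the keys that the listings on this page read. All values (domain name, task lists, class names, counts) are made up, and the real format may accept more keys than shown here.

```ruby
require 'json'
require 'tempfile'

# Every value below is hypothetical; only the key names come from the listings.
sample = {
  'domains' => [{ 'name' => 'DemoDomain', 'retention_in_days' => 10 }],
  'activity_workers' => [{
    'domain' => 'DemoDomain',
    'task_list' => 'demo-activities-|hostname|',
    'number_of_workers' => 2,
    'number_of_forks_per_worker' => 5,
    'activity_classes' => ['DemoActivity']
  }],
  'workflow_workers' => [{
    'domain' => 'DemoDomain',
    'task_list' => 'demo-workflows',
    'number_of_workers' => 1,
    'workflow_classes' => ['DemoWorkflow']
  }]
}

# Standalone copy of the loader for demonstration.
def load_config_json(path)
  raise ArgumentError.new("Invalid file path: \"" + path.to_s + "\"") if not File.file? path
  JSON.parse(File.open(path) { |f| f.read })
end

file = Tempfile.new(['runner', '.json'])
file.write(JSON.pretty_generate(sample))
file.close

config = load_config_json(file.path)
```

The parsed result is a plain Hash with string keys, which is why the other methods index it with strings such as 'domains' and 'activity_workers'.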
.load_files(config_path, json_config, what) ⇒ Object
Issues the necessary “require” commands to load the code needed to run a module.
config_path: the path where the config file is, to be able to resolve relative references
json_config: the content of the config
what: what should be loaded. This is a hash expected to contain two keys:
- :default_file : the file to load unless a specific list is provided
- :config_key : the key of the config element which can contain a specific list of files to load
    # File 'lib/aws/runner.rb', line 143
    def self.load_files(config_path, json_config, what)
      if is_empty_field?(json_config, what[:config_key]) then
        file = File.join(File.dirname(config_path), what[:default_file])
        require file if File.exists? file
      else
        json_config[what[:config_key]].each { |file| require file if File.exists? file }
      end
    end
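The default-file branch resolves the file relative to the directory that holds the config file. A minimal sketch of that path arithmetic, assuming a hypothetical config location and the default file names passed by start_activity_workers and start_workflow_workers:

```ruby
config_path = '/opt/demo/config/runner.json'   # hypothetical location

# Same expression as in load_files, with the two :default_file values.
default_activities = File.join(File.dirname(config_path), File.join('flow', 'activities.rb'))
default_workflows  = File.join(File.dirname(config_path), File.join('flow', 'workflows.rb'))
```

Paths listed under the config key are, by contrast, passed to require as written, and in both branches a file that does not exist is skipped silently.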
.main ⇒ Object
    # File 'lib/aws/runner.rb', line 277
    def self.main
      options = parse_command_line
      config_path = options[:file]
      config = load_config_json( config_path )
      add_dir_to_load_path( Pathname.new(config_path).dirname )
      setup_domains(config)
      workers = start_workers(config_path, config)
      setup_signal_handling(workers)
      # hang there until killed: this process is used to relay signals to children
      # to support and facilitate an orderly shutdown
      wait_for_child_processes(workers)
    end
.parse_command_line(argv = ARGV) ⇒ Object
    # File 'lib/aws/runner.rb', line 261
    def self.parse_command_line(argv = ARGV)
      options = {}
      optparse = OptionParser.new do |opts|
        opts.on('-f', '--file JSON_CONFIG_FILE', "Mandatory JSON config file") do |f|
          options[:file] = f
        end
      end
      optparse.parse!(argv)
      # file parameter is not optional
      raise OptionParser::MissingArgument.new("file") if options[:file].nil?
      return options
    end
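A sketch of how the command line is parsed, using a standalone copy of the method. The local variable name options is an assumption made for this example, and the file name runner.json is hypothetical.

```ruby
require 'optparse'

# Standalone copy for demonstration; the variable name `options` is assumed.
def parse_command_line(argv = ARGV)
  options = {}
  optparse = OptionParser.new do |opts|
    opts.on('-f', '--file JSON_CONFIG_FILE', "Mandatory JSON config file") do |f|
      options[:file] = f
    end
  end
  optparse.parse!(argv)
  # file parameter is not optional
  raise OptionParser::MissingArgument.new("file") if options[:file].nil?
  options
end

long_form  = parse_command_line(['--file', 'runner.json'])
short_form = parse_command_line(['-f', 'runner.json'])

# Omitting the option raises instead of returning an empty hash.
begin
  parse_command_line([])
rescue OptionParser::MissingArgument => e
  error = e.message
end
```

Passing an explicit array instead of relying on the ARGV default makes the method easy to exercise in isolation.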
.set_process_name(name) ⇒ Object
    # File 'lib/aws/runner.rb', line 74
    def self.set_process_name(name)
      $0 = name
    end
.setup_domains(json_config) ⇒ Object
Registers the domains if they are not already registered.
    # File 'lib/aws/runner.rb', line 60
    def self.setup_domains(json_config)
      swf = create_service_client(json_config)
      json_config['domains'].each do |d|
        begin
          swf.client.describe_domain :name => d['name']
        rescue
          swf.client.register_domain( { :name => d['name'],
            :workflow_execution_retention_period_in_days => d['retention_in_days'].to_s })
        end
      end
    end
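The describe-then-register pattern can be tried without AWS credentials by substituting a fake client. FakeClient below is entirely hypothetical and is not part of any AWS SDK; it only mimics the two calls used in the listing so the control flow can be observed.

```ruby
# Hypothetical in-memory stand-in for the SWF client; not part of any AWS SDK.
class FakeClient
  attr_reader :registered

  def initialize(existing)
    @existing = existing
    @registered = []
  end

  def describe_domain(opts)
    raise "unknown domain #{opts[:name]}" unless @existing.include?(opts[:name])
    { :name => opts[:name] }
  end

  def register_domain(opts)
    @registered << opts
  end
end

client  = FakeClient.new(['AlreadyThere'])
domains = [{ 'name' => 'AlreadyThere', 'retention_in_days' => 10 },
           { 'name' => 'BrandNew',     'retention_in_days' => 7 }]

# Same control flow as setup_domains: describe first, register on any error.
domains.each do |d|
  begin
    client.describe_domain :name => d['name']
  rescue
    client.register_domain({ :name => d['name'],
      :workflow_execution_retention_period_in_days => d['retention_in_days'].to_s })
  end
end
```

Only the domain that failed to describe is registered. Note that the bare rescue catches any StandardError, so in the real method a describe call that fails for an unrelated reason would also lead to a registration attempt.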
.setup_signal_handling(workers) ⇒ Object
Sets up forwarding of signals to child processes to support orderly shutdown.
    # File 'lib/aws/runner.rb', line 230
    def self.setup_signal_handling(workers)
      Signal.trap("INT") { workers.each { |w| Process.kill("INT", w) } }
    end
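The forwarding can be observed with a few throwaway child processes. This is a sketch for platforms that support fork; the children, the readiness pipe, and the exit!(0) handler are all assumptions added for the demonstration and are not part of the runner.

```ruby
# Children announce readiness over a pipe so the parent does not signal too early.
reader, writer = IO.pipe

workers = 2.times.map do
  fork do
    reader.close
    Signal.trap("INT") { exit!(0) }   # child: exit cleanly on INT
    writer.write("r")
    writer.close
    sleep                             # stand-in for worker.start
  end
end
writer.close
reader.read(workers.size)             # block until every child is ready

# Parent: forward INT to every child, as in the listing.
Signal.trap("INT") { workers.each { |w| Process.kill("INT", w) } }

Process.kill("INT", Process.pid)      # simulate Ctrl-C on the parent
statuses = Process.waitall            # array of [pid, Process::Status] pairs
```

As written in the listing, only INT is trapped, so other signals sent to the parent are not relayed to the workers.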
.spawn_and_start_workers(json_fragment, process_name, worker) ⇒ Object
    # File 'lib/aws/runner.rb', line 107
    def self.spawn_and_start_workers(json_fragment, process_name, worker)
      workers = []
      json_fragment['number_of_workers'].times do
        workers << fork do
          set_process_name(process_name)
          worker.start()
        end
      end
      workers
    end
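A sketch of the fork loop with the worker replaced by an immediate exit, for platforms that support fork. The worker spec is hypothetical. In the parent, fork with a block returns the child's pid, which is what ends up in the returned array.

```ruby
json_fragment = { 'number_of_workers' => 3 }   # hypothetical worker spec

pids = []
json_fragment['number_of_workers'].times do
  pids << fork do
    $0 = "activity-worker"   # what set_process_name does
    exit!(0)                 # stand-in for worker.start
  end
end

Process.waitall              # reap the children so none are left as zombies
```

Each child runs the same worker object, copied into its own process by fork.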
.start_activity_workers(swf, config_path, json_config) ⇒ Object
    # File 'lib/aws/runner.rb', line 152
    def self.start_activity_workers(swf, config_path, json_config)
      workers = []
      # load all classes for the activities
      load_files(config_path, json_config, {:config_key => 'activity_paths',
        :default_file => File.join('flow', 'activities.rb')})
      # TODO: logger
      # start the workers for each spec
      json_config['activity_workers'].each do |w|
        fork_count = w['number_of_forks_per_worker'] || 1
        domain = AWS::SimpleWorkflow::Domain.new( w['domain'] )
        task_list = expand_task_list(w['task_list'])
        # create a worker
        worker = ActivityWorker.new(swf.client, domain, task_list, *w['activities']) {{ :max_workers => fork_count }}
        add_implementations(worker, w, {:config_key => 'activity_classes',
          :clazz => AWS::Flow::Activities})
        # start as many workers as desired in child processes
        workers << spawn_and_start_workers(w, "activity-worker", worker)
      end
      return workers
    end
.start_workers(config_path, json_config) ⇒ Object
Starts all the workers and returns an array of pids for the worker processes.
    # File 'lib/aws/runner.rb', line 214
    def self.start_workers(config_path, json_config)
      workers = []
      swf = create_service_client(json_config)
      workers << start_activity_workers(swf, config_path, json_config)
      workers << start_workflow_workers(swf, config_path, json_config)
      # needed to avoid returning nested arrays based on the calls above
      workers.flatten!
    end
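The nesting that flatten! removes can be shown with made-up pids. Each start_*_workers call returns one inner array per worker spec, so the combined array is nested two levels deep before flattening.

```ruby
# Hypothetical pids: one inner array per worker spec.
activity_pids = [[101, 102], [103]]
workflow_pids = [[201]]

workers = []
workers << activity_pids
workers << workflow_pids
result = workers.flatten!       # flattens every level in place

# flatten! returns nil when there is nothing to flatten.
no_change = [1, 2, 3].flatten!
```

Because the array in start_workers always holds nested arrays at this point, flatten! returns the flattened array there rather than nil.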
.start_workflow_workers(swf, config_path, json_config) ⇒ Object
    # File 'lib/aws/runner.rb', line 177
    def self.start_workflow_workers(swf, config_path, json_config)
      workers = []
      # load all the classes for the workflows
      load_files(config_path, json_config, {:config_key => 'workflow_paths',
        :default_file => File.join('flow', 'workflows.rb')})
      # TODO: logger
      # start the workers for each spec
      json_config['workflow_workers'].each do |w|
        domain = AWS::SimpleWorkflow::Domain.new( w['domain'] )
        task_list = expand_task_list(w['task_list'])
        # create a worker
        worker = WorkflowWorker.new(swf.client, domain, task_list, *w['workflows'])
        add_implementations(worker, w, {:config_key => 'workflow_classes',
          :clazz => AWS::Flow::Workflows})
        # start as many workers as desired in child processes
        workers << spawn_and_start_workers(w, "workflow-worker", worker)
      end
      return workers
    end
.wait_for_child_processes(workers) ⇒ Object
Waits until all the child worker processes have exited. (TODO: use a logger.)
    # File 'lib/aws/runner.rb', line 236
    def self.wait_for_child_processes(workers)
      until workers.empty?
        puts "waiting on workers " + workers.to_s + " to complete"
        dead_guys = Process.waitall
        dead_guys.each { |pid, status| workers.delete(pid); puts pid.to_s + " exited" }
      end
    end
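The reaping loop can be sketched with short-lived children on a platform that supports fork. The children here exit immediately and stand in for real workers; the exited array is added only to make the result observable.

```ruby
workers = 2.times.map { fork { exit!(0) } }   # stand-ins for worker processes
exited  = []

until workers.empty?
  # waitall blocks until every child has exited, then returns [pid, status] pairs.
  Process.waitall.each do |pid, status|
    workers.delete(pid)
    exited << [pid, status.success?]
  end
end
```

Process.waitall returns an empty array when the process has no children, so the loop only terminates if every pid in workers really is a child of the calling process.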