Module: AWS::Flow::Runner
- Includes:
- AWS::Flow
- Defined in:
- lib/aws/runner.rb
Class Method Summary collapse
-
.add_dir_to_load_path(path) ⇒ Object
this is used to extend the load path so that the ‘require’ of workflow and activity implementation files can succeed before adding the implementation classes to the workers.
-
.add_implementations(worker, json_fragment, what) ⇒ Object
used to add implementations to workers; see get_classes.
-
.all_subclasses(clazz) ⇒ Object
searches the object space for all subclasses of clazz.
- .create_service_client(json_config) ⇒ Object
-
.expand_task_list(value) ⇒ Object
used to support host-specific task lists when the string “|hostname|” is found in the task list it is replaced by the host name.
-
.get_classes(json_fragment, what) ⇒ Object
used to extract and validate the ‘activity_classes’ and ‘workflow_classes’ fields from the config, or autodiscover subclasses in the ObjectSpace.
- .is_empty_field?(json_fragment, field_name) ⇒ Boolean
-
.load_config_json(path) ⇒ Object
loads the configuration from a JSON file.
-
.load_files(config_path, json_config, what) ⇒ Object
This is used to issue the necessary “require” commands to load the code needed to run a module.
- .main ⇒ Object
- .parse_command_line(argv = ARGV) ⇒ Object
- .set_process_name(name) ⇒ Object
-
.setup_domain(json_config) ⇒ Object
registers the domain if it is not.
-
.setup_signal_handling(workers) ⇒ Object
setup forwarding of signals to child processes, to facilitate and support orderly shutdown.
- .spawn_and_start_workers(json_fragment, process_name, worker) ⇒ Object
- .start_activity_workers(swf, domain = nil, config_path, json_config) ⇒ Object
-
.start_workers(domain = nil, config_path, json_config) ⇒ Object
this will start all the workers and return an array of pids for the worker processes.
- .start_workflow_workers(swf, domain = nil, config_path, json_config) ⇒ Object
-
.wait_for_child_processes(workers) ⇒ Object
TODO: use a logger this will wait until all the child workers have died.
Methods included from AWS::Flow
decision_context, version, with_retry, #workflow_client, workflow_client, #workflow_factory
Class Method Details
.add_dir_to_load_path(path) ⇒ Object
this is used to extend the load path so that the ‘require’ of workflow and activity implementation files can succeed before adding the implementation classes to the workers
251 252 253 254 |
# File 'lib/aws/runner.rb', line 251 def self.add_dir_to_load_path(path) raise ArgumentError.new("Invalid directory path: \"" + path.to_s + "\"") if not FileTest.directory? path $LOAD_PATH.unshift path.to_s end |
.add_implementations(worker, json_fragment, what) ⇒ Object
used to add implementations to workers; see get_classes
103 104 105 106 |
# File 'lib/aws/runner.rb', line 103 def self.add_implementations(worker, json_fragment, what) classes = get_classes(json_fragment, what) classes.each { |c| worker.add_implementation(c) } end |
.all_subclasses(clazz) ⇒ Object
searches the object space for all subclasses of clazz
80 81 82 |
# File 'lib/aws/runner.rb', line 80 def self.all_subclasses(clazz) ObjectSpace.each_object(Class).select { |klass| klass.is_a? clazz } end |
.create_service_client(json_config) ⇒ Object
205 206 207 208 209 210 211 212 |
# File 'lib/aws/runner.rb', line 205 def self.create_service_client(json_config) # set the UserAgent prefix for all clients if json_config['user_agent_prefix'] then AWS.config(user_agent_prefix: json_config['user_agent_prefix']) end swf = AWS::SimpleWorkflow.new end |
.expand_task_list(value) ⇒ Object
used to support host-specific task lists when the string “|hostname|” is found in the task list it is replaced by the host name
123 124 125 126 127 128 |
# File 'lib/aws/runner.rb', line 123 def self.(value) raise ArgumentError.new unless value ret = value ret.gsub!("|hostname|", Socket.gethostname) ret end |
.get_classes(json_fragment, what) ⇒ Object
used to extract and validate the ‘activity_classes’ and ‘workflow_classes’ fields from the config, or autodiscover subclasses in the ObjectSpace
87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/aws/runner.rb', line 87 def self.get_classes(json_fragment, what) classes = json_fragment[what[:config_key]] if classes.nil? || classes.empty? then # discover the classes classes = all_subclasses( what[:clazz] ) else # constantize the class names we just read from the config classes.map! { |c| Object.const_get(c) } end if classes.nil? || classes.empty? then raise ArgumentError.new "need at least one implementation class" end classes end |
.is_empty_field?(json_fragment, field_name) ⇒ Boolean
130 131 132 133 |
# File 'lib/aws/runner.rb', line 130 def self.is_empty_field?(json_fragment, field_name) field = json_fragment[field_name] field.nil? || field.empty? end |
.load_config_json(path) ⇒ Object
loads the configuration from a JSON file
259 260 261 262 |
# File 'lib/aws/runner.rb', line 259 def self.load_config_json(path) raise ArgumentError.new("Invalid file path: \"" + path.to_s + "\"") if not File.file? path config = JSON.parse(File.open(path) { |f| f.read }) end |
.load_files(config_path, json_config, what) ⇒ Object
This is used to issue the necessary “require” commands to load the code needed to run a module
config_path: the path where the config file is, to be able to
resolve relative references
json_config: the content of the config what: what should loaded. This is a hash expected to contain two keys:
- :default_file : the file to load unless a specific list is provided
- :config_key : the key of the config element which can contain a
specific list of files to load
145 146 147 148 149 150 151 152 |
# File 'lib/aws/runner.rb', line 145 def self.load_files(config_path, json_config, what) if is_empty_field?(json_config, what[:config_key]) then file = File.join(File.dirname(config_path), what[:default_file]) require file if File.exists? file else json_config[what[:config_key]].each { |file| require file if File.exists? file } end end |
.main ⇒ Object
281 282 283 284 285 286 287 288 289 290 291 292 293 294 |
# File 'lib/aws/runner.rb', line 281 def self.main = parse_command_line config_path = [:file] config = load_config_json( config_path ) add_dir_to_load_path( Pathname.new(config_path).dirname ) domain = setup_domain(config) workers = start_workers(domain, config_path, config) setup_signal_handling(workers) # hang there until killed: this process is used to relay signals to children # to support and facilitate an orderly shutdown wait_for_child_processes(workers) end |
.parse_command_line(argv = ARGV) ⇒ Object
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/aws/runner.rb', line 265 def self.parse_command_line(argv = ARGV) = {} optparse = OptionParser.new do |opts| opts.on('-f', '--file JSON_CONFIG_FILE', "Mandatory JSON config file") do |f| [:file] = f end end optparse.parse!(argv) # file parameter is not optional raise OptionParser::MissingArgument.new("file") if [:file].nil? return end |
.set_process_name(name) ⇒ Object
75 76 77 |
# File 'lib/aws/runner.rb', line 75 def self.set_process_name(name) $0 = name end |
.setup_domain(json_config) ⇒ Object
registers the domain if it is not
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/aws/runner.rb', line 56 def self.setup_domain(json_config) swf = create_service_client(json_config) domain = json_config['domain'] # If retention period is not provided, default it to 7 days retention = domain['retention_in_days'] || FlowConstants::RETENTION_DEFAULT begin swf.client.register_domain({ name: domain['name'], workflow_execution_retention_period_in_days: retention.to_s }) rescue AWS::SimpleWorkflow::Errors::DomainAlreadyExistsFault => e # possible log an INFO/WARN if the domain already exists. end return AWS::SimpleWorkflow::Domain.new( domain['name'] ) end |
.setup_signal_handling(workers) ⇒ Object
setup forwarding of signals to child processes, to facilitate and support orderly shutdown
234 235 236 |
# File 'lib/aws/runner.rb', line 234 def self.setup_signal_handling(workers) Signal.trap("INT") { workers.each { |w| Process.kill("INT", w) } } end |
.spawn_and_start_workers(json_fragment, process_name, worker) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/aws/runner.rb', line 108 def self.spawn_and_start_workers(json_fragment, process_name, worker) workers = [] num_of_workers = json_fragment['number_of_workers'] || FlowConstants::NUM_OF_WORKERS_DEFAULT num_of_workers.times do workers << fork do set_process_name(process_name) worker.start() end end workers end |
.start_activity_workers(swf, domain = nil, config_path, json_config) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/aws/runner.rb', line 154 def self.start_activity_workers(swf, domain = nil, config_path, json_config) workers = [] # load all classes for the activities load_files(config_path, json_config, {config_key: 'activity_paths', default_file: File.join('flow', 'activities.rb')}) domain = setup_domain(json_config) if domain.nil? # TODO: logger # start the workers for each spec json_config['activity_workers'].each do |w| # If number of forks is not provided, it will automatically default to 20 # within the ActivityWorker fork_count = w['number_of_forks_per_worker'] task_list = (w['task_list']) # create a worker worker = ActivityWorker.new(swf.client, domain, task_list, *w['activities']) {{ max_workers: fork_count }} add_implementations(worker, w, {config_key: 'activity_classes', clazz: AWS::Flow::Activities}) # start as many workers as desired in child processes workers << spawn_and_start_workers(w, "activity-worker", worker) end return workers end |
.start_workers(domain = nil, config_path, json_config) ⇒ Object
this will start all the workers and return an array of pids for the worker processes
218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/aws/runner.rb', line 218 def self.start_workers(domain = nil, config_path, json_config) workers = [] swf = create_service_client(json_config) workers << start_activity_workers(swf, domain, config_path, json_config) workers << start_workflow_workers(swf, domain, config_path, json_config) # needed to avoid returning nested arrays based on the calls above workers.flatten! end |
.start_workflow_workers(swf, domain = nil, config_path, json_config) ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/aws/runner.rb', line 181 def self.start_workflow_workers(swf, domain = nil, config_path, json_config) workers = [] # load all the classes for the workflows load_files(config_path, json_config, {config_key: 'workflow_paths', default_file: File.join('flow', 'workflows.rb')}) domain = setup_domain(json_config) if domain.nil? # TODO: logger # start the workers for each spec json_config['workflow_workers'].each do |w| task_list = (w['task_list']) # create a worker worker = WorkflowWorker.new(swf.client, domain, task_list, *w['workflows']) add_implementations(worker, w, {config_key: 'workflow_classes', clazz: AWS::Flow::Workflows}) # start as many workers as desired in child processes workers << spawn_and_start_workers(w, "workflow-worker", worker) end return workers end |
.wait_for_child_processes(workers) ⇒ Object
TODO: use a logger this will wait until all the child workers have died
240 241 242 243 244 245 246 |
# File 'lib/aws/runner.rb', line 240 def self.wait_for_child_processes(workers) until workers.empty? puts "waiting on workers " + workers.to_s + " to complete" dead_guys = Process.waitall dead_guys.each { |pid, status| workers.delete(pid); puts pid.to_s + " exited" } end end |