Module: AWS::Flow::Runner

Includes:
AWS::Flow
Defined in:
lib/aws/runner.rb

Class Method Summary

Methods included from AWS::Flow

decision_context, version, with_retry, #workflow_client, workflow_client, #workflow_factory

Class Method Details

.add_dir_to_load_path(path) ⇒ Object

Extends the load path so that `require` calls for the workflow and activity implementation files succeed before the implementation classes are added to the workers.

Raises:

  • (ArgumentError)


247
248
249
250
# File 'lib/aws/runner.rb', line 247

# Puts +path+ at the front of $LOAD_PATH so that implementation files stored
# next to the config file can be loaded with a plain +require+.
#
# @param path [String, Pathname] directory to prepend (stored as a String)
# @return [Array<String>] $LOAD_PATH itself, after the prepend
# @raise [ArgumentError] when +path+ is not an existing directory
def self.add_dir_to_load_path(path)
  unless FileTest.directory?(path)
    raise ArgumentError.new("Invalid directory path: \"#{path}\"")
  end
  $LOAD_PATH.unshift(path.to_s)
end

.add_implementations(worker, json_fragment, what) ⇒ Object

used to add implementations to workers; see get_classes



102
103
104
105
# File 'lib/aws/runner.rb', line 102

# Registers each implementation class chosen by get_classes with +worker+.
#
# @param worker [#add_implementation] worker receiving the classes
# @param json_fragment [Hash] the worker's section of the config
# @param what [Hash] lookup spec forwarded unchanged to get_classes
# @return [Array<Class>] the classes that were registered
def self.add_implementations(worker, json_fragment, what)
  get_classes(json_fragment, what).tap do |impls|
    impls.each { |impl| worker.add_implementation(impl) }
  end
end

.all_subclasses(clazz) ⇒ Object

Searches the object space for all classes for which `is_a?(clazz)` is true. When clazz is a module, these are the classes that extend it.



79
80
81
# File 'lib/aws/runner.rb', line 79

# Walks ObjectSpace and collects every Class object for which
# +is_a?(clazz)+ is true. When +clazz+ is a module, the matches are the
# classes that +extend+ it. Despite the method name, the subclasses of a class
# argument are not what this test selects.
#
# @param clazz [Module] module (or class) to test each Class object against
# @return [Array<Class>] matching classes, in ObjectSpace iteration order
def self.all_subclasses(clazz)
  matches = []
  ObjectSpace.each_object(Class) do |candidate|
    matches << candidate if candidate.is_a?(clazz)
  end
  matches
end

.create_service_client(json_config) ⇒ Object



201
202
203
204
205
206
207
208
# File 'lib/aws/runner.rb', line 201

# Builds the SimpleWorkflow service object the runner uses to talk to SWF.
#
# If the config contains 'user_agent_prefix', it is passed to AWS.config
# before the service object is built, so it applies to all clients.
#
# @param json_config [Hash] parsed runner configuration
# @return [AWS::SimpleWorkflow] a newly constructed service object
def self.create_service_client(json_config)
  prefix = json_config['user_agent_prefix']
  # set the UserAgent prefix for all clients
  AWS.config(:user_agent_prefix => prefix) if prefix
  AWS::SimpleWorkflow.new
end

.expand_task_list(value) ⇒ Object

Supports host-specific task lists: every occurrence of the string “|hostname|” in the task list name is replaced by the host name.

Raises:

  • (ArgumentError)


121
122
123
124
125
126
# File 'lib/aws/runner.rb', line 121

# Expands host-specific task list names: every occurrence of the literal
# placeholder "|hostname|" is replaced with Socket.gethostname.
#
# A new String is returned. The argument, usually a value taken straight from
# the parsed config, is left unmodified, and frozen strings are accepted.
# (The previous in-place gsub! rewrote the caller's config and raised
# FrozenError on frozen input.)
#
# @param value [String] task list name, possibly containing "|hostname|"
# @return [String] the expanded task list name
# @raise [ArgumentError] if +value+ is nil or false
def self.expand_task_list(value)
  raise ArgumentError unless value
  value.gsub("|hostname|", Socket.gethostname)
end

.get_classes(json_fragment, what) ⇒ Object

used to extract and validate the ‘activity_classes’ and ‘workflow_classes’ fields from the config, or autodiscover subclasses in the ObjectSpace



86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/aws/runner.rb', line 86

# Resolves the implementation classes for a worker.
#
# If +json_fragment+ lists class names under +what[:config_key]+, each name is
# resolved with Object.const_get ("A::B" style names work too). Otherwise the
# classes are autodiscovered with all_subclasses(+what[:clazz]+).
#
# @param json_fragment [Hash] the worker's section of the config
# @param what [Hash] :config_key => config key holding class names,
#   :clazz => module used for autodiscovery
# @return [Array<Class>] the resolved classes (never empty)
# @raise [ArgumentError] if no implementation class is found
# @raise [NameError] if a configured class name cannot be resolved
def self.get_classes(json_fragment, what)
  names = json_fragment[what[:config_key]]
  classes =
    if names.nil? || names.empty?
      # nothing configured: discover the classes
      all_subclasses(what[:clazz])
    else
      # constantize the configured names; +map+ (not +map!+) so that the
      # caller's config keeps its strings instead of being rewritten
      names.map { |name| Object.const_get(name) }
    end
  raise ArgumentError, "need at least one implementation class" if classes.nil? || classes.empty?
  classes
end

.is_empty_field?(json_fragment, field_name) ⇒ Boolean

Returns:

  • (Boolean)


128
129
130
131
# File 'lib/aws/runner.rb', line 128

# Tells whether +field_name+ is absent, nil or empty in +json_fragment+.
#
# @param json_fragment [Hash] config section to inspect
# @param field_name [String] key to look up
# @return [Boolean] true when the value is nil or +empty?+. A non-nil value
#   must respond to +empty?+ (e.g. String, Array, Hash).
def self.is_empty_field?(json_fragment, field_name)
  value = json_fragment[field_name]
  return true if value.nil?
  value.empty?
end

.load_config_json(path) ⇒ Object

loads the configuration from a JSON file

Raises:

  • (ArgumentError)


255
256
257
258
# File 'lib/aws/runner.rb', line 255

# Reads and parses the runner's JSON configuration file.
#
# @param path [String] path to the JSON config file
# @return [Hash] the parsed configuration
# @raise [ArgumentError] if +path+ is not an existing regular file
# @raise [JSON::ParserError] if the file does not contain valid JSON
def self.load_config_json(path)
  raise ArgumentError, "Invalid file path: \"#{path}\"" unless File.file?(path)
  JSON.parse(File.read(path))
end

.load_files(config_path, json_config, what) ⇒ Object

Issues the `require` calls needed to load the code a worker module depends on.

config_path: the path of the config file; the default file is resolved relative to the directory that contains it.

json_config: the parsed content of the config.

what: what should be loaded. This is a hash expected to contain two keys:

- :default_file : the file to load unless a specific list is provided
- :config_key : the key of the config element which can contain a
        specific list of files to load


143
144
145
146
147
148
149
150
# File 'lib/aws/runner.rb', line 143

# Requires the implementation files a worker module needs.
#
# If the config lists files under +what[:config_key]+, each of them is loaded.
# Otherwise +what[:default_file]+ is loaded, resolved relative to the
# directory containing +config_path+. Files that do not exist are skipped
# silently (best effort, as before).
#
# @param config_path [String] path of the JSON config file
# @param json_config [Hash] parsed configuration
# @param what [Hash] :config_key => key of an optional list of files,
#   :default_file => file used when that list is absent or empty
# @return [Array<String>] the candidate paths that were considered
def self.load_files(config_path, json_config, what)
  candidates =
    if is_empty_field?(json_config, what[:config_key])
      [File.join(File.dirname(config_path), what[:default_file])]
    else
      json_config[what[:config_key]]
    end

  candidates.each do |file|
    # File.exist? replaces File.exists? (removed in Ruby 3.2). The path is
    # expanded so +require+ loads the same file the existence check found;
    # a bare relative name would be searched for in $LOAD_PATH instead.
    require File.expand_path(file) if File.exist?(file)
  end
end

.main ⇒ Object



277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/aws/runner.rb', line 277

# Runner entry point. It:
# 1. reads the command line and the JSON config;
# 2. makes the config directory requirable;
# 3. ensures the domains exist;
# 4. forks the workers and relays INT to them;
# 5. blocks until every worker has exited.
def self.main
  config_file = parse_command_line[:file]
  settings = load_config_json(config_file)
  add_dir_to_load_path(Pathname.new(config_file).dirname)
  setup_domains(settings)
  children = start_workers(config_file, settings)
  setup_signal_handling(children)

  # the parent only lingers to forward signals to the children, which
  # supports an orderly shutdown; block here until they are all gone
  wait_for_child_processes(children)
end

.parse_command_line(argv = ARGV) ⇒ Object

Raises:

  • (OptionParser::MissingArgument)


261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/aws/runner.rb', line 261

# Parses the runner's command line. Recognized options are consumed from
# +argv+ in place (OptionParser#parse!).
#
# @param argv [Array<String>] arguments to parse; defaults to ARGV
# @return [Hash] {:file => path of the JSON config file}
# @raise [OptionParser::MissingArgument] if -f/--file was not given
def self.parse_command_line(argv = ARGV)
  parsed = {}
  parser = OptionParser.new
  parser.on('-f', '--file JSON_CONFIG_FILE', "Mandatory JSON config file") { |f| parsed[:file] = f }
  parser.parse!(argv)

  # the config file must always be supplied
  if parsed[:file].nil?
    raise OptionParser::MissingArgument.new("file")
  end

  parsed
end

.set_process_name(name) ⇒ Object



74
75
76
# File 'lib/aws/runner.rb', line 74

# Renames the current process by assigning the program name ($0), so that
# forked workers are distinguishable in process listings.
#
# @param name [String] new process name
# @return [String] +name+
def self.set_process_name(name)
  $PROGRAM_NAME = name
end

.setup_domains(json_config) ⇒ Object

Registers the domains that are not already registered.



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/aws/runner.rb', line 60

# Ensures every domain listed under 'domains' in the config exists. Each one
# is described first and registered only when SWF reports it as unknown.
#
# @param json_config [Hash] parsed configuration. Each 'domains' entry needs
#   'name' and 'retention_in_days'; the latter is sent as a String.
# @raise [StandardError] any describe failure other than an unknown domain
#   (network, credentials, throttling) is propagated instead of being masked
#   by a doomed register attempt
def self.setup_domains(json_config)
  swf = create_service_client(json_config)

  json_config['domains'].each do |d|
    begin
      swf.client.describe_domain(:name => d['name'])
    rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault
      # the domain does not exist yet: register it
      swf.client.register_domain({ :name => d['name'],
                                   :workflow_execution_retention_period_in_days => d['retention_in_days'].to_s })
    end
  end
end

.setup_signal_handling(workers) ⇒ Object

setup forwarding of signals to child processes, to facilitate and support orderly shutdown



230
231
232
# File 'lib/aws/runner.rb', line 230

# Installs an INT handler that relays the signal to every worker pid, so that
# children can shut down in an orderly way when the parent is interrupted.
#
# A worker that has already exited (Errno::ESRCH) is skipped. Previously that
# error escaped the trap handler, which stopped the relay to the remaining
# workers and crashed the parent.
#
# @param workers [Array<Integer>] pids of the worker processes
# @return [Object] the previous INT handler, as returned by Signal.trap
def self.setup_signal_handling(workers)
  Signal.trap("INT") do
    workers.each do |pid|
      begin
        Process.kill("INT", pid)
      rescue Errno::ESRCH
        # this worker is already gone; keep relaying to the others
      end
    end
  end
end

.spawn_and_start_workers(json_fragment, process_name, worker) ⇒ Object



107
108
109
110
111
112
113
114
115
116
# File 'lib/aws/runner.rb', line 107

# Forks 'number_of_workers' child processes. Each child renames itself to
# +process_name+ and runs +worker.start+.
#
# @param json_fragment [Hash] worker section; 'number_of_workers' is required
# @param process_name [String] name given to each child process
# @param worker [#start] worker object to run in every child
# @return [Array<Integer>] pids of the forked children
def self.spawn_and_start_workers(json_fragment, process_name, worker)
  json_fragment['number_of_workers'].times.map do
    fork do
      set_process_name(process_name)
      worker.start
    end
  end
end

.start_activity_workers(swf, config_path, json_config) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/aws/runner.rb', line 152

# Loads the activity implementation files, then builds and forks the
# activity workers for each entry of 'activity_workers'.
#
# A config without an 'activity_workers' section (for example, a host that
# only runs workflow workers) now starts no activity workers instead of
# failing with NoMethodError on nil.
#
# @param swf [AWS::SimpleWorkflow] service object whose client the workers use
# @param config_path [String] path of the JSON config file
# @param json_config [Hash] parsed configuration
# @return [Array<Array<Integer>>] one array of child pids per worker spec
def self.start_activity_workers(swf, config_path, json_config)
  # load all classes for the activities
  load_files(config_path, json_config, {:config_key => 'activity_paths',
               :default_file => File.join('flow', 'activities.rb')})

  # TODO: logger
  # start the workers for each spec
  (json_config['activity_workers'] || []).map do |w|
    fork_count = w['number_of_forks_per_worker'] || 1
    domain = AWS::SimpleWorkflow::Domain.new(w['domain'])
    task_list = expand_task_list(w['task_list'])

    # create a worker; the block returns an options hash with :max_workers set
    # to the fork count (presumably read by ActivityWorker as its options --
    # confirm against ActivityWorker)
    worker = ActivityWorker.new(swf.client, domain, task_list, *w['activities']) {{ :max_workers => fork_count }}
    add_implementations(worker, w, {:config_key => 'activity_classes',
               :clazz => AWS::Flow::Activities})

    # start as many workers as desired in child processes; the block's value
    # is the pid array for this spec
    spawn_and_start_workers(w, "activity-worker", worker)
  end
end

.start_workers(config_path, json_config) ⇒ Object

this will start all the workers and return an array of pids for the worker processes



214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/aws/runner.rb', line 214

# Starts all activity workers, then all workflow workers, and returns the pids
# of every forked worker process.
#
# @param config_path [String] path of the JSON config file
# @param json_config [Hash] parsed configuration
# @return [Array<Integer>] flat list of worker pids, activity workers first
def self.start_workers(config_path, json_config)
  swf = create_service_client(json_config)

  activity_pids = start_activity_workers(swf, config_path, json_config)
  workflow_pids = start_workflow_workers(swf, config_path, json_config)

  # the helpers return nested arrays of pids; non-bang +flatten+ always returns
  # the array, whereas +flatten!+ returns nil when there is nothing to flatten
  [activity_pids, workflow_pids].flatten
end

.start_workflow_workers(swf, config_path, json_config) ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/aws/runner.rb', line 177

# Loads the workflow implementation files, then builds and forks the
# workflow workers for each entry of 'workflow_workers'.
#
# A config without a 'workflow_workers' section (for example, a host that
# only runs activity workers) now starts no workflow workers instead of
# failing with NoMethodError on nil.
#
# @param swf [AWS::SimpleWorkflow] service object whose client the workers use
# @param config_path [String] path of the JSON config file
# @param json_config [Hash] parsed configuration
# @return [Array<Array<Integer>>] one array of child pids per worker spec
def self.start_workflow_workers(swf, config_path, json_config)
  # load all the classes for the workflows
  load_files(config_path, json_config, {:config_key => 'workflow_paths',
               :default_file => File.join('flow', 'workflows.rb')})

  # TODO: logger
  # start the workers for each spec
  (json_config['workflow_workers'] || []).map do |w|
    domain = AWS::SimpleWorkflow::Domain.new(w['domain'])
    task_list = expand_task_list(w['task_list'])

    # create a worker
    worker = WorkflowWorker.new(swf.client, domain, task_list, *w['workflows'])
    add_implementations(worker, w, {:config_key => 'workflow_classes',
               :clazz => AWS::Flow::Workflows})

    # start as many workers as desired in child processes; the block's value
    # is the pid array for this spec
    spawn_and_start_workers(w, "workflow-worker", worker)
  end
end

.wait_for_child_processes(workers) ⇒ Object

Waits until all the child worker processes have exited. TODO: use a logger.



236
237
238
239
240
241
242
# File 'lib/aws/runner.rb', line 236

# Blocks until the listed worker processes have exited. Each reaped pid is
# removed from +workers+ (the array is modified in place) and reported on stdout.
#
# If Process.waitall returns nothing while pids are still listed (they were
# reaped elsewhere or were never our children), the method stops waiting.
# Previously it looped forever in that case.
#
# @param workers [Array<Integer>] pids of the worker processes
# @return [nil]
def self.wait_for_child_processes(workers)
  until workers.empty?
    puts "waiting on workers #{workers} to complete"
    reaped = Process.waitall
    # no children left to wait for: avoid spinning on stale pids
    break if reaped.empty?
    reaped.each do |pid, _status|
      workers.delete(pid)
      puts "#{pid} exited"
    end
  end
end