Class: AWS::Flow::ActivityWorker
- Inherits:
- GenericWorker (Object → GenericWorker → AWS::Flow::ActivityWorker)
- Defined in:
- lib/aws/decider/worker.rb
Overview
Used to implement an activity worker. You can use the `ActivityWorker` class to conveniently poll a task list for activity tasks.
You configure the activity worker with activity implementation objects. This worker class then polls for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided and calls the activity method to process the task. Unlike the WorkflowWorker, which creates a new instance for every decision task, the `ActivityWorker` simply uses the object you provided.
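The lookup-and-dispatch behavior described above can be illustrated with a minimal, self-contained sketch. It does not use the aws-flow gem or contact Amazon SWF. `SketchActivityWorker`, `GreetingActivities`, and the task hash layout are hypothetical names invented for illustration; they are not the library's API.

```ruby
# Hypothetical activity implementation object (not part of aws-flow).
class GreetingActivities
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def say_hello(name)
    @calls += 1
    "Hello, #{name}!"
  end
end

# Hypothetical, simplified worker: maps activity names to one shared
# implementation object and dispatches each incoming task to it.
class SketchActivityWorker
  def initialize
    @activity_map = {}
  end

  # Maps every public method defined directly on the instance's class
  # to the instance that was passed in.
  def add_implementation(instance)
    instance.class.public_instance_methods(false).each do |method_name|
      @activity_map[method_name.to_s] = instance
    end
  end

  # Looks up the implementation for the task and calls the activity method.
  def process(task)
    instance = @activity_map.fetch(task[:activity]) do
      raise ArgumentError, "no implementation for #{task[:activity]}"
    end
    instance.public_send(task[:activity], *task[:input])
  end
end

activities = GreetingActivities.new
worker = SketchActivityWorker.new
worker.add_implementation(activities)

first  = worker.process({:activity => "say_hello", :input => ["Ada"]})
second = worker.process({:activity => "say_hello", :input => ["Grace"]})
```

Both tasks are handled by the same `activities` object, so its `calls` counter keeps growing between tasks. That shared state is the practical consequence of reusing the object you provided instead of creating a new instance per task.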
Instance Method Summary
-
#add_activities_implementation(class_or_instance) ⇒ Object
Adds an activity implementation to this `ActivityWorker`.
-
#add_implementation(class_or_instance) ⇒ Object
Adds an activity implementation to this `ActivityWorker`; delegates to #add_activities_implementation.
-
#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker
constructor
Creates a new `ActivityWorker` instance.
-
#register ⇒ Object
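Registers each activity type that was added to this `ActivityWorker`. If a type with the same name and version already exists, raises an error when the default options being registered differ from the previous registration.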
-
#run_once(should_register = true, poller = nil) ⇒ Object
Optionally registers the activity types, then polls for and processes a single activity task. Uses the supplied ActivityTaskPoller, or creates one if none is given.
-
#start(should_register = true) ⇒ Object
Optionally registers the activity types, then polls for and processes activity tasks in an endless loop.
Constructor Details
#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker
Creates a new `ActivityWorker` instance.
# File 'lib/aws/decider/worker.rb', line 244
def initialize(service, domain, task_list, *args, &block)
  @activity_definition_map = {}
  @activity_type_options = []
  @options = Utilities::(WorkerOptions, block)
  @logger = @options.logger if @options
  @logger ||= Utilities::LogFactory.make_logger(self, "debug")
  max_workers = @options.execution_workers if @options
  max_workers = 20 if (max_workers.nil? || max_workers.zero?)
  @executor = ForkingExecutor.new(:max_workers => max_workers, :logger => @logger)
  @shutdown_first_time_function = lambda do
    @executor.shutdown Float::INFINITY
    Kernel.exit
  end
  super(service, domain, task_list, *args)
end
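The constructor falls back to 20 executor workers when no options are given or when `execution_workers` is nil or zero. A minimal sketch of that defaulting rule in isolation follows. `resolve_max_workers`, `DEFAULT_MAX_WORKERS`, and `SketchOptions` are hypothetical names used only for this illustration; the real constructor reads the value from its own options object.

```ruby
# Fallback value taken from the constructor listing.
DEFAULT_MAX_WORKERS = 20

# Hypothetical stand-in for the worker options object.
SketchOptions = Struct.new(:execution_workers)

# Returns the configured worker count, or the default when the options
# are missing or the configured count is nil or zero.
def resolve_max_workers(options)
  max_workers = options.execution_workers if options
  max_workers = DEFAULT_MAX_WORKERS if max_workers.nil? || max_workers.zero?
  max_workers
end

no_options   = resolve_max_workers(nil)
zero_workers = resolve_max_workers(SketchOptions.new(0))
five_workers = resolve_max_workers(SketchOptions.new(5))
```

Treating zero the same as nil means a worker can never be configured with an empty executor pool by accident.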
Instance Method Details
#add_activities_implementation(class_or_instance) ⇒ Object
Adds an activity implementation to this `ActivityWorker`.
# File 'lib/aws/decider/worker.rb', line 297
def add_activities_implementation(class_or_instance)
  klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class
  instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance
  klass.activities.each do |activity_type|
    #TODO this should assign to an activityImplementation, so that we can call execute on it later
    @activity_definition_map[activity_type] = ActivityDefinition.new(instance, activity_type.name.split(".").last, nil, activity_type., activity_type..data_converter)
     = activity_type.
    option_hash = {
      :domain => @domain.name,
      :name => activity_type.name.to_s,
      :version => activity_type.version
    }
    option_hash.merge!(.)
    option_hash.merge!(:default_task_list => {:name => .default_task_list}) if .default_task_list
    @activity_type_options << option_hash
  end
end
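The method accepts either a class or an already-built object, and derives the activity method name from the last dot-separated segment of the activity type name. The sketch below isolates those two steps. `resolve_implementation`, `activity_method_name`, and `SketchActivities` are hypothetical helpers written for this example, and `is_a?(Class)` is used here in place of the listing's `class == Class` comparison.

```ruby
# Returns [klass, instance] for either a class or an existing object.
# A class is instantiated with no arguments; an object is used as-is.
def resolve_implementation(class_or_instance)
  if class_or_instance.is_a?(Class)
    [class_or_instance, class_or_instance.new]
  else
    [class_or_instance.class, class_or_instance]
  end
end

# Extracts the method name from a dot-qualified activity type name.
def activity_method_name(activity_type_name)
  activity_type_name.split(".").last
end

# Hypothetical activity class used only for this illustration.
class SketchActivities; end

klass_a, instance_a = resolve_implementation(SketchActivities)

existing = SketchActivities.new
klass_b, instance_b = resolve_implementation(existing)

method_name = activity_method_name("SketchActivities.say_hello")
```

Passing a class therefore requires that the class can be constructed without arguments. Pass an instance when the implementation needs constructor parameters or shared state.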
#add_implementation(class_or_instance) ⇒ Object
Adds an activity implementation to this `ActivityWorker`; delegates to #add_activities_implementation.
# File 'lib/aws/decider/worker.rb', line 265
def add_implementation(class_or_instance)
  add_activities_implementation(class_or_instance)
end
#register ⇒ Object
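Registers each activity type that was added to this `ActivityWorker`. If a type with the same name and version already exists, raises an error when the default options being registered differ from the previous registration.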
# File 'lib/aws/decider/worker.rb', line 271
def register
  @activity_type_options.each do ||
    begin
      @service.register_activity_type()
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      previous_registration = @service.describe_activity_type(:domain => @domain.name, :activity_type => {:name => [:name], :version => [:version]})
       = .select {|key, val| key =~ /default/}
      previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym}
      previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)]
      if previous_registration[:default_task_list]
        previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name")
      end
      registration_difference = .sort.to_a - previous_registration.sort.to_a
      raise "There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}" unless registration_difference.empty?
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
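When a type already exists, the listing compares the `default` options being registered against the previously registered configuration, after converting the service's camelCase keys to snake_case symbols. A self-contained sketch of that comparison follows. `registration_difference` is a hypothetical helper, `camel_case_to_snake_case` is a local reimplementation and may differ from the inherited one, and the sample option values are made up.

```ruby
# Local reimplementation for this sketch; the inherited helper may differ.
def camel_case_to_snake_case(name)
  name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Returns the default options in new_options whose key/value pairs are
# absent from the previously registered configuration.
def registration_difference(new_options, previous_configuration)
  # Only options whose key mentions "default" take part in the comparison.
  new_defaults = new_options.select { |key, _value| key =~ /default/ }

  # Normalize camelCase string keys to snake_case symbols.
  previous = previous_configuration.each_with_object({}) do |(key, value), result|
    result[camel_case_to_snake_case(key).to_sym] = value
  end

  # Array difference on [key, value] pairs keeps only changed or new entries.
  Hash[new_defaults.sort.to_a - previous.sort.to_a]
end

new_options = {
  :domain => "sketch-domain",
  :name => "SketchActivities.say_hello",
  :version => "1",
  :default_task_start_to_close_timeout => "30",
  :default_task_heartbeat_timeout => "10"
}

previous_configuration = {
  "defaultTaskStartToCloseTimeout" => "60",
  "defaultTaskHeartbeatTimeout" => "10"
}

difference = registration_difference(new_options, previous_configuration)
```

A non-empty result corresponds to the case where the listing raises: the defaults changed but the version did not, so the new values would not be picked up.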
#run_once(should_register = true, poller = nil) ⇒ Object
Optionally registers the activity types, then polls for and processes a single activity task. Uses the supplied AWS::Flow::ActivityTaskPoller, or creates one if none is given.
# File 'lib/aws/decider/worker.rb', line 338
def run_once(should_register = true, poller = nil)
  register if should_register
  poller = ActivityTaskPoller.new(@service, @domain, @task_list, @activity_definition_map, @executor, @options) if poller.nil?
  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task(@options.use_forking)
end
#start(should_register = true) ⇒ Object
Optionally registers the activity types, then polls for and processes activity tasks in an endless loop.
# File 'lib/aws/decider/worker.rb', line 322
def start(should_register = true)
  register if should_register
  poller = ActivityTaskPoller.new(@service, @domain, @task_list, @activity_definition_map, @executor, @options)
  loop do
    run_once(false, poller)
  end
end
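The relationship between `start` and `run_once` (register once, build one poller, then reuse it for every iteration) can be sketched without the gem. `SketchWorker` and `CountingPoller` are hypothetical stand-ins, and this `start` takes an iteration count so the example terminates, whereas the real method loops indefinitely.

```ruby
# Hypothetical poller that only counts how often it was asked to poll.
class CountingPoller
  attr_reader :polls

  def initialize
    @polls = 0
  end

  def poll_and_process_single_task
    @polls += 1
  end
end

# Hypothetical, simplified worker mirroring the control flow of the listings.
class SketchWorker
  attr_reader :registrations

  def initialize(poller_factory)
    @poller_factory = poller_factory
    @registrations = 0
  end

  def register
    @registrations += 1
  end

  # Processes one task, creating a poller only when none is supplied.
  def run_once(should_register = true, poller = nil)
    register if should_register
    poller = @poller_factory.call if poller.nil?
    poller.poll_and_process_single_task
  end

  # Bounded variant of start: registers at most once, then reuses one poller.
  def start(iterations, should_register = true)
    register if should_register
    poller = @poller_factory.call
    iterations.times { run_once(false, poller) }
    poller
  end
end

worker = SketchWorker.new(-> { CountingPoller.new })
poller = worker.start(3)
```

Passing `false` to `run_once` inside the loop avoids re-registering on every poll, and supplying the poller avoids constructing a new one per task.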