Class: AWS::Flow::ActivityTaskPoller
- Inherits: Object
- Inheritance chain: Object, AWS::Flow::ActivityTaskPoller
- Defined in: lib/aws/decider/task_poller.rb
Overview
A poller for activity tasks.
Instance Method Summary
-
#execute(task) ⇒ Object
Executes the specified activity task.
-
#initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller
constructor
Initializes a new `ActivityTaskPoller`.
-
#poll_and_process_single_task(use_forking = true) ⇒ Object
Polls the task list for a new activity task, and executes it if one is found.
-
#process_single_task(task) ⇒ Object
Processes the specified activity task.
-
#respond_activity_task_canceled(task_token, message) ⇒ Object
Responds to the decider that the activity task should be canceled.
-
#respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object
Responds to the decider that the activity task should be canceled, and attempts to retry the task.
-
#respond_activity_task_failed(task_token, reason, details) ⇒ Object
Responds to the decider that the activity task has failed.
-
#respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object
Responds to the decider that the activity task has failed, and attempts to retry the task.
Constructor Details
#initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller
Initializes a new `ActivityTaskPoller`.
# File 'lib/aws/decider/task_poller.rb', line 119
def initialize(service, domain, task_list, activity_definition_map, executor, options=nil)
  @service = service
  @domain = domain
  @task_list = task_list
  @activity_definition_map = activity_definition_map
  @logger = options.logger if options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @executor = executor
end
Instance Method Details
#execute(task) ⇒ Object
Executes the specified activity task.
# File 'lib/aws/decider/task_poller.rb', line 136
def execute(task)
  activity_type = task.activity_type
  begin
    context = ActivityExecutionContext.new(@service, @domain, task)
    unless activity_implementation = @activity_definition_map[activity_type]
      raise "This activity worker was told to work on activity type #{activity_type.name}.#{activity_type.version}, but this activity worker only knows how to work on #{@activity_definition_map.keys.map(&:name).join' '}"
    end

    output, original_result, too_large = activity_implementation.execute(task.input, context)

    @logger.debug "Responding on task_token #{task.task_token} for task #{task}."
    if too_large
      @logger.error "Activity #{activity_type.name}.#{activity_type.version} failed: The output of this activity was too large (greater than 2^15), and therefore aws-flow could not return it to SWF. aws-flow is now attempting to mark this activity as failed. For reference, the result was #{original_result}"

      respond_activity_task_failed_with_retry(
        task.task_token,
        "An activity cannot send a response with a result larger than 32768 characters. Please reduce the response size. A truncated prefix output is included in the details field.",
        output
      )
    elsif ! activity_implementation.execution_options.manual_completion
      @service.respond_activity_task_completed(
        :task_token => task.task_token,
        :result => output
      )
    end
  rescue ActivityFailureException => e
    @logger.error "Activity #{activity_type.name}.#{activity_type.version} with input #{task.input} failed with exception #{e}."

    respond_activity_task_failed_with_retry(
      task.task_token,
      e.message,
      e.details
    )
  end
  #TODO all the completion stuffs
end
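The size check in #execute relies on the activity implementation returning three values: the output to send, the original result, and a flag saying whether the result exceeded the limit. The following is a minimal sketch of that contract, assuming the 32768-character limit quoted in the failure reason. `truncate_result` and `MAX_RESULT_CHARS` are hypothetical names used for illustration only; they are not part of aws-flow, and the real truncation logic may differ.

```ruby
# Hypothetical helper illustrating the (output, original_result, too_large)
# triple that #execute destructures. Not part of aws-flow.
MAX_RESULT_CHARS = 32_768 # 2**15, the limit quoted in the failure reason

def truncate_result(result)
  original = result.to_s
  too_large = original.length > MAX_RESULT_CHARS
  # When the result is too large, keep only a prefix that fits the limit.
  output = too_large ? original[0, MAX_RESULT_CHARS] : original
  [output, original, too_large]
end

output, original_result, too_large = truncate_result("x" * 40_000)
puts "too_large=#{too_large}, sent #{output.length} of #{original_result.length} characters"
```

In #execute, a `true` flag leads to the task being reported as failed, with the truncated prefix passed along as the details.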
#poll_and_process_single_task(use_forking = true) ⇒ Object
Polls the task list for a new activity task, and executes it if one is found.
If `use_forking` is set to `true` and the maximum number of workers (as set in #initialize) are already executing, this method will block until the number of running workers is less than the maximum.
# File 'lib/aws/decider/task_poller.rb', line 318
def poll_and_process_single_task(use_forking = true)
  @poll_semaphore ||= SuspendableSemaphore.new
  @poll_semaphore.acquire
  semaphore_needs_release = true
  @logger.debug "Before the poll"
  begin
    if use_forking
      @executor.block_on_max_workers
    end
    @logger.debug "Polling for a new activity task of type #{@activity_definition_map.keys.map{ |x| "#{x.name} #{x.version}"} } on task_list: #{@task_list}"
    task = @domain.activity_tasks.poll_for_single_task(@task_list)
    if task
      @logger.info Utilities.activity_task_to_debug_string("Got activity task", task)
    end
  rescue Exception => e
    @logger.error "Error in the poller, #{e.class}, #{e}"
    @poll_semaphore.release
    return false
  end
  if task.nil?
    @logger.debug "Didn't get a task on task_list: #{@task_list}"
    @poll_semaphore.release
    return false
  end
  semaphore_needs_release = false
  if use_forking
    @executor.execute { process_single_task(task) }
  else
    process_single_task(task)
  end
  @logger.info Utilities.activity_task_to_debug_string("Finished executing task", task)
  return true
end
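Because #poll_and_process_single_task returns `true` when a task was handed off for processing and `false` otherwise, a caller can drive it in a loop. The sketch below uses a hypothetical `FakePoller` stand-in (not an aws-flow class) that mimics only that return contract, so it runs without AWS credentials; `drain` is likewise an illustrative name.

```ruby
# Hypothetical stand-in that mimics only the return contract of
# ActivityTaskPoller#poll_and_process_single_task.
class FakePoller
  def initialize(tasks)
    @tasks = tasks.dup
  end

  # Returns true if a task was found and processed, false otherwise.
  def poll_and_process_single_task(use_forking = true)
    task = @tasks.shift
    return false if task.nil?
    puts "processed #{task} (forking: #{use_forking})"
    true
  end
end

# Polls until no task is returned; returns the number of tasks handled.
def drain(poller, use_forking = false)
  handled = 0
  handled += 1 while poller.poll_and_process_single_task(use_forking)
  handled
end

puts drain(FakePoller.new(%w[resize_image send_email]))
```

With the real poller, `false` is also returned when the poll raises an error or finds no task, so a long-running worker would usually keep looping rather than stop at the first `false`.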
#process_single_task(task) ⇒ Object
Processes the specified activity task.
# File 'lib/aws/decider/task_poller.rb', line 268
def process_single_task(task)

  # We are using the 'build' method to create a new ConnectionPool here to
  # make sure that connection pools are not shared among forked processes.
  # The default behavior of the ConnectionPool class is to cache a pool
  # for a set of options created by the 'new' method and always use the
  # same pool for the same set of options. This is undesirable when
  # multiple processes want to use different connection pools with same
  # options as is the case here.
  # Since we can't change the pool of an already existing NetHttpHandler,
  # we also create a new NetHttpHandler in order to use the new pool.
  options = @service.config.to_h
  options[:connection_pool] = AWS::Core::Http::ConnectionPool.build(options[:http_handler].pool.options)
  options[:http_handler] = AWS::Core::Http::NetHttpHandler.new(options)
  @service = AWS::SimpleWorkflow.new(options).client

  begin
    begin
      execute(task)
    rescue CancellationException => e
      @logger.error "Got an error, #{e.message}, while executing #{task.activity_type.name}."
      respond_activity_task_canceled_with_retry(task.task_token, e.message)
    rescue Exception => e
      @logger.error "Got an error, #{e.message}, while executing #{task.activity_type.name}. Full stack trace: #{e.backtrace}"
      respond_activity_task_failed_with_retry(task.task_token, e.message, e.backtrace)
      #Do rescue stuff
    ensure
      @poll_semaphore.release
    end
  rescue Exception => e
    semaphore_needs_release = true
    @logger.error "Got into the other error mode: #{e}"
    raise e
  ensure
    @poll_semaphore.release if semaphore_needs_release
  end
end
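The inner `begin` block in #process_single_task maps exceptions to responses: a cancellation is reported as canceled, and anything else is reported as failed. A minimal sketch of that mapping follows. `RecordingService` and `run_and_report` are hypothetical stand-ins, and the `CancellationException` class defined here is a local placeholder for the aws-flow one. The sketch also rescues `StandardError` where the source rescues `Exception`, which is a deliberate substitution for the example.

```ruby
# Local placeholder for the aws-flow exception class of the same name.
class CancellationException < StandardError; end

# Hypothetical stand-in that records the responses it is asked to send.
class RecordingService
  attr_reader :calls

  def initialize
    @calls = []
  end

  def respond_activity_task_canceled(args)
    @calls << [:canceled, args]
  end

  def respond_activity_task_failed(args)
    @calls << [:failed, args]
  end
end

# Mirrors the rescue order in #process_single_task: the specific
# cancellation class is rescued before the generic one.
def run_and_report(service, task_token)
  yield
  :completed
rescue CancellationException => e
  service.respond_activity_task_canceled(:task_token => task_token, :details => e.message)
  :canceled
rescue StandardError => e
  service.respond_activity_task_failed(:task_token => task_token,
                                       :reason => e.message.to_s,
                                       :details => e.backtrace.to_s)
  :failed
end

service = RecordingService.new
run_and_report(service, "token-1") { raise CancellationException, "stop requested" }
run_and_report(service, "token-2") { raise ArgumentError, "bad input" }
service.calls.each { |kind, args| puts "#{kind}: #{args[:task_token]}" }
```

The order of the `rescue` clauses matters: if the generic clause came first, a cancellation would be reported as a failure.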
#respond_activity_task_canceled(task_token, message) ⇒ Object
Responds to the decider that the activity task should be canceled. No retry is attempted.
# File 'lib/aws/decider/task_poller.rb', line 234
def respond_activity_task_canceled(task_token, message)
  @service.respond_activity_task_canceled({:task_token => task_token, :details => message})
end
#respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object
Responds to the decider that the activity task should be canceled. The method is intended to retry the task as well, but retry behavior is currently *not implemented*.
For now, it simply wraps #respond_activity_task_canceled.
# File 'lib/aws/decider/task_poller.rb', line 216
def respond_activity_task_canceled_with_retry(task_token, message)
  if @failure_retrier.nil?
    respond_activity_task_canceled(task_token, message)
  end
  #TODO Set up other stuff to do if we have it
end
#respond_activity_task_failed(task_token, reason, details) ⇒ Object
Responds to the decider that the activity task has failed. No retry is attempted.
# File 'lib/aws/decider/task_poller.rb', line 256
def respond_activity_task_failed(task_token, reason, details)
  @logger.debug "The task token to be reported on is #{task_token}"
  @service.respond_activity_task_failed(:task_token => task_token, :reason => reason.to_s, :details => details.to_s)
end
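Note that #respond_activity_task_failed converts both `reason` and `details` with `to_s` before sending them. Callers such as #process_single_task pass `e.backtrace`, which is an Array, so the details arrive as the Array's string form. The sketch below illustrates that coercion; `failure_payload` is a hypothetical helper, not part of aws-flow.

```ruby
# Hypothetical helper showing the coercion applied by
# #respond_activity_task_failed; not part of aws-flow.
def failure_payload(task_token, reason, details)
  { :task_token => task_token, :reason => reason.to_s, :details => details.to_s }
end

begin
  raise ArgumentError, "bad input"
rescue ArgumentError => e
  payload = failure_payload("token-1", e.message, e.backtrace)
  puts payload[:reason]        # the exception message
  puts payload[:details].class # String, even though e.backtrace is an Array
end
```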
#respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object
Responds to the decider that the activity task has failed. The method is intended to retry the task as well, but retry behavior is currently *not implemented*.
For now, it simply wraps #respond_activity_task_failed.
# File 'lib/aws/decider/task_poller.rb', line 194
def respond_activity_task_failed_with_retry(task_token, reason, details)
  #TODO Set up this variable
  if @failure_retrier.nil?
    respond_activity_task_failed(task_token, reason, details)
    #TODO Set up other stuff to do if we have it
  end
end