Class: AWS::Flow::ActivityTaskPoller
- Inherits: Object
- Defined in: lib/aws/decider/task_poller.rb
Overview
A poller for activity tasks.
Instance Method Summary
- #execute(task) ⇒ Object
  Executes the specified activity task.
- #initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller (constructor)
  Initializes a new `ActivityTaskPoller`.
- #poll_and_process_single_task(use_forking = true) ⇒ Object
  Polls the task list for a new activity task, and executes it if one is found.
- #process_single_task(task) ⇒ Object
  Processes the specified activity task.
- #respond_activity_task_canceled(task_token, message) ⇒ Object
  Responds to the decider that the activity task should be canceled.
- #respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object
  Responds to the decider that the activity task should be canceled, and attempts to retry the task.
- #respond_activity_task_failed(task_token, reason, details) ⇒ Object
  Responds to the decider that the activity task has failed.
- #respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object
  Responds to the decider that the activity task has failed, and attempts to retry the task.
Constructor Details
#initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller
Initializes a new `ActivityTaskPoller`.
# File 'lib/aws/decider/task_poller.rb', line 115
def initialize(service, domain, task_list, activity_definition_map, executor, options=nil)
  @service = service
  @domain = domain
  @task_list = task_list
  @activity_definition_map = activity_definition_map
  @logger = options.logger if options
  @logger ||= Utilities::LogFactory.make_logger(self, "debug")
  @executor = executor
end
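The logger selection in the constructor (use the logger from `options` when one is supplied, otherwise build a default) can be tried in isolation. The following is a minimal sketch, not part of aws-flow: `PollerSketch` and `SketchOptions` are hypothetical names, and the standard library `Logger` stands in for `Utilities::LogFactory.make_logger`.

```ruby
require "logger"

# Hypothetical options object; only the `logger` reader matters here.
SketchOptions = Struct.new(:logger)

# Hypothetical stand-in that keeps only the logger-selection logic.
class PollerSketch
  attr_reader :logger

  def initialize(options = nil)
    # Prefer a logger supplied through options...
    @logger = options.logger if options
    # ...and fall back to a default when options is nil or carries no logger.
    @logger ||= Logger.new($stderr)
  end
end

custom_logger = Logger.new($stdout)
with_custom   = PollerSketch.new(SketchOptions.new(custom_logger))
with_default  = PollerSketch.new
```

Because the fallback uses `||=`, an options object whose `logger` is `nil` also ends up with the default logger.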
Instance Method Details
#execute(task) ⇒ Object
Executes the specified activity task.
# File 'lib/aws/decider/task_poller.rb', line 132
def execute(task)
  activity_type = task.activity_type
  begin
    context = ActivityExecutionContext.new(@service, @domain, task)
    activity_implementation = @activity_definition_map[activity_type]
    raise "This activity worker was told to work on activity type #{activity_type.name}, but this activity worker only knows how to work on #{@activity_definition_map.keys.map(&:name).join' '}" unless activity_implementation
    output, original_result, too_large = activity_implementation.execute(task.input, context)
    @logger.debug "Responding on task_token #{task.task_token} for task #{task}"
    if too_large
      @logger.warn "The output of this activity was too large (greater than 2^15), and therefore aws-flow could not return it to SWF. aws-flow is now attempting to mark this activity as failed. For reference, the result was #{original_result}"
      respond_activity_task_failed_with_retry(task.task_token, "An activity cannot send a response with a result larger than 32768 characters. Please reduce the response size. A truncated prefix output is included in the details field.", output)
    elsif ! activity_implementation.execution_options.manual_completion
      @service.respond_activity_task_completed(:task_token => task.task_token, :result => output)
    end
  rescue ActivityFailureException => e
    @logger.error "The activity failed, with original output of #{original_result} and dataconverted result of #{output}. aws-flow will now attempt to fail it."
    respond_activity_task_failed_with_retry(task.task_token, e.message, e.details)
  end
  #TODO all the completion stuffs
end
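The dispatch-and-respond flow in #execute can be modeled without SWF. The following is a minimal sketch under stated assumptions, not the library's implementation: `FakeService`, `FakeImplementation`, and `execute_sketch` are hypothetical stand-ins, the 32768-character limit is taken from the failure message in the listing, and truncating the output to exactly that length is an assumption.

```ruby
# Hypothetical stand-ins; none of these names come from aws-flow.
MAX_RESULT_SIZE = 32_768 # limit quoted in the failure message above

# Records the responses that would otherwise be sent to SWF.
class FakeService
  attr_reader :responses

  def initialize
    @responses = []
  end

  def respond_activity_task_completed(args)
    @responses << [:completed, args]
  end

  def respond_activity_task_failed(args)
    @responses << [:failed, args]
  end
end

# Returns [output, original_result, too_large], mirroring the triple that
# #execute destructures. The truncation length is an assumption.
class FakeImplementation
  def initialize(&block)
    @block = block
  end

  def execute(input)
    result = @block.call(input).to_s
    too_large = result.size > MAX_RESULT_SIZE
    output = too_large ? result[0, MAX_RESULT_SIZE] : result
    [output, result, too_large]
  end
end

# Looks up the implementation, runs it, and reports completion or failure.
def execute_sketch(service, definition_map, activity_name, input, task_token)
  implementation = definition_map[activity_name]
  raise "Unknown activity type #{activity_name}" unless implementation

  output, _original, too_large = implementation.execute(input)
  if too_large
    service.respond_activity_task_failed(
      :task_token => task_token, :reason => "result too large", :details => output)
  else
    service.respond_activity_task_completed(:task_token => task_token, :result => output)
  end
end

service = FakeService.new
definition_map = {
  "small" => FakeImplementation.new { |input| input.upcase },
  "large" => FakeImplementation.new { |_input| "x" * (MAX_RESULT_SIZE + 1) }
}
execute_sketch(service, definition_map, "small", "hello", "token-1")
execute_sketch(service, definition_map, "large", "ignored", "token-2")
```

The sketch omits the manual-completion check and the `ActivityFailureException` path; it only shows how an oversized result is turned into a failure response instead of a completion.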
#poll_and_process_single_task(use_forking = true) ⇒ Object
Polls the task list for a new activity task, and executes it if one is found.
If `use_forking` is set to `true` and the maximum number of workers (as set in #initialize) is already executing, this method blocks until the number of running workers drops below the maximum.
# File 'lib/aws/decider/task_poller.rb', line 299
def poll_and_process_single_task(use_forking = true)
  @poll_semaphore ||= SuspendableSemaphore.new
  @poll_semaphore.acquire
  semaphore_needs_release = true
  @logger.debug "before the poll\n\n"
  # This is to warm the lazily loaded clients in the @service, so we don't
  # pay for their loading in every forked client
  begin
    if use_forking
      @executor.block_on_max_workers
    end
    task = @domain.activity_tasks.poll_for_single_task(@task_list)
    if task
      @logger.info "got a task, #{task.activity_type.name}"
      @logger.info "The task token I got was: #{task.task_token}"
    end
  rescue Exception => e
    @logger.error "I have not been able to poll successfully, and am now bailing out, with error #{e}"
    @poll_semaphore.release
    return false
  end
  if task.nil?
    "Still polling at #{Time.now}, but didn't get anything"
    @logger.debug "Still polling at #{Time.now}, but didn't get anything"
    @poll_semaphore.release
    return false
  end
  semaphore_needs_release = false
  if use_forking
    @executor.execute { process_single_task(task) }
  else
    process_single_task(task)
  end
  # process_single_task(task)
  @logger.debug "finished executing the task"
  return true
end
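The blocking behavior described above (wait until fewer than the maximum number of workers are running) can be illustrated with threads in place of forked processes. This is a sketch under stated assumptions: `BoundedExecutor` is a hypothetical class, not the executor aws-flow uses, and only the method names `block_on_max_workers` and `execute` are borrowed from the listing. It assumes a single thread submits work.

```ruby
require "thread"

# Hypothetical thread-based executor that caps the number of running tasks.
class BoundedExecutor
  def initialize(max_workers)
    @max_workers = max_workers
    @running = 0
    @lock = Mutex.new
    @slot_free = ConditionVariable.new
    @threads = []
  end

  # Blocks the caller until fewer than max_workers tasks are running.
  def block_on_max_workers
    @lock.synchronize do
      @slot_free.wait(@lock) while @running >= @max_workers
    end
  end

  # Runs the block on a worker thread, waiting for a free slot first.
  def execute(&block)
    @lock.synchronize do
      @slot_free.wait(@lock) while @running >= @max_workers
      @running += 1
    end
    @threads << Thread.new do
      begin
        block.call
      ensure
        # Free the slot even if the task raised.
        @lock.synchronize do
          @running -= 1
          @slot_free.broadcast
        end
      end
    end
  end

  # Waits for every submitted task to finish.
  def shutdown
    @threads.each(&:join)
  end
end

executor = BoundedExecutor.new(2)
4.times do
  executor.block_on_max_workers
  executor.execute { sleep 0.01 }
end
executor.shutdown
```

The slot is released in an `ensure` clause so that a failing task cannot permanently reduce the number of available workers.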
#process_single_task(task) ⇒ Object
Processes the specified activity task.
# File 'lib/aws/decider/task_poller.rb', line 249
def process_single_task(task)
  # We are using the 'build' method to create a new ConnectionPool here to
  # make sure that connection pools are not shared among forked processes.
  # The default behavior of the ConnectionPool class is to cache a pool
  # for a set of options created by the 'new' method and always use the
  # same pool for the same set of options. This is undesirable when
  # multiple processes want to use different connection pools with same
  # options as is the case here.
  # Since we can't change the pool of an already existing NetHttpHandler,
  # we also create a new NetHttpHandler in order to use the new pool.
  options = @service.config.to_h
  options[:connection_pool] = AWS::Core::Http::ConnectionPool.build(options[:http_handler].pool.options)
  options[:http_handler] = AWS::Core::Http::NetHttpHandler.new(options)
  @service = AWS::SimpleWorkflow.new(options).client
  begin
    begin
      execute(task)
    rescue CancellationException => e
      respond_activity_task_canceled_with_retry(task.task_token, e.message)
    rescue Exception => e
      @logger.error "Got an error, #{e.message}, while executing #{task.activity_type.name}"
      @logger.error "Full stack trace: #{e.backtrace}"
      respond_activity_task_failed_with_retry(task.task_token, e.message, e.backtrace)
      #Do rescue stuff
    ensure
      @poll_semaphore.release
    end
  rescue Exception => e
    semaphore_needs_release = true
    @logger.error "Got into the other error mode"
    raise e
  ensure
    @poll_semaphore.release if semaphore_needs_release
  end
end
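The inner error handling above maps a cancellation to a canceled response, any other error to a failed response, and always releases the semaphore. A minimal sketch of that shape follows; `SketchCancellation`, `SketchSemaphore`, and `process_sketch` are hypothetical names, and the sketch rescues `StandardError` where the listing rescues `Exception`.

```ruby
# Hypothetical error class standing in for CancellationException.
class SketchCancellation < StandardError; end

# Hypothetical counter standing in for the poll semaphore.
class SketchSemaphore
  attr_reader :releases

  def initialize
    @releases = 0
  end

  def release
    @releases += 1
  end
end

# Runs the given block and returns the response that would be reported.
def process_sketch(semaphore)
  yield
  [:completed]
rescue SketchCancellation => e
  [:canceled, e.message]
rescue StandardError => e
  [:failed, e.message]
ensure
  # Runs on every path, so the semaphore is released exactly once per call.
  semaphore.release
end

semaphore = SketchSemaphore.new
ok       = process_sketch(semaphore) { :work_done }
canceled = process_sketch(semaphore) { raise SketchCancellation, "stop requested" }
failed   = process_sketch(semaphore) { raise "boom" }
```

The more specific `rescue` clause must come first, because Ruby tries the clauses in order and the cancellation class here is itself a `StandardError`.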
#respond_activity_task_canceled(task_token, message) ⇒ Object
Responds to the decider that the activity task should be canceled. No retry is attempted.
# File 'lib/aws/decider/task_poller.rb', line 215
def respond_activity_task_canceled(task_token, message)
  @service.respond_activity_task_canceled({:task_token => task_token, :details => message})
end
#respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object
Responds to the decider that the activity task should be canceled. Retry behavior for this method is currently *not implemented*; for now, it simply wraps #respond_activity_task_canceled.
# File 'lib/aws/decider/task_poller.rb', line 197
def respond_activity_task_canceled_with_retry(task_token, message)
  if @failure_retrier.nil?
    respond_activity_task_canceled(task_token, message)
  end
  #TODO Set up other stuff to do if we have it
end
#respond_activity_task_failed(task_token, reason, details) ⇒ Object
Responds to the decider that the activity task has failed. No retry is attempted.
# File 'lib/aws/decider/task_poller.rb', line 237
def respond_activity_task_failed(task_token, reason, details)
  @logger.debug "The task token to be reported on is #{task_token}"
  @service.respond_activity_task_failed(:task_token => task_token, :reason => reason.to_s, :details => details.to_s)
end
#respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object
Responds to the decider that the activity task has failed. Retry behavior for this method is currently *not implemented*; for now, it simply wraps #respond_activity_task_failed.
# File 'lib/aws/decider/task_poller.rb', line 175
def respond_activity_task_failed_with_retry(task_token, reason, details)
  #TODO Set up this variable
  if @failure_retrier.nil?
    respond_activity_task_failed(task_token, reason, details)
    #TODO Set up other stuff to do if we have it
  end
end