Class: AWS::Flow::ActivityTaskPoller

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/decider/task_poller.rb

Overview

A poller for activity tasks.

Instance Method Summary collapse

Constructor Details

#initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller

Initializes a new ‘ActivityTaskPoller`.

Parameters:

  • service

    Required. The AWS::SimpleWorkflow instance to use.

  • domain

    Required. The domain used by the workflow.

  • task_list

    Required. The task list used to poll for activity tasks.

  • activity_definition_map

    Required. The AWS::Flow::ActivityDefinition instance that implements the activity to run. This map is in the form:

    { :activity_type => 'activity_definition_name' }
    

    The named activity definition will be run when the #execute method is called.

  • options (defaults to: nil)

    Optional. Options to set for the activity poller. You can set the following options:

    • ‘logger` - The logger to use.

    • ‘max_workers` - The maximum number of workers that can be running at

      once. The default is 20.
      


115
116
117
118
119
120
121
122
123
# File 'lib/aws/decider/task_poller.rb', line 115

def initialize(service, domain, task_list, activity_definition_map, executor, options=nil)
  @service = service
  @domain = domain
  @task_list = task_list
  @activity_definition_map = activity_definition_map
  @logger = options.logger if options
  @logger ||= Utilities::LogFactory.make_logger(self, "debug")
  @executor = executor
end

Instance Method Details

#execute(task) ⇒ Object

Executes the specified activity task.

Parameters:



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/aws/decider/task_poller.rb', line 132

def execute(task)
  activity_type = task.activity_type
  begin
    context = ActivityExecutionContext.new(@service, @domain, task)
    activity_implementation = @activity_definition_map[activity_type]
    raise "This activity worker was told to work on activity type #{activity_type.name}, but this activity worker only knows how to work on #{@activity_definition_map.keys.map(&:name).join' '}" unless activity_implementation
    output, original_result, too_large = activity_implementation.execute(task.input, context)
    @logger.debug "Responding on task_token #{task.task_token} for task #{task}"
    if too_large
      @logger.warn "The output of this activity was too large (greater than 2^15), and therefore aws-flow could not return it to SWF. aws-flow is now attempting to mark this activity as failed. For reference, the result was #{original_result}"
      respond_activity_task_failed_with_retry(task.task_token, "An activity cannot send a response with a result larger than 32768 characters. Please reduce the response size. A truncated prefix output is included in the details field.", output)
    elsif ! activity_implementation.execution_options.manual_completion
      @service.respond_activity_task_completed(:task_token => task.task_token, :result => output)
    end
  rescue ActivityFailureException => e
    @logger.error "The activity failed, with original output of #{original_result} and dataconverted result of #{output}. aws-flow will now attempt to fail it."
    respond_activity_task_failed_with_retry(task.task_token, e.message, e.details)

  end
  #TODO all the completion stuffs
end

#poll_and_process_single_task(use_forking = true) ⇒ Object

Polls the task list for a new activity task, and executes it if one is found.

If ‘use_forking` is set to `true` and the maximum number of workers (as set in #initialize) are already executing, this method will block until the number of running workers is less than the maximum.

Parameters:

  • use_forking (defaults to: true)

    Optional. Whether to use forking to execute the task. On Windows, you should set this to ‘false`.



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/aws/decider/task_poller.rb', line 299

def poll_and_process_single_task(use_forking = true)
  @poll_semaphore ||= SuspendableSemaphore.new
  @poll_semaphore.acquire
  semaphore_needs_release = true
  @logger.debug "before the poll\n\n"
  # This is to warm the lazily loaded clients in the @service, so we don't
  # pay for their loading in every forked client
  begin
    if use_forking
      @executor.block_on_max_workers
    end
    task = @domain.activity_tasks.poll_for_single_task(@task_list)
    if task
      @logger.info "got a task, #{task.activity_type.name}"
      @logger.info "The task token I got was: #{task.task_token}"
    end
  rescue Exception => e
    @logger.error "I have not been able to poll successfully, and am now bailing out, with error #{e}"
    @poll_semaphore.release
    return false
  end
  if task.nil?
    "Still polling at #{Time.now}, but didn't get anything"
    @logger.debug "Still polling at #{Time.now}, but didn't get anything"
    @poll_semaphore.release
    return false
  end
  semaphore_needs_release = false
  if use_forking
    @executor.execute { process_single_task(task) }
  else
    process_single_task(task)
  end
  # process_single_task(task)
  @logger.debug "finished executing the task"
  return true
end

#process_single_task(task) ⇒ Object

Processes the specified activity task.

Parameters:



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/aws/decider/task_poller.rb', line 249

def process_single_task(task)

  # We are using the 'build' method to create a new ConnectionPool here to
  # make sure that connection pools are not shared among forked processes.
  # The default behavior of the ConnectionPool class is to cache a pool
  # for a set of options created by the 'new' method and always use the 
  # same pool for the same set of options. This is undesirable when
  # multiple processes want to use different connection pools with same
  # options as is the case here.
  # Since we can't change the pool of an already existing NetHttpHandler,
  # we also create a new NetHttpHandler in order to use the new pool.

  options = @service.config.to_h
  options[:connection_pool] = AWS::Core::Http::ConnectionPool.build(options[:http_handler].pool.options)
  options[:http_handler] = AWS::Core::Http::NetHttpHandler.new(options)
  @service = AWS::SimpleWorkflow.new(options).client

  begin
    begin
      execute(task)
    rescue CancellationException => e
      respond_activity_task_canceled_with_retry(task.task_token, e.message)
    rescue Exception => e
      @logger.error "Got an error, #{e.message}, while executing #{task.activity_type.name}"
      @logger.error "Full stack trace: #{e.backtrace}"
      respond_activity_task_failed_with_retry(task.task_token, e.message, e.backtrace)
      #Do rescue stuff
    ensure
      @poll_semaphore.release
    end
  rescue Exception => e
    semaphore_needs_release = true
    @logger.error "Got into the other error mode"
    raise e
  ensure
    @poll_semaphore.release if semaphore_needs_release
  end
end

#respond_activity_task_canceled(task_token, message) ⇒ Object

Responds to the decider that the activity task should be canceled. No retry is attempted.

Parameters:

  • task_token

    Required. The task token from the AWS::Flow::ActivityDefinition object to retry.

  • message

    Required. A message that provides detail about why the activity task is cancelled.



215
216
217
# File 'lib/aws/decider/task_poller.rb', line 215

def respond_activity_task_canceled(task_token, message)
  @service.respond_activity_task_canceled({:task_token => task_token, :details => message})
end

#respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object

Note:

Retry behavior for this method is currently *not implemented*. For now, it simply wraps #respond_activity_task_canceled.

Responds to the decider that the activity task should be canceled, and attempts to retry the task.

Parameters:

  • task_token

    Required. The task token from the AWS::Flow::ActivityDefinition object to retry.

  • message

    Required. A message that provides detail about why the activity task is cancelled.



197
198
199
200
201
202
# File 'lib/aws/decider/task_poller.rb', line 197

def respond_activity_task_canceled_with_retry(task_token, message)
  if @failure_retrier.nil?
    respond_activity_task_canceled(task_token, message)
  end
  #TODO Set up other stuff to do if we have it
end

#respond_activity_task_failed(task_token, reason, details) ⇒ Object

Responds to the decider that the activity task has failed. No retry is attempted.

Parameters:

  • task_token

    Required. The task token from the AWS::Flow::ActivityDefinition object to retry. The task token is generated by the service and should be treated as an opaque value.

  • reason

    Required. Description of the error that may assist in diagnostics. Although this value is required, you can set it to an empty string if you don’t need this information.

  • details

    Required. Detailed information about the failure. Although this value is required, you can set it to an empty string if you don’t need this information.



237
238
239
240
# File 'lib/aws/decider/task_poller.rb', line 237

def respond_activity_task_failed(task_token, reason, details)
  @logger.debug "The task token to be reported on is #{task_token}"
  @service.respond_activity_task_failed(:task_token => task_token, :reason => reason.to_s, :details => details.to_s)
end

#respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object

Note:

Retry behavior for this method is currently *not implemented*. For now, it simply wraps #respond_activity_task_failed.

Responds to the decider that the activity task has failed, and attempts to retry the task.

Parameters:

  • task_token

    Required. The task token from the AWS::Flow::ActivityDefinition object to retry. The task token is generated by the service and should be treated as an opaque value.

  • reason

    Required. Description of the error that may assist in diagnostics. Although this value is required, you can set it to an empty string if you don’t need this information.

  • details

    Required. Detailed information about the failure. Although this value is required, you can set it to an empty string if you don’t need this information.



175
176
177
178
179
180
181
# File 'lib/aws/decider/task_poller.rb', line 175

def respond_activity_task_failed_with_retry(task_token, reason, details)
  #TODO Set up this variable
  if @failure_retrier.nil?
    respond_activity_task_failed(task_token, reason, details)
    #TODO Set up other stuff to do if we have it
  end
end