Class: AWS::Flow::GenericActivityClient
- Inherits:
-
GenericClient
- Object
- GenericClient
- AWS::Flow::GenericActivityClient
- Defined in:
- lib/aws/decider/activity.rb
Overview
A generic activity client that can be used to perform standard activity actions.
Instance Attribute Summary collapse
-
#data_converter ⇒ Object
The data converter used for serializing/deserializing data when sending requests to and receiving results from workflow executions of this workflow type.
-
#decision_helper ⇒ Object
The decision helper used by the activity client.
-
#options ⇒ Object
A hash of ActivityRuntimeOptions for the activity client.
Attributes inherited from GenericClient
Class Method Summary collapse
-
.default_option_class ⇒ Object
Returns the default option class for the activity client, which is ActivityRuntimeOptions.
Instance Method Summary collapse
-
#activity_name_from_activity_type(name) ⇒ Object
private
Separates the activity name from the activity type at the point of the last period.
-
#handle_activity_task_canceled(event) ⇒ Object
A handler for the ‘ActivityClassCanceled` event.
-
#handle_activity_task_completed(event) ⇒ Object
A handler for the ‘ActivityClassCompleted` event.
-
#handle_activity_task_failed(event) ⇒ Object
A handler for the ‘ActivityTaskFailed` event.
-
#handle_activity_task_timed_out(event) ⇒ Object
A handler for the ‘ActivityClassTimedOut` event.
-
#handle_schedule_activity_task_failed(event) ⇒ Object
A handler for the ‘ScheduleActivityTaskFailed` event.
-
#initialize(decision_helper, options) ⇒ GenericActivityClient
constructor
Creates a new GenericActivityClient instance.
-
#method_missing(method_name, *args, &block) ⇒ Object
Registers and schedules a new activity type, provided a name and block of options.
- #method_to_retry_alias(method_name) ⇒ Object private
-
#request_cancel_activity_task(to_cancel) ⇒ Object
Requests that the activity is canceled.
- #retry_alias_to_method(retry_alias) ⇒ Object private
-
#schedule_activity(name, activity_type, input, options) ⇒ Object
Schedules a named activity.
Methods inherited from GenericClient
#_retry, #_retry_with_options, #bail_if_external, #decision_context, #exponential_retry, #reconfigure, #retry, #send_async, #with_opts
Constructor Details
#initialize(decision_helper, options) ⇒ GenericActivityClient
Creates a new GenericActivityClient instance.
74 75 76 77 78 79 80 81 |
# File 'lib/aws/decider/activity.rb', line 74 def initialize(decision_helper, ) @decision_helper = decision_helper @options = @activity_option_map = @decision_helper. @failure_map = {} @data_converter ||= YAMLDataConverter.new super end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method_name, *args, &block) ⇒ Object
Registers and schedules a new activity type, provided a name and block of options.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/aws/decider/activity.rb', line 95 def method_missing(method_name, *args, &block) = Utilities::(ActivityOptions, block) = Utilities::(@options, @activity_option_map[method_name.to_sym], @option_map[method_name.to_sym], ) = ActivityOptions.new() activity_type = ActivityType.new("#{.prefix_name}.#{method_name.to_s}", .version, .) if ._exponential_retry retry_function = ._exponential_retry.retry_function || FlowConstants.exponential_retry_function ._exponential_retry.return_on_start ||= .return_on_start future = (lambda { self.schedule_activity(activity_type.name, activity_type, args, ) }, retry_function, ._exponential_retry, args) return future if .return_on_start result = Utilities::drill_on_future(future) else result = schedule_activity(activity_type.name, activity_type, args, ) end result end |
Instance Attribute Details
#data_converter ⇒ Object
The data converter used for serializing/deserializing data when sending requests to and receiving results from workflow executions of this workflow type. By default, this is YAMLDataConverter.
45 46 47 |
# File 'lib/aws/decider/activity.rb', line 45 def data_converter @data_converter end |
#decision_helper ⇒ Object
The decision helper used by the activity client.
48 49 50 |
# File 'lib/aws/decider/activity.rb', line 48 def decision_helper @decision_helper end |
#options ⇒ Object
A hash of ActivityRuntimeOptions for the activity client.
51 52 53 |
# File 'lib/aws/decider/activity.rb', line 51 def @options end |
Class Method Details
.default_option_class ⇒ Object
Returns the default option class for the activity client, which is ActivityRuntimeOptions.
54 |
# File 'lib/aws/decider/activity.rb', line 54 def self.default_option_class; ActivityRuntimeOptions; end |
Instance Method Details
#activity_name_from_activity_type(name) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Separates the activity name from the activity type at the point of the last period.
62 63 64 |
# File 'lib/aws/decider/activity.rb', line 62 def activity_name_from_activity_type(name) return name.to_s.split(".").last.to_sym end |
#handle_activity_task_canceled(event) ⇒ Object
A handler for the ‘ActivityClassCanceled` event.
145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/aws/decider/activity.rb', line 145 def handle_activity_task_canceled(event) activity_id = @decision_helper.get_activity_id(event.attributes[:scheduled_event_id]) @decision_helper[activity_id].consume(:handle_cancellation_event) if @decision_helper[activity_id].done? open_request = @decision_helper.scheduled_activities.delete(activity_id) exception = CancellationException.new("Cancelled from ActivityTaskCanceledEvent", nil) if ! open_request.nil? open_request.completion_handle.fail(exception) end end end |
#handle_activity_task_completed(event) ⇒ Object
A handler for the ‘ActivityClassCompleted` event.
220 221 222 223 224 225 226 227 228 229 |
# File 'lib/aws/decider/activity.rb', line 220 def handle_activity_task_completed(event) scheduled_id = event.attributes[:scheduled_event_id] activity_id = @decision_helper.activity_scheduling_event_id_to_activity_id[scheduled_id] @decision_helper[activity_id].consume(:handle_completion_event) if @decision_helper[activity_id].done? open_request = @decision_helper.scheduled_activities.delete(activity_id) open_request.result = event.attributes[:result] open_request.completion_handle.complete end end |
#handle_activity_task_failed(event) ⇒ Object
A handler for the ‘ActivityTaskFailed` event.
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/aws/decider/activity.rb', line 181 def handle_activity_task_failed(event) attributes = event.attributes activity_id = @decision_helper.get_activity_id(attributes[:scheduled_event_id]) @decision_helper[activity_id].consume(:handle_completion_event) open_request_info = @decision_helper.scheduled_activities.delete(activity_id) reason = attributes[:reason] if attributes.keys.include? :reason reason ||= "The activity which failed did not provide a reason" details = attributes[:details] if attributes.keys.include? :details details ||= "The activity which failed did not provide details" # TODO consider adding user_context to open request, and adding it here # @decision_helper[@decision_helper.activity_scheduling_event_id_to_activity_id[event.attributes.scheduled_event_id]].attributes[:options].data_converter failure = ActivityTaskFailedException.new(event.id, activity_id, reason, details) open_request_info.completion_handle.fail(failure) end |
#handle_activity_task_timed_out(event) ⇒ Object
A handler for the ‘ActivityClassTimedOut` event.
162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/aws/decider/activity.rb', line 162 def handle_activity_task_timed_out(event) activity_id = @decision_helper.get_activity_id(event.attributes[:scheduled_event_id]) activity_state_machine = @decision_helper[activity_id] activity_state_machine.consume(:handle_completion_event) if activity_state_machine.done? open_request = @decision_helper.scheduled_activities.delete(activity_id) if ! open_request.nil? timeout_type = event.attributes[:timeout_type] failure = ActivityTaskTimedOutException.new(event.id, activity_id, timeout_type, "Time out") open_request.completion_handle.fail(failure) end end end |
#handle_schedule_activity_task_failed(event) ⇒ Object
A handler for the ‘ScheduleActivityTaskFailed` event.
202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/aws/decider/activity.rb', line 202 def handle_schedule_activity_task_failed(event) attributes = event.attributes activity_id = attributes[:activity_id] open_request_info = @decision_helper.scheduled_activities.delete(activity_id) activity_state_machine = @decision_helper[activity_id] activity_state_machine.consume(:handle_initiation_failed_event) if activity_state_machine.done? # TODO Fail task correctly failure = ScheduleActivityTaskFailedException.new(event.id, event.attributes.activity_type, activity_id, event.attributes.cause) open_request_info.completion_handle.fail(failure) end end |
#method_to_retry_alias(method_name) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
123 124 125 |
# File 'lib/aws/decider/activity.rb', line 123 def method_to_retry_alias(method_name) "#{__method_name.to_s + "_retry"}".to_sym end |
#request_cancel_activity_task(to_cancel) ⇒ Object
Requests that the activity is canceled.
132 133 134 135 136 137 138 |
# File 'lib/aws/decider/activity.rb', line 132 def request_cancel_activity_task(to_cancel) = to_cancel. if ! .respond_to? :activity_id raise "You need to use a future obtained from an activity" end @decision_helper[.activity_id].consume(:cancel) end |
#retry_alias_to_method(retry_alias) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
118 119 120 |
# File 'lib/aws/decider/activity.rb', line 118 def retry_alias_to_method(retry_alias) retry_alias.to_s[/__(.*)_retry/, 1].to_sym end |
#schedule_activity(name, activity_type, input, options) ⇒ Object
Schedules a named activity.
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/aws/decider/activity.rb', line 245 def schedule_activity(name, activity_type, input, ) = Utilities::(@option_map[activity_name_from_activity_type(name)], ) = ActivityOptions.new() output = Utilities::AddressableFuture.new open_request = OpenRequestInfo.new decision_id = @decision_helper.get_next_id(:Activity) output. = ActivityMetadata.new(decision_id) error_handler do |t| t.begin do @data_converter = .data_converter #input = input.map { |input_part| @data_converter.dump input_part } unless input.nil? input = @data_converter.dump input unless input.empty? attributes = {} .input ||= input unless input.empty? attributes[:options] = attributes[:activity_type] = activity_type attributes[:decision_id] = decision_id @completion_handle = nil external_task do |t| t.initiate_task do |handle| open_request.completion_handle = handle @decision_helper.scheduled_activities[decision_id.to_s] = open_request @decision_helper[decision_id.to_s] = ActivityDecisionStateMachine.new(decision_id, attributes) end t.cancellation_handler do |this_handle, cause| state_machine = @decision_helper[decision_id.to_s] if state_machine.current_state == :created open_request = @decision_helper.scheduled_activities.delete(decision_id.to_s) open_request.completion_handle.complete end state_machine.consume(:cancel) end end end t.rescue(Exception) do |error| @data_converter = .data_converter # If we have an ActivityTaskFailedException, then we should figure # out what the cause was, and pull that out. If it's anything else, # we should serialize the error, and stuff that into details, so # that things above us can pull it out correctly. We don't have to # do this for ActivityTaskFailedException, as the details is # *already* serialized. if error.is_a? ActivityTaskFailedException details = @data_converter.load(error.details) error.cause = details else details = @data_converter.dump(error) error.details = details end @failure_map[decision_id.to_s] = error end t.ensure do @data_converter = .data_converter result = @data_converter.load open_request.result output.set(result) raise @failure_map[decision_id.to_s] if @failure_map[decision_id.to_s] && .return_on_start end end return output if .return_on_start output.get this_failure = @failure_map[decision_id.to_s] raise this_failure if this_failure return output.get end |