Class: Glider::Component
- Inherits:
-
Object
- Object
- Glider::Component
- Defined in:
- lib/glider/component.rb,
lib/glider/workflows.rb,
lib/glider/activities.rb
Instance Attribute Summary collapse
-
#event ⇒ Object
readonly
Returns the value of attribute event.
-
#task ⇒ Object
readonly
Returns the value of attribute task.
Class Method Summary collapse
- .activities ⇒ Object
-
.activity_name_for(task, event) ⇒ Object
used for timeouts and activity task completed.
-
.domain(domain_name = nil, retention_period: 10) ⇒ Object
both setter and getter.
- .graceful_exit ⇒ Object
-
.has_previous_decisions?(workflow_execution) ⇒ Boolean
lets us determine if :decision_task_started should be called :workflow_execution_started.
- .loop_block_for_activity(activity_type) ⇒ Object
- .loop_block_for_workflow(workflow_type) ⇒ Object
- .process_decision_task(workflow_type, task) ⇒ Object
- .process_input(input) ⇒ Object
- .register_activity(name, version, options = {}) ⇒ Object
- .register_workflow(name, version, options = {}) ⇒ Object
- .should_call_workflow_target?(event_name, workflow_execution) ⇒ Boolean
- .signal_handling ⇒ Object
- .swf ⇒ Object
- .task_lock! ⇒ Object
-
.workers(workers_count = nil) ⇒ Object
both setter and getter.
- .workflow_data_for(event_name, event) ⇒ Object
- .workflows ⇒ Object
Instance Method Summary collapse
- #activity(name, version) ⇒ Object
-
#initialize(task, event = nil) ⇒ Component
constructor
A new instance of Component.
Constructor Details
#initialize(task, event = nil) ⇒ Component
Returns a new instance of Component.
9 10 11 12 |
# File 'lib/glider/component.rb', line 9
# Builds a component around a task and, optionally, the event being handled.
#
# @param task [Object] stored unchanged in @task
# @param event [Object, nil] stored unchanged in @event
def initialize(task, event = nil)
  @task = task
  @event = event
end
Instance Attribute Details
#event ⇒ Object (readonly)
Returns the value of attribute event.
7 8 9 |
# File 'lib/glider/component.rb', line 7
# @return [Object, nil] the value held in @event
def event
  @event
end
#task ⇒ Object (readonly)
Returns the value of attribute task.
7 8 9 |
# File 'lib/glider/component.rb', line 7
# @return [Object, nil] the value held in @task
def task
  @task
end
Class Method Details
.activities ⇒ Object
8 9 10 |
# File 'lib/glider/activities.rb', line 8
# Lazily initialised list kept in @activities.
#
# @return [Array] the same array instance on every call
def activities
  @activities ||= []
end
.activity_name_for(task, event) ⇒ Object
used for timeouts and activity task completed
93 94 95 96 97 98 99 100 |
# File 'lib/glider/workflows.rb', line 93
# Used for timeouts and activity task completed events: resolves the
# underscored activity type name behind +event+ by following its
# scheduled_event_id back to the matching event in the task's history.
# (Approach taken from SimplerWorkflow.)
#
# @param task [#workflow_execution] task whose execution events are searched
# @param event [#attributes] event carrying a scheduled_event_id
# @return [String] underscored activity type name
# @raise [NoMethodError] if no event with that id is found (nil is dereferenced)
def activity_name_for(task, event)
  scheduled_id = event.attributes.scheduled_event_id
  # search newest-first, as the events collection is walked in reverse order
  scheduled_event = task.workflow_execution.events.reverse_order.find do |e|
    e.id == scheduled_id
  end
  ActiveSupport::Inflector.underscore(scheduled_event.attributes.activity_type.name)
end
.domain(domain_name = nil, retention_period: 10) ⇒ Object
both setter and getter
53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/glider/component.rb', line 53
# Both setter and getter for the SWF domain held in @domain.
#
# With no argument, returns the current @domain. With a name, looks the
# domain up through +swf+, creating it when the lookup reports it unknown,
# stores it in @domain and returns it.
#
# @param domain_name [#to_s, nil] domain to select; nil means "just read"
# @param retention_period [Integer] passed to domains.create when the
#   domain has to be created
# @return [Object, nil] the selected domain, or nil if none was ever set
def domain(domain_name = nil, retention_period: 10)
  return @domain unless domain_name

  name = domain_name.to_s
  begin
    @domain = swf.domains[name]
    # touching status is what triggers UnknownResourceFault (see rescue)
    @domain.status
  rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault
    # create it if necessary
    @domain = swf.domains.create(name, retention_period)
  end
  @domain
end
.graceful_exit ⇒ Object
29 30 31 32 33 34 35 |
# File 'lib/glider/component.rb', line 29
# Exits the process immediately with status 0 unless a task is in flight
# (@in_task set); in that case only flags @time_to_exit so the exit can
# happen once the task is done.
def graceful_exit
  if @in_task
    @time_to_exit = true
  else
    Process.exit! 0
  end
end
.has_previous_decisions?(workflow_execution) ⇒ Boolean
lets us determine if :decision_task_started should be called :workflow_execution_started
37 38 39 40 41 42 43 |
# File 'lib/glider/workflows.rb', line 37
# Tells whether the execution history already contains a completed
# decision task.
#
# @param workflow_execution [#history_events] execution to inspect
# @return [Boolean] true if any history event's underscored type is
#   :decision_task_completed
def has_previous_decisions?(workflow_execution)
  workflow_execution.history_events.any? do |event|
    ActiveSupport::Inflector.underscore(event.event_type).to_sym == :decision_task_completed
  end
end
.loop_block_for_activity(activity_type) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/glider/activities.rb', line 50
# Builds the worker body for one activity type. Nothing runs until the
# returned Proc is called; once called it renames the process, installs
# signal handling and polls the domain for activity tasks forever.
#
# For each polled task (run inside task_lock!): a new instance is built
# with the task, the parsed input is sent to the method named after the
# activity type, and the task is completed with the stringified result
# unless it has already been responded to.
#
# @param activity_type [#name, #version] activity type to poll for
# @return [Proc] the polling loop
def loop_block_for_activity(activity_type)
  Proc.new do
    $0 = "ruby #{activity_type.name}-#{activity_type.version}"
    signal_handling
    Glider.logger.info "Startig worker for #{activity_type.name} activity (pid #{Process.pid})"
    loop do
      begin
        domain.activity_tasks.poll activity_type.name do |activity_task|
          task_lock! do
            begin
              workflow_id = activity_task.workflow_execution.workflow_id
              Glider.logger.info "Executing activity=#{activity_type.name} workflow_id=#{workflow_id}"
              target_instance = self.new activity_task
              input = process_input(activity_task.input)
              activity_result = target_instance.send activity_type.name, input
              activity_task.complete! result: activity_result.to_s unless activity_task.responded?
            rescue AWS::SimpleWorkflow::ActivityTask::CancelRequestedError
              # cleanup after ourselves
              activity_task.cancel!
            end
          end
        end
      rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault
        # logged through Glider.logger like every other message in this loop
        Glider.logger.error "An action relating to an expired workflow was sent. Probably the activity took longer than the execution timeout span."
      rescue RuntimeError => e
        # this error sometimes appears if failing and completing happen very
        # close in time and SWF doesn't report the responded? status correctly
        raise unless e.to_s == "already responded"

        Glider.logger.warn "Ignoring error responding to activity task failed. Most likely caused because your task failed the activity already."
      end
    end
  end
end
.loop_block_for_workflow(workflow_type) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/glider/workflows.rb', line 136
# Builds the decider body for one workflow type. Nothing runs until the
# returned Proc is called; once called it renames the process, installs
# signal handling and polls the domain for decision tasks forever, handing
# each one to process_decision_task inside task_lock!.
#
# @param workflow_type [#name, #version] workflow type to poll for
# @return [Proc] the polling loop
def loop_block_for_workflow(workflow_type)
  Proc.new do
    $0 = "ruby #{workflow_type.name}-#{workflow_type.version}"
    signal_handling
    Glider.logger.info "Startig worker for #{workflow_type.name} (pid #{Process.pid})"
    loop do
      begin
        domain.decision_tasks.poll workflow_type.name do |decision_task|
          task_lock! do
            process_decision_task workflow_type, decision_task
          end
        end
      rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault
        # logged through Glider.logger like every other message in this loop
        Glider.logger.error "An action relating to an expired decision was sent. Probably the decider took longer than the decision timeout span."
      end
    end
  end
end
.process_decision_task(workflow_type, task) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/glider/workflows.rb', line 102
# Walks the new events of a decision task and dispatches each relevant one
# to the workflow method.
#
# For every event that should_call_workflow_target? accepts, a new instance
# is built with (task, event) and the method named after the workflow type
# is sent (event_name, event, data). Signal and activity events are first
# renamed to "<signal>_signal" / "<activity>_activity_completed|failed|timed_out".
# A warning is logged when the call left no decision and no response.
#
# @param workflow_type [#name] workflow type whose method is invoked
# @param task [#workflow_execution, #new_events, #responded?] decision task
# @return [Object] whatever task.new_events.each returns
def process_decision_task(workflow_type, task)
  workflow_id = task.workflow_execution.workflow_id
  task.new_events.each do |event|
    event_name = ActiveSupport::Inflector.underscore(event.event_type).to_sym
    next unless should_call_workflow_target? event_name, task.workflow_execution

    target_instance = self.new task, event
    # data is extracted using the raw event name, before it is renamed below
    data = workflow_data_for(event_name, event)
    # convert signals to event names!
    case event_name
    when :workflow_execution_signaled
      event_name = "#{event.attributes.signal_name}_signal".to_sym
    when :activity_task_completed
      event_name = "#{activity_name_for(task, event)}_activity_completed".to_sym
    when :activity_task_failed
      event_name = "#{activity_name_for(task, event)}_activity_failed".to_sym
    when :activity_task_timed_out
      event_name = "#{activity_name_for(task, event)}_activity_timed_out".to_sym
    end
    Glider.logger.info "event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{workflow_id}"
    target_instance.send workflow_type.name, event_name, event, data

    # ensure proper response was given (aka a decision taken); the task keeps
    # its pending decisions in @decisions, which has no public reader here
    decisions = task.instance_variable_get(:@decisions)
    Glider.logger.debug decisions
    if (decisions.nil? || decisions.length == 0) && !task.responded?
      # the decider didn't add any decision
      Glider.logger.warn "No decision was made event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{task.workflow_execution.workflow_id}"
    end
  end
end
.process_input(input) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/glider/activities.rb', line 35
# Decodes an activity input string.
#
# nil stays nil. Otherwise the input is parsed as JSON: an object becomes a
# HashWithIndifferentAccess, any other JSON value (array, number, ...) is
# returned as parsed, and text that is not valid JSON is returned unchanged.
#
# @param input [String, nil] raw task input
# @return [ActiveSupport::HashWithIndifferentAccess, Object, String, nil]
def process_input(input)
  return nil if input.nil?

  # try to parse input as json
  parsed = JSON.parse(input)
  # Only hashes are wrapped: HashWithIndifferentAccess.new given a non-hash
  # uses it as the hash's default value, which would drop the payload.
  parsed.is_a?(Hash) ? ActiveSupport::HashWithIndifferentAccess.new(parsed) : parsed
rescue JSON::ParserError
  input
end
.register_activity(name, version, options = {}) ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/glider/activities.rb', line 12
# Registers an activity type in the current domain (or fetches it when it
# already exists) and registers one worker loop per configured worker.
#
# @param name [#to_s] activity name; also the default task list
# @param version [#to_s] activity version
# @param options [Hash] overrides merged on top of the defaults below
# @return [Object] whatever workers.times returns
def register_activity(name, version, options = {})
  default_options = {
    :default_task_list => name.to_s,
    :default_task_schedule_to_start_timeout => :none,
    :default_task_start_to_close_timeout => 60,
    :default_task_schedule_to_close_timeout => :none,
    :default_task_heartbeat_timeout => :none
  }
  options = default_options.merge options

  begin
    activity_type = domain.activity_types.create name.to_s, version.to_s, options
  rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault
    # already registered
    activity_type = domain.activity_types[name.to_s, version.to_s]
  end

  workers.times do
    ProcessManager.register_worker loop_block_for_activity(activity_type)
  end
end
.register_workflow(name, version, options = {}) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/glider/workflows.rb', line 16
# Registers a workflow type in the current domain (or fetches it when it
# already exists) and registers one decider loop per configured worker.
#
# @param name [#to_s] workflow name; also the default task list
# @param version [#to_s] workflow version
# @param options [Hash] overrides merged on top of the defaults below
# @return [Object] whatever workers.times returns
def register_workflow(name, version, options = {})
  default_options = {
    :default_task_list => name.to_s,
    :default_child_policy => :request_cancel,
    :default_task_start_to_close_timeout => 10, # decider timeout
    :default_execution_start_to_close_timeout => 120
  }
  options = default_options.merge options

  begin
    workflow_type = domain.workflow_types.create name.to_s, version.to_s, options
  rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault
    # already registered
    workflow_type = domain.workflow_types[name.to_s, version.to_s]
  end

  workers.times do
    ProcessManager.register_worker loop_block_for_workflow(workflow_type)
  end
end
.should_call_workflow_target?(event_name, workflow_execution) ⇒ Boolean
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/glider/workflows.rb', line 45
# Decides whether an event should reach the workflow method. The event
# names listed below are skipped (with a debug log line); every other event
# name is dispatched.
#
# @param event_name [Symbol] underscored event type
# @param workflow_execution [#workflow_id] only used for the log line
# @return [Boolean] false for skipped events, true otherwise
def should_call_workflow_target?(event_name, workflow_execution)
  case event_name
  when :activity_task_scheduled,
       :activity_task_started,
       :decision_task_scheduled,
       :decision_task_started,
       :decision_task_completed,
       :marker_recorded,
       :timer_started,
       :start_child_workflow_execution_initiated,
       :start_child_workflow_execution_started,
       :signal_external_workflow_execution_initiated,
       :request_cancel_external_workflow_execution_initiated
    Glider.logger.debug "Skipping decider call event=#{event_name} workflow_id=#{workflow_execution.workflow_id}"
    false
  else
    true
  end
end
.signal_handling ⇒ Object
37 38 39 40 41 |
# File 'lib/glider/component.rb', line 37
# Installs a USR1 handler that calls graceful_exit.
#
# @return [Object] the previous USR1 handler, as returned by Signal.trap
def signal_handling
  Signal.trap('USR1') do
    graceful_exit
  end
end
.swf ⇒ Object
43 44 45 |
# File 'lib/glider/component.rb', line 43
# Memoized AWS::SimpleWorkflow client, created on first use.
#
# @return [AWS::SimpleWorkflow] the same instance on every call
def swf
  @swf ||= AWS::SimpleWorkflow.new
end
.task_lock! ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/glider/component.rb', line 21
# Runs the given block with @in_task set, so that graceful_exit defers the
# exit instead of killing the process mid-task. The flag is always cleared
# afterwards, even when the block raises.
#
# @yield the task processing to protect
# @return [Object] the block's value
def task_lock!
  @in_task = true
  yield
ensure
  @in_task = false
  # in case an exit signal was received during task processing
  Process.exit! 0 if @time_to_exit
end
.workers(workers_count = nil) ⇒ Object
both setter and getter
48 49 50 |
# File 'lib/glider/component.rb', line 48
# Both setter and getter for the worker count held in @workers.
#
# @param workers_count [Integer, nil] new count; nil (or false) means "just read"
# @return [Integer] the count just set, or the stored count (1 if never set)
def workers(workers_count = nil)
  if workers_count
    @workers = workers_count
  else
    @workers ||= 1
  end
end
.workflow_data_for(event_name, event) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/glider/workflows.rb', line 66
# Extracts the payload of a workflow event and decodes it.
#
# The raw value is attributes.input for :workflow_execution_started and
# :workflow_execution_signaled, and attributes.result for everything else.
# Except for :workflow_execution_started, a failure while reading the
# attribute yields nil. A non-nil value is parsed as JSON: an object becomes
# a HashWithIndifferentAccess, any other JSON value is returned as parsed,
# and text that is not valid JSON is returned unchanged.
#
# @param event_name [Symbol] underscored event type
# @param event [#attributes] history event
# @return [ActiveSupport::HashWithIndifferentAccess, Object, String, nil]
def workflow_data_for(event_name, event)
  data =
    case event_name
    when :workflow_execution_started # :decision_task_scheduled
      event.attributes.input
    when :workflow_execution_signaled
      begin
        event.attributes.input
      rescue StandardError
        nil
      end
    when :activity_task_completed
      begin
        event.attributes.result
      rescue StandardError
        nil
      end
    else
      begin
        event.attributes.result
      rescue StandardError
        Glider.logger.debug "no input or result in event, data will be nil event=#{event_name} attributes=#{event.attributes.to_h}"
        nil
      end
    end
  return data if data.nil?

  # try to parse as JSON
  begin
    parsed = JSON.parse(data)
    # Only hashes are wrapped: HashWithIndifferentAccess.new given a non-hash
    # uses it as the hash's default value, which would drop the payload.
    parsed.is_a?(Hash) ? ActiveSupport::HashWithIndifferentAccess.new(parsed) : parsed
  rescue JSON::ParserError
    data
  end
end
.workflows ⇒ Object
12 13 14 |
# File 'lib/glider/workflows.rb', line 12
# Lazily initialised list kept in @workflows.
#
# @return [Array] the same array instance on every call
def workflows
  @workflows ||= []
end
Instance Method Details
#activity(name, version) ⇒ Object
14 15 16 |
# File 'lib/glider/component.rb', line 14
# Builds an activity descriptor hash from a name and a version.
#
# @param name [#to_s] activity name
# @param version [#to_s] activity version
# @return [Hash{Symbol => String}] {name: ..., version: ...}, both stringified
def activity(name, version)
  { name: name.to_s, version: version.to_s }
end