Class: Glider::Component

Inherits:
Object
  • Object
show all
Defined in:
lib/glider/component.rb,
lib/glider/workflows.rb,
lib/glider/activities.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(task, event = nil) ⇒ Component

Returns a new instance of Component.



9
10
11
12
# File 'lib/glider/component.rb', line 9

# Binds a new component to the task it was created for and, optionally, to
# the event being handled.
#
# @param task [Object] stored as-is and returned by #task
# @param event [Object, nil] stored as-is and returned by #event
def initialize(task, event=nil)
	@task, @event = task, event
end

Instance Attribute Details

#eventObject (readonly)

Returns the value of attribute event.



7
8
9
# File 'lib/glider/component.rb', line 7

# Reader for the event handed to the constructor.
#
# @return [Object, nil] the value of @event; nil when no event was supplied
def event
  @event
end

#taskObject (readonly)

Returns the value of attribute task.



7
8
9
# File 'lib/glider/component.rb', line 7

# Reader for the task handed to the constructor.
#
# @return [Object] the value of @task
def task
  @task
end

Class Method Details

.activitiesObject



8
9
10
# File 'lib/glider/activities.rb', line 8

# Class-level list kept in @activities, created empty on first access.
#
# @return [Array] the array stored in @activities
def activities
	@activities ||= []
end

.activity_name_for(task, event) ⇒ Object

Used to resolve the activity name for activity task completed, failed and timed-out events.



93
94
95
96
97
98
99
100
# File 'lib/glider/workflows.rb', line 93

# Resolves the snake_case name of the activity that an activity-task event
# (completed, failed or timed out) belongs to.
#
# The event only carries the id of the event that scheduled the activity, so
# the execution history is searched newest-first for that event and the
# activity type name is read from it.
#
# @param task [#workflow_execution] task whose execution history is searched
# @param event [#attributes] event exposing attributes.scheduled_event_id
# @return [String] the activity type name run through Inflector.underscore
# @raise [NoMethodError] when no event in the history has the scheduled id
def activity_name_for(task, event)
	# taken from SimplerWorkflow
	history = task.workflow_execution.events.reverse_order
	scheduled_event = history.find { |past| past.id == event.attributes.scheduled_event_id }
	ActiveSupport::Inflector.underscore(scheduled_event.attributes.activity_type.name)
end

.domain(domain_name = nil, retention_period: 10) ⇒ Object

both setter and getter



53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/glider/component.rb', line 53

# Combined getter and setter for the SWF domain this class works against.
#
# Without a name the stored domain is returned. With a name the domain is
# looked up through swf.domains and stored in @domain; asking for its status
# is what reveals an unknown domain, in which case it is created.
#
# @param domain_name [#to_s, nil] name of the domain to select; nil reads
# @param retention_period [Integer] passed to swf.domains.create when the
#   domain has to be created
# @return [Object] the stored domain when reading; the status of an existing
#   domain, or the newly created domain, when setting
def domain(domain_name=nil, retention_period: 10)
	return @domain unless domain_name

	name = domain_name.to_s
	begin
		@domain = swf.domains[name]
		@domain.status
	rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault
		# create it if necessary
		@domain = swf.domains.create(name, retention_period)
	end
end

.graceful_exitObject



29
30
31
32
33
34
35
# File 'lib/glider/component.rb', line 29

# Handles an exit request.
#
# When no task is running (@in_task unset) the process is terminated at once
# with status 0. While a task is running the request is only recorded in
# @time_to_exit, which task_lock! checks once the task has finished.
#
# @return [true] when the exit was deferred; otherwise does not return
def graceful_exit
	Process.exit! 0 unless @in_task
	@time_to_exit = true
end

.has_previous_decisions?(workflow_execution) ⇒ Boolean

Lets us determine if :decision_task_started should be called :workflow_execution_started.

Returns:

  • (Boolean)


37
38
39
40
41
42
43
# File 'lib/glider/workflows.rb', line 37

# Tells whether the execution history already contains a completed decision.
#
# Each history event's type is underscored and compared against
# :decision_task_completed; scanning stops at the first match.
#
# @param workflow_execution [#history_events] execution to inspect
# @return [Boolean] true when a decision_task_completed event is present
def has_previous_decisions?(workflow_execution)
	workflow_execution.history_events.any? do |past_event|
		ActiveSupport::Inflector.underscore(past_event.event_type).to_sym == :decision_task_completed
	end
end

.loop_block_for_activity(activity_type) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/glider/activities.rb', line 50

# Builds the worker loop for one activity type.
#
# The returned Proc renames the process, installs the signal handler and then
# polls activity tasks under the activity type's name in an endless loop. For
# each task it instantiates the class with the task, decodes the input with
# process_input, calls the instance method named after the activity and
# completes the task with the stringified result unless the task has already
# been responded to.
#
# @param activity_type [#name, #version] activity type to poll tasks for
# @return [Proc] the worker block; it keeps looping until the process exits
#   or an error other than the ones handled below escapes
def loop_block_for_activity(activity_type)
	Proc.new do
		$0 = "ruby #{activity_type.name}-#{activity_type.version}"
		signal_handling
		Glider.logger.info "Startig worker for #{activity_type.name} activity (pid #{Process.pid})"
		loop do
			begin
				domain.activity_tasks.poll activity_type.name do |activity_task|
					# task_lock! defers an exit signal until the task is done
					task_lock! do
						begin
							workflow_id = activity_task.workflow_execution.workflow_id
							Glider.logger.info "Executing activity=#{activity_type.name} workflow_id=#{workflow_id}"
							target_instance = self.new activity_task
							input = process_input(activity_task.input)
							activity_result = target_instance.send activity_type.name, input

							activity_task.complete! result: activity_result.to_s unless activity_task.responded?

						rescue AWS::SimpleWorkflow::ActivityTask::CancelRequestedError
							# cleanup after ourselves
							activity_task.cancel!
						end
					end
				end
			rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault
				# Logged through Glider.logger like every other message here. The
				# global $logger used before is not set in the visible code, so
				# this handler could itself raise and end the worker loop.
				Glider.logger.error "An action relating to an expired workflow was sent. Probably the activity took longer than the execution timeout span."
			rescue RuntimeError => e
				raise unless e.to_s == "already responded"
				# this error sometimes appear if failing and completing happen very close in time and SWF doesn't report correctly the responded? status
				Glider.logger.warn "Ignoring error responding to activity task failed. Most likely caused because your task failed the activity already."
			end
		end
	end
end

.loop_block_for_workflow(workflow_type) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/glider/workflows.rb', line 136

# Builds the decider loop for one workflow type.
#
# The returned Proc renames the process, installs the signal handler and then
# polls decision tasks under the workflow type's name in an endless loop,
# handing each one to process_decision_task inside task_lock!.
#
# @param workflow_type [#name, #version] workflow type to poll decisions for
# @return [Proc] the worker block; it keeps looping until the process exits
#   or an error other than UnknownResourceFault escapes
def loop_block_for_workflow(workflow_type)
	Proc.new do
		$0 = "ruby #{workflow_type.name}-#{workflow_type.version}"
		signal_handling
		Glider.logger.info "Startig worker for #{workflow_type.name} (pid #{Process.pid})"
		loop do
			begin
				domain.decision_tasks.poll workflow_type.name do |decision_task|
					# task_lock! defers an exit signal until the decision is done
					task_lock! do
						process_decision_task workflow_type, decision_task
					end
				end
			rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault
				# Logged through Glider.logger like every other message here. The
				# global $logger used before is not set in the visible code, so
				# this handler could itself raise and end the decider loop.
				Glider.logger.error "An action relating to an expired decision was sent. Probably the decider took longer than the decision timeout span."
			end
		end
	end
end

.process_decision_task(workflow_type, task) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/glider/workflows.rb', line 102

# Dispatches every new event of a decision task to the workflow method.
#
# For each event that should_call_workflow_target? lets through, a fresh
# instance is built with the task and event, the payload is extracted with
# workflow_data_for, and the instance method named after the workflow type is
# called with (event_name, event, data). Signal events are renamed to
# "<signal_name>_signal" and activity outcomes to
# "<activity>_activity_completed|failed|timed_out" before the call. A warning
# is logged when the call left the task without decisions and without a
# response.
#
# @param workflow_type [#name] workflow type; its name selects the method
# @param task [Object] decision task providing new_events,
#   workflow_execution and responded?
# @return [Object] whatever task.new_events.each returns
def process_decision_task(workflow_type, task)
	workflow_id = task.workflow_execution.workflow_id
	activity_suffixes = {
		activity_task_completed: "completed",
		activity_task_failed: "failed",
		activity_task_timed_out: "timed_out"
	}

	task.new_events.each do |event|
		raw_name = ActiveSupport::Inflector.underscore(event.event_type).to_sym
		next unless should_call_workflow_target?(raw_name, task.workflow_execution)

		target_instance = self.new(task, event)
		# the payload is looked up under the original SWF event name
		data = workflow_data_for(raw_name, event)

		# convert signals to event names!
		event_name =
			if raw_name == :workflow_execution_signaled
				"#{event.attributes.signal_name}_signal".to_sym
			elsif activity_suffixes.key?(raw_name)
				"#{activity_name_for(task, event)}_activity_#{activity_suffixes[raw_name]}".to_sym
			else
				raw_name
			end

		Glider.logger.info "event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{workflow_id}"
		target_instance.send(workflow_type.name, event_name, event, data)

		# ensure proper response was given (aka a decision taken)
		decisions = task.instance_variable_get(:@decisions)
		Glider.logger.debug decisions
		next unless decisions.length.zero? && !task.responded?

		# the decider didn't add any decision
		Glider.logger.warn "No decision was made event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{task.workflow_execution.workflow_id}"
	end
end

.process_input(input) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/glider/activities.rb', line 35

# Decodes the raw input of an activity task.
#
# The input is tried as JSON. A JSON object is returned as a
# HashWithIndifferentAccess and a JSON array as the parsed Array. Everything
# else - text that is not JSON, or a bare JSON scalar such as "42" - is
# returned unchanged, because a scalar cannot be told apart from plain text.
#
# @param input [String, nil] raw task input
# @return [ActiveSupport::HashWithIndifferentAccess, Array, String, nil]
def process_input(input)
	return nil if input.nil?

	# try to parse input as json
	begin
		parsed = JSON.parse(input)
	rescue JSON::ParserError
		return input
	end

	case parsed
	when Hash
		ActiveSupport::HashWithIndifferentAccess.new parsed
	when Array
		# Only hashes are wrapped: HashWithIndifferentAccess.new takes a
		# non-hash argument as a default value, which would silently turn
		# the array into an empty hash.
		parsed
	else
		input
	end
end

.register_activity(name, version, options = {}) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/glider/activities.rb', line 12

# Registers an activity type in the current domain and queues its workers.
#
# The caller's options are merged over these defaults: task list named after
# the activity, a 60 second start-to-close timeout, and :none for the
# schedule-to-start, schedule-to-close and heartbeat timeouts. If the type is
# already registered it is looked up instead of created. One worker block from
# loop_block_for_activity is then registered with ProcessManager per
# configured worker.
#
# @param name [#to_s] activity name
# @param version [#to_s] activity version
# @param options [Hash] overrides for the default registration options
# @return [Object] the result of workers.times
def register_activity(name, version, options={})
	type_name, type_version = name.to_s, version.to_s

	settings = {
		default_task_list: type_name,
		default_task_schedule_to_start_timeout: :none,
		default_task_start_to_close_timeout: 60,
		default_task_schedule_to_close_timeout: :none,
		default_task_heartbeat_timeout: :none
	}.merge(options)

	activity_type =
		begin
			domain.activity_types.create type_name, type_version, settings
		rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault
			# already registered
			domain.activity_types[type_name, type_version]
		end

	workers.times { ProcessManager.register_worker loop_block_for_activity(activity_type) }
end

.register_workflow(name, version, options = {}) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/glider/workflows.rb', line 16

# Registers a workflow type in the current domain and queues its deciders.
#
# The caller's options are merged over these defaults: task list named after
# the workflow, :request_cancel child policy, a 10 second decision timeout and
# a 120 second execution timeout. If the type is already registered it is
# looked up instead of created. One worker block from loop_block_for_workflow
# is then registered with ProcessManager per configured worker.
#
# @param name [#to_s] workflow name
# @param version [#to_s] workflow version
# @param options [Hash] overrides for the default registration options
# @return [Object] the result of workers.times
def register_workflow(name, version, options={})
	type_name, type_version = name.to_s, version.to_s

	settings = {
		default_task_list: type_name,
		default_child_policy: :request_cancel,
		default_task_start_to_close_timeout: 10, # decider timeout
		default_execution_start_to_close_timeout: 120
	}.merge(options)

	workflow_type =
		begin
			domain.workflow_types.create type_name, type_version, settings
		rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault
			# already registered
			domain.workflow_types[type_name, type_version]
		end

	workers.times { ProcessManager.register_worker loop_block_for_workflow(workflow_type) }
end

.should_call_workflow_target?(event_name, workflow_execution) ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/glider/workflows.rb', line 45

# Decides whether an event should be forwarded to the workflow method.
#
# Events in the list below are skipped (with a debug log line); every other
# event name is forwarded.
#
# @param event_name [Symbol] underscored SWF event type
# @param workflow_execution [#workflow_id] only used for the log message
# @return [Boolean] false for the listed events, true otherwise
def should_call_workflow_target?(event_name, workflow_execution)
	skipped_events = %i[
		activity_task_scheduled
		activity_task_started
		decision_task_scheduled
		decision_task_started
		decision_task_completed
		marker_recorded
		timer_started
		start_child_workflow_execution_initiated
		start_child_workflow_execution_started
		signal_external_workflow_execution_initiated
		request_cancel_external_workflow_execution_initiated
	]
	return true unless skipped_events.include?(event_name)

	Glider.logger.debug "Skipping decider call event=#{event_name} workflow_id=#{workflow_execution.workflow_id}"
	false
end

.signal_handlingObject



37
38
39
40
41
# File 'lib/glider/component.rb', line 37

# Installs a USR1 handler that routes the signal to graceful_exit.
#
# @return [Object] whatever Signal.trap returns (the previous USR1 handler)
def signal_handling
	Signal.trap('USR1') { graceful_exit }
end

.swfObject



43
44
45
# File 'lib/glider/component.rb', line 43

# Memoised AWS::SimpleWorkflow client, created on first use.
#
# @return [AWS::SimpleWorkflow] the instance cached in @swf
def swf
	@swf ||= AWS::SimpleWorkflow.new
end

.task_lock!Object



21
22
23
24
25
26
27
# File 'lib/glider/component.rb', line 21

# Runs the given block with @in_task set, so that graceful_exit records an
# exit request instead of terminating the process in the middle of a task.
#
# @yield the work to run while the flag is set
# @return [Object] the block's value, unless the process is exited below
def task_lock!
	@in_task = true
	yield
ensure
	# Runs even when the block raises: clear the flag first, then honour an
	# exit request that graceful_exit recorded while the task was running.
	@in_task = false
	Process.exit! 0 if @time_to_exit # in case an exit signal was received during task processing
end

.workers(workers_count = nil) ⇒ Object

both setter and getter



48
49
50
# File 'lib/glider/component.rb', line 48

# Combined getter and setter for the number of worker processes.
#
# @param workers_count [Integer, nil] new count; nil (or omitted) reads
# @return [Integer] the count just stored, or the stored count, which
#   defaults to 1 when none has been set
def workers(workers_count=nil)
	if workers_count
		@workers = workers_count
	else
		@workers ||= 1
	end
end

.workflow_data_for(event_name, event) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/glider/workflows.rb', line 66

# Extracts the payload of a workflow event and decodes it.
#
# The raw payload is attributes.input for started and signaled events and
# attributes.result for everything else. Apart from the started event, a
# failure to read the attribute yields nil instead of raising. A non-nil
# payload is then tried as JSON: an object is returned as a
# HashWithIndifferentAccess and an array as the parsed Array. Everything
# else - text that is not JSON, or a bare JSON scalar such as "42" - is
# returned unchanged, because a scalar cannot be told apart from plain text.
#
# @param event_name [Symbol] underscored SWF event type
# @param event [#attributes] the history event
# @return [ActiveSupport::HashWithIndifferentAccess, Array, String, nil]
def workflow_data_for(event_name, event)
	data =
		case event_name
		when :workflow_execution_started #:decision_task_scheduled
			event.attributes.input
		when :workflow_execution_signaled
			begin event.attributes.input rescue nil end
		when :activity_task_completed
			begin event.attributes.result rescue nil end
		else
			begin
				event.attributes.result
			rescue
				Glider.logger.debug "no input or result in event, data will be nil event=#{event_name} attributes=#{event.attributes.to_h}"
				nil
			end
		end
	return data if data.nil?

	# try to parse as JSON
	begin
		parsed = JSON.parse(data)
	rescue JSON::ParserError
		return data
	end

	case parsed
	when Hash
		ActiveSupport::HashWithIndifferentAccess.new parsed
	when Array
		# Only hashes are wrapped: HashWithIndifferentAccess.new takes a
		# non-hash argument as a default value, which would silently turn
		# the array into an empty hash.
		parsed
	else
		data
	end
end

.workflowsObject



12
13
14
# File 'lib/glider/workflows.rb', line 12

# Class-level list kept in @workflows, created empty on first access.
#
# @return [Array] the array stored in @workflows
def workflows
	@workflows ||= []
end

Instance Method Details

#activity(name, version) ⇒ Object



14
15
16
# File 'lib/glider/component.rb', line 14

# Builds a name/version descriptor hash, converting both values with to_s.
#
# @param name [#to_s] activity name
# @param version [#to_s] activity version
# @return [Hash] hash with :name and :version keys holding the two strings
def activity(name, version)
	{name: name.to_s, version: version.to_s}
end