Class: DaemonKit::RuoteWorkitem
- Inherits:
-
Object
- Object
- DaemonKit::RuoteWorkitem
show all
- Defined in:
- lib/daemon_kit/ruote_workitem.rb
Overview
Dual purpose class that is a) responsible for parsing incoming workitems and delegating to the correct RuotePseudoParticipant, and b) wrapping the workitem hash into something a bit more digestable.
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(workitem = {}) ⇒ RuoteWorkitem
Returns a new instance of RuoteWorkitem.
109
110
111
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 109
def initialize( workitem = {} )
@workitem = workitem
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method_name, *args) ⇒ Object
178
179
180
181
182
183
184
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 178
def method_missing( method_name, *args )
if self.fields.keys.include?( method_name.to_s )
return self.fields[ method_name.to_s ]
end
super
end
|
Class Method Details
.parse(workitem) ⇒ Object
99
100
101
102
103
104
105
106
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 99
def parse( workitem )
begin
return new( JSON.parse( workitem ) )
rescue JSON::ParserError => e
DaemonKit.logger.error "No valid JSON payload found in #{workitem}"
return nil
end
end
|
.parse_command(work) ⇒ Object
Extract the class and method name from the workitem, then pick the matching class from the registered list of participants
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 72
def parse_command( work )
return nil if work['params']['command'].nil?
_, klass, method = work['params']['command'].split('/')
instance = RuoteParticipants.instance.participants[ klass ]
if instance.nil?
msg = "No instance registered for #{klass}"
DaemonKit.logger.error( msg )
raise DaemonKit::MissingParticipant, msg
end
return instance, method
end
|
.process(transport, workitem) ⇒ Object
Expects a JSON workitem from ruote that has these fields set in fields key:
{
'reply_queue' => 'queue to send replies to',
'params' => {
'command' => '/actor/method'
}
}
Notes on the command key:
It looks like a resource, and will be treated as such. Is should be in the format of /class/method
, and it will be passed the complete workitem as a hash.
Notes on replies
Replies are sent back to the queue specified in the reply_queue
key.
Notes on errors
Where daemon-kit detects errors in attempting to parse and delegate the workitems, it will reply to the engine and set the following field with the error information:
daemon_kit.error
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 39
def process( transport, workitem )
@instance ||= new
work = parse( workitem )
return if work.nil?
DaemonKit.logger.warn "Processing workitem that has timed out!" if work.timed_out?
target, method = parse_command( work )
if target.nil? || method.nil?
msg = "Missing target/method in command parameter, or command parameter missing"
DaemonKit.logger.error( msg )
work["__error__"] = msg
elsif target.public_methods.map { |m| m.to_s }.include?( method ) target.perform( method, work )
else
msg = "Workitem cannot be processes: '#{method}' not exposed by #{target.inspect}"
DaemonKit.logger.error( msg )
p [ :work, work.inspect ]
work["__error__"] = msg
end
reply_to_engine( transport, work )
end
|
.reply_to_engine(transport, response) ⇒ Object
88
89
90
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 88
def reply_to_engine( transport, response )
send( "reply_via_#{transport}", response )
end
|
.reply_via_amqp(response) ⇒ Object
92
93
94
95
96
97
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 92
def reply_via_amqp( response )
DaemonKit.logger.debug("Replying to engine via AMQP with #{response.inspect}")
::MQ.queue( response['params']['reply_queue'] ).publish( response.to_json )
response
end
|
Instance Method Details
153
154
155
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 153
def []( key )
self.fields[ key ]
end
|
#[]=(key, value) ⇒ Object
157
158
159
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 157
def []=( key, value )
self.fields[ key ] = value
end
|
#dispatch_time ⇒ Object
127
128
129
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 127
def dispatch_time
@dispath_time ||= Time.parse( @workitem['dispatch_time'] )
end
|
113
114
115
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 113
def fei
@workitem['fei']
end
|
#fields ⇒ Object
Also known as:
attributes
143
144
145
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 143
def fields
@workitem['fields'] ||= @workitem['attributes']
end
|
#has_field?(a) ⇒ Boolean
Also known as:
has_attribute?
139
140
141
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 139
def has_field?(a)
self.fields.keys.include?( a )
end
|
#last_modified ⇒ Object
131
132
133
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 131
def last_modified
@last_modified ||= Time.parse( @workitem['last_modified'] )
end
|
#participant_name ⇒ Object
135
136
137
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 135
def participant_name
@workitem['participant_name']
end
|
#short_fei ⇒ Object
117
118
119
120
121
122
123
124
125
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 117
def short_fei
@short_fei ||=
'(' + [
'fei', self.fei['owfe_version'], self.fei['engine_id'],
self.fei['workflow_definition_url'], self.fei['workflow_definition_name'],
self.fei['workflow_definition_revision'], self.fei['wfid'],
self.fei['expression_name'], self.fei['expid']
].join(' ') + ')'
end
|
#timed_out? ⇒ Boolean
Look at the workitem payload and attempt to determine if this workitem has timed out or not. This method will only ever work if you used the +:timeout: parameter was set for the expression.
168
169
170
171
172
173
174
175
176
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 168
def timed_out?
key = fei['wfid'] + '__' + fei['expid']
if self.fields["__timeouts__"] && timeout = self.fields["__timeouts__"][ key ]
return Time.at( timeout.last ) < Time.now
end
return false
end
|
161
162
163
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 161
def to_json
@workitem.to_json
end
|