Class: Ruote::StorageParticipant
- Inherits:
- Object (hierarchy: Object > Ruote::StorageParticipant)
- Includes:
- Enumerable, LocalParticipant
- Defined in:
- lib/ruote/part/storage_participant.rb
Overview
A participant that stores the workitem in the same storage used by the engine and the worker(s).
part = engine.register_participant 'alfred', Ruote::StorageParticipant
# ... a bit later
puts "workitems still open : "
part.each do |workitem|
  puts "#{workitem.fei.wfid} - #{workitem.fields['params']['task']}"
end
# ... when done with a workitem
part.proceed(workitem)
  # this will remove the workitem from the storage and hand it back
  # to the engine
Does not thread by default: the engine does not spawn a dedicated thread to handle delivery to this participant; the workitem is simply stored via the main engine thread.
Instance Attribute Summary
Attributes included from LocalParticipant
Class Method Summary
-
.matches?(hwi, pname, criteria) ⇒ Boolean
Used by #query when filtering workitems.
Instance Method Summary
-
#[](fei) ⇒ Object
(also: #by_fei)
Given a fei (or its string version, a sid), returns the corresponding workitem (or nil).
-
#all(opts = {}) ⇒ Object
Returns all the workitems stored in here.
-
#by_field(field, value = nil, opts = {}) ⇒ Object
Returns all the workitems that have the given field present, optionally restricted to a given value for that field.
-
#by_participant(participant_name, opts = {}) ⇒ Object
Returns all workitems for the specified participant name.
-
#by_wfid(wfid, opts = {}) ⇒ Object
Returns all workitems for the specified wfid.
-
#cancel(fei, flavour) ⇒ Object
Removes the document/workitem from the storage.
-
#consume(workitem) ⇒ Object
(also: #update)
This is the method called by ruote when passing a workitem to this participant.
-
#do_not_thread ⇒ Object
No need for a separate thread when delivering to this participant.
-
#each(&block) ⇒ Object
Iterates over the workitems stored in here.
-
#first ⇒ Object
A convenience method (especially when testing), returns the first (only ?) workitem in the participant.
-
#initialize(engine_or_options = {}, options = nil) ⇒ StorageParticipant
constructor
A new instance of StorageParticipant.
-
#per_participant ⇒ Object
Mostly a test method.
-
#per_participant_count ⇒ Object
Mostly a test method.
-
#proceed(workitem) ⇒ Object
Removes the workitem from the storage and replies to the engine.
-
#purge! ⇒ Object
Cleans this participant out completely.
-
#query(criteria) ⇒ Object
Queries the store participant for workitems.
-
#reply(workitem) ⇒ Object
Deprecated (soon to be removed); use #proceed instead.
-
#size ⇒ Object
Returns the count of workitems stored in this participant.
Methods included from LocalParticipant
#re_dispatch, #unschedule_re_dispatch
Methods included from ReceiverMixin
#applied_workitem, #fetch_flow_expression, #launch, #receive, #reply_to_engine, #sign
Constructor Details
#initialize(engine_or_options = {}, options = nil) ⇒ StorageParticipant
Returns a new instance of StorageParticipant.
# File 'lib/ruote/part/storage_participant.rb', line 58

def initialize(engine_or_options={}, options=nil)

  if engine_or_options.respond_to?(:context)
    @context = engine_or_options.context
  elsif engine_or_options.is_a?(Ruote::Context)
    @context = engine_or_options
  else
    @options = engine_or_options
  end

  @options ||= {}

  @store_name = @options['store_name']
end
Class Method Details
.matches?(hwi, pname, criteria) ⇒ Boolean
Used by #query when filtering workitems.
# File 'lib/ruote/part/storage_participant.rb', line 294

def self.matches?(hwi, pname, criteria)

  return false if pname && hwi['participant_name'] != pname

  fields = hwi['fields']

  criteria.each do |fname, fvalue|
    return false if fields[fname] != fvalue
  end

  true
end
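To make the filtering rule concrete, here is a minimal standalone sketch that restates it over plain hashes, with no ruote dependency. The `matches` helper and the sample workitem hash are made up for illustration; only the comparison logic is taken from the listing above.

```ruby
# Standalone restatement of the matching rule, for illustration only.
# `matches` is a hypothetical helper, not part of ruote.
def matches(hwi, pname, criteria)
  # When a participant name is given, it must match exactly.
  return false if pname && hwi['participant_name'] != pname

  # Every remaining criterion must equal the corresponding field (AND only).
  criteria.all? { |fname, fvalue| hwi['fields'][fname] == fvalue }
end

# Made-up workitem hash, shaped like the stored documents.
hwi = {
  'participant_name' => 'alfred',
  'fields' => { 'place' => 'nara', 'task' => 'clean' }
}

puts matches(hwi, 'alfred', { 'place' => 'nara' })
puts matches(hwi, 'alfred', { 'place' => 'heiankyou' })
puts matches(hwi, nil, {})
```

Note that an empty criteria hash with no participant name matches every workitem, since there is nothing to reject on.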
Instance Method Details
#[](fei) ⇒ Object Also known as: by_fei
Given a fei (or its string version, a sid), returns the corresponding workitem (or nil).
# File 'lib/ruote/part/storage_participant.rb', line 115

def [](fei)

  doc = fetch(fei)

  doc ? Ruote::Workitem.new(doc) : nil
end
#all(opts = {}) ⇒ Object
Returns all the workitems stored in here.
# File 'lib/ruote/part/storage_participant.rb', line 172

def all(opts={})

  res = fetch_all(opts)

  res.is_a?(Array) ? res.map { |hwi| Ruote::Workitem.new(hwi) } : res
end
#by_field(field, value = nil, opts = {}) ⇒ Object
field : returns all the workitems with the given field name present.
field and value : returns all the workitems with the given field name and the given value for that field.
Warning : only some storages (CouchStorage, for example) are optimized for such queries; the others load all the workitems and then filter them.
# File 'lib/ruote/part/storage_participant.rb', line 220

def by_field(field, value=nil, opts={})

  (value, opts = nil, value) if value.is_a?(Hash)

  if @context.storage.respond_to?(:by_field)
    return @context.storage.by_field('workitems', field, value, opts)
  end

  do_select(opts) do |hwi|
    hwi['fields'].keys.include?(field) &&
      (value.nil? || hwi['fields'][field] == value)
  end
end
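The fallback path (used when the storage has no `by_field` of its own) can be sketched over plain hashes as follows. `by_field_sketch` and the sample data are hypothetical; the argument swap and the filter condition follow the listing above, while `opts` is accepted but deliberately unused here.

```ruby
# Hypothetical stand-in for the fallback branch of #by_field; not ruote code.
def by_field_sketch(hwis, field, value = nil, opts = {})
  # by_field('place', :limit => 1): a Hash in second position is really opts.
  if value.is_a?(Hash)
    opts = value
    value = nil
  end

  # opts is not interpreted in this sketch; the real method hands it to do_select.
  hwis.select do |hwi|
    hwi['fields'].key?(field) &&
      (value.nil? || hwi['fields'][field] == value)
  end
end

# Made-up stored workitem hashes.
hwis = [
  { '_id' => 'a', 'fields' => { 'place' => 'nara' } },
  { '_id' => 'b', 'fields' => { 'place' => 'heiankyou' } },
  { '_id' => 'c', 'fields' => { 'task' => 'clean' } }
]

puts by_field_sketch(hwis, 'place').size
puts by_field_sketch(hwis, 'place', 'nara').size
puts by_field_sketch(hwis, 'place', { :limit => 1 }).size
```

One consequence visible in the listing: a Hash passed as the second argument is always treated as options, so this signature cannot be used to match a field whose value is itself a Hash.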
#by_participant(participant_name, opts = {}) ⇒ Object
Returns all workitems for the specified participant name.
# File 'lib/ruote/part/storage_participant.rb', line 200

def by_participant(participant_name, opts={})

  return @context.storage.by_participant(
    'workitems', participant_name, opts
  ) if @context.storage.respond_to?(:by_participant)

  do_select(opts) do |hwi|
    hwi['participant_name'] == participant_name
  end
end
#by_wfid(wfid, opts = {}) ⇒ Object
Returns all workitems for the specified wfid.
# File 'lib/ruote/part/storage_participant.rb', line 189

def by_wfid(wfid, opts={})

  if @context.storage.respond_to?(:by_wfid)
    return @context.storage.by_wfid('workitems', wfid, opts)
  end

  wis(@context.storage.get_many('workitems', wfid, opts))
end
#cancel(fei, flavour) ⇒ Object
Removes the document/workitem from the storage.
# File 'lib/ruote/part/storage_participant.rb', line 103

def cancel(fei, flavour)

  doc = fetch(fei)

  r = @context.storage.delete(doc)

  cancel(fei, flavour) if r != nil
end
#consume(workitem) ⇒ Object Also known as: update
This is the method called by ruote when passing a workitem to this participant.
# File 'lib/ruote/part/storage_participant.rb', line 80

def consume(workitem)

  doc = workitem.to_h

  doc.merge!(
    'type' => 'workitems',
    '_id' => to_id(doc['fei']),
    'participant_name' => doc['participant_name'],
    'wfid' => doc['fei']['wfid'])

  doc['store_name'] = @store_name if @store_name

  @context.storage.put(doc)
end
#do_not_thread ⇒ Object
No need for a separate thread when delivering to this participant.
# File 'lib/ruote/part/storage_participant.rb', line 75

def do_not_thread; true; end
#each(&block) ⇒ Object
Iterates over the workitems stored in here.
# File 'lib/ruote/part/storage_participant.rb', line 165

def each(&block)

  all.each { |wi| block.call(wi) }
end
#first ⇒ Object
A convenience method (especially when testing), returns the first (only ?) workitem in the participant.
# File 'lib/ruote/part/storage_participant.rb', line 182

def first

  wi(fetch_all({}).first)
end
#per_participant ⇒ Object
Mostly a test method. Returns a Hash where keys are participant names and values are lists of workitems.
# File 'lib/ruote/part/storage_participant.rb', line 310

def per_participant

  inject({}) { |h, wi| (h[wi.participant_name] ||= []) << wi; h }
end
#per_participant_count ⇒ Object
Mostly a test method. Returns a Hash where keys are participant names and values are integers: the count of workitems for each participant name.
# File 'lib/ruote/part/storage_participant.rb', line 319

def per_participant_count

  per_participant.inject({}) { |h, (k, v)| h[k] = v.size; h }
end
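As a rough illustration of what these two test helpers return, the sketch below applies the same `inject`-based grouping and counting to a plain array. `FakeWorkitem`, `group_by_participant` and `count_by_participant` are hypothetical names introduced for the example; in the real class the grouping runs over the participant itself, via Enumerable.

```ruby
# Hypothetical stand-in for Ruote::Workitem; only participant_name matters here.
FakeWorkitem = Struct.new(:participant_name, :wfid)

# Same inject-based grouping as #per_participant, over a plain array.
def group_by_participant(workitems)
  workitems.inject({}) { |h, wi| (h[wi.participant_name] ||= []) << wi; h }
end

# Same reduction as #per_participant_count.
def count_by_participant(workitems)
  group_by_participant(workitems).inject({}) { |h, (k, v)| h[k] = v.size; h }
end

workitems = [
  FakeWorkitem.new('alfred', 'wfid-1'),
  FakeWorkitem.new('bob', 'wfid-1'),
  FakeWorkitem.new('alfred', 'wfid-2')
]

count_by_participant(workitems).each do |name, count|
  puts "#{name}: #{count}"
end
```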
#proceed(workitem) ⇒ Object
Removes the workitem from the storage and replies to the engine.
TODO : should it raise if the workitem can’t be found ?
TODO : should it accept just the fei ?
# File 'lib/ruote/part/storage_participant.rb', line 129

def proceed(workitem)

  doc = fetch(Ruote::FlowExpressionId.extract_h(workitem))

  r = @context.storage.delete(doc)

  raise ArgumentError.new('cannot proceed, workitem is gone') if r == true
  return proceed(workitem) if r != nil

  workitem.h.delete('_rev')

  reply_to_engine(workitem)
end
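The listing relies on the value returned by the storage's `delete`: `nil` lets the reply go ahead, `true` means the workitem is already gone, and anything else triggers another attempt. The sketch below exercises that control flow against an in-memory stand-in. `FakeStorage`, `proceed_sketch` and the "conflict" interpretation of the third case are assumptions inferred from the code above, not ruote's actual storage implementation.

```ruby
# Hypothetical in-memory storage, only to exercise the three return values
# that #proceed distinguishes. Not ruote code.
class FakeStorage
  def initialize(docs, conflicts = 0)
    @docs = docs            # { id => doc }
    @conflicts = conflicts  # number of deletes that report a conflict first
  end

  def get(id)
    @docs[id]
  end

  def delete(doc)
    # Assumption: a missing document is reported as true ("already gone").
    return true if doc.nil? || !@docs.key?(doc['_id'])

    if @conflicts > 0
      @conflicts -= 1
      return @docs[doc['_id']] # simulated conflict: hand back the current doc
    end

    @docs.delete(doc['_id'])
    nil # success
  end
end

# Mirrors the control flow of #proceed: raise if gone, retry on conflict.
def proceed_sketch(storage, id)
  r = storage.delete(storage.get(id))

  raise ArgumentError, 'cannot proceed, workitem is gone' if r == true
  return proceed_sketch(storage, id) if r != nil

  :replied # the real method calls reply_to_engine(workitem) here
end

storage = FakeStorage.new({ 'wi-1' => { '_id' => 'wi-1' } }, 2)
puts proceed_sketch(storage, 'wi-1') # succeeds after two simulated conflicts
```

Calling `proceed_sketch` a second time for the same id raises the ArgumentError, which is the behaviour a caller should expect when two clients try to proceed the same workitem.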
#purge! ⇒ Object
Cleans this participant out completely.
# File 'lib/ruote/part/storage_participant.rb', line 287

def purge!

  fetch_all({}).each { |hwi| @context.storage.delete(hwi) }
end
#query(criteria) ⇒ Object
Queries the store participant for workitems.
Some examples :
part.query(:wfid => @wfid).size
part.query('place' => 'nara').size
part.query('place' => 'heiankyou').size
part.query(:wfid => @wfid, :place => 'heiankyou').size
There are two ‘reserved’ criteria : ‘wfid’ and ‘participant’ (‘participant_name’ is accepted as well). The remaining criteria are treated as constraints on workitem fields.
‘offset’ and ‘limit’ are reserved as well and should prove useful for pagination; ‘skip’ can be used instead of ‘offset’. ‘count’ is reserved too : per the source below, it asks for a count rather than the workitems themselves.
Note : criteria are combined with AND only; you’ll have to do ORs (aggregation) yourself.
# File 'lib/ruote/part/storage_participant.rb', line 253

def query(criteria)

  cr = Ruote.keys_to_s(criteria)

  if @context.storage.respond_to?(:query_workitems)
    return @context.storage.query_workitems(cr)
  end

  opts = {}
  opts[:skip] = cr.delete('offset') || cr.delete('skip')
  opts[:limit] = cr.delete('limit')
  opts[:count] = cr.delete('count')

  wfid = cr.delete('wfid')
  count = opts[:count]

  pname = cr.delete('participant_name') || cr.delete('participant')

  opts.delete(:count) if pname

  hwis = wfid ?
    @context.storage.get_many('workitems', wfid, opts) : fetch_all(opts)

  return hwis unless hwis.is_a?(Array)

  hwis = hwis.select { |hwi|
    Ruote::StorageParticipant.matches?(hwi, pname, cr)
  }

  count ? hwis.size : wis(hwis)
end
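Since criteria are AND-only, an OR has to be assembled by the caller. One possible approach, sketched here over plain hashes, is to run one AND query per alternative and merge the results without duplicates. `and_query`, `or_query` and the `'_id'` key used for de-duplication are assumptions made for this example; with a real participant, `and_query` would be replaced by calls to `part.query`.

```ruby
# Hypothetical AND-only query over plain hashes, standing in for part.query.
def and_query(hwis, criteria)
  hwis.select { |hwi| criteria.all? { |k, v| hwi['fields'][k] == v } }
end

# OR = one AND query per alternative, merged and de-duplicated.
# De-duplication uses '_id' here; any unique workitem identifier would do.
def or_query(hwis, *alternatives)
  alternatives
    .flat_map { |criteria| and_query(hwis, criteria) }
    .uniq { |hwi| hwi['_id'] }
end

# Made-up stored workitem hashes.
hwis = [
  { '_id' => 'a', 'fields' => { 'place' => 'nara', 'task' => 'clean' } },
  { '_id' => 'b', 'fields' => { 'place' => 'heiankyou', 'task' => 'clean' } },
  { '_id' => 'c', 'fields' => { 'place' => 'edo', 'task' => 'cook' } }
]

# place == 'nara' OR task == 'clean'
found = or_query(hwis, { 'place' => 'nara' }, { 'task' => 'clean' })
puts found.map { |hwi| hwi['_id'] }.join(', ')
```

The trade-off is one storage round trip per alternative, so pagination with ‘offset’ and ‘limit’ would have to be applied to the merged result rather than to each sub-query.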
#reply(workitem) ⇒ Object
Deprecated (soon to be removed); use #proceed instead.
# File 'lib/ruote/part/storage_participant.rb', line 145

def reply(workitem)

  puts '-' * 80
  puts '*** WARNING : please use the Ruote::StorageParticipant#proceed(wi)'
  puts ' instead of #reply(wi) which is deprecated'
  #caller.each { |l| puts l }
  puts '-' * 80

  proceed(workitem)
end
#size ⇒ Object
Returns the count of workitems stored in this participant.
# File 'lib/ruote/part/storage_participant.rb', line 158

def size

  fetch_all(:count => true)
end