Class: Ruote::StorageParticipant

Inherits:
Object
  • Object
show all
Includes:
Enumerable, LocalParticipant
Defined in:
lib/ruote/part/storage_participant.rb

Overview

A participant that stores the workitem in the same storage used by the engine and the worker(s).

part = engine.register_participant 'alfred', Ruote::StorageParticipant

# ... a bit later

puts "workitems still open : "
part.each do |workitem|
  puts "#{workitem.fei.wfid} - #{workitem.fields['params']['task']}"
end

# ... when done with a workitem

part.reply(workitem)
  # this will remove the workitem from the storage and hand it back
  # to the engine

Does not thread by default (the engine will not spawn a dedicated thread to handle the delivery to this participant, the workitem will get stored via the main engine thread and basta).

Instance Attribute Summary

Attributes included from LocalParticipant

#context

Class Method Summary collapse

Instance Method Summary collapse

Methods included from LocalParticipant

#re_dispatch, #unschedule_re_dispatch

Methods included from ReceiverMixin

#applied_workitem, #fetch_flow_expression, #launch, #receive, #reply_to_engine, #sign

Constructor Details

#initialize(engine_or_options = {}, options = nil) ⇒ StorageParticipant

Returns a new instance of StorageParticipant.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/ruote/part/storage_participant.rb', line 58

def initialize(engine_or_options={}, options=nil)

  if engine_or_options.respond_to?(:context)
    @context = engine_or_options.context
  elsif engine_or_options.is_a?(Ruote::Context)
    @context = engine_or_options
  else
    @options = engine_or_options
  end

  @options ||= {}

  @store_name = @options['store_name']
end

Class Method Details

.matches?(hwi, pname, criteria) ⇒ Boolean

Used by #query when filtering workitems.

Returns:

  • (Boolean)


294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/ruote/part/storage_participant.rb', line 294

def self.matches?(hwi, pname, criteria)

  return false if pname && hwi['participant_name'] != pname

  fields = hwi['fields']

  criteria.each do |fname, fvalue|
    return false if fields[fname] != fvalue
  end

  true
end

Instance Method Details

#[](fei) ⇒ Object Also known as: by_fei

Given a fei (or its string version, a sid), returns the corresponding workitem (or nil).



115
116
117
118
119
120
# File 'lib/ruote/part/storage_participant.rb', line 115

def [](fei)

  doc = fetch(fei)

  doc ? Ruote::Workitem.new(doc) : nil
end

#all(opts = {}) ⇒ Object

Returns all the workitems stored in here.



172
173
174
175
176
177
# File 'lib/ruote/part/storage_participant.rb', line 172

def all(opts={})

  res = fetch_all(opts)

  res.is_a?(Array) ? res.map { |hwi| Ruote::Workitem.new(hwi) } : res
end

#by_field(field, value = nil, opts = {}) ⇒ Object

field : returns all the workitems with the given field name present.

field and value : returns all the workitems with the given field name and the given value for that field.

Warning : only some storages are optimized for such queries (like CouchStorage), the others will load all the workitems and then filter them.



220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/ruote/part/storage_participant.rb', line 220

def by_field(field, value=nil, opts={})

  (value, opts = nil, value) if value.is_a?(Hash)

  if @context.storage.respond_to?(:by_field)
    return @context.storage.by_field('workitems', field, value, opts)
  end

  do_select(opts) do |hwi|
    hwi['fields'].keys.include?(field) &&
    (value.nil? || hwi['fields'][field] == value)
  end
end

#by_participant(participant_name, opts = {}) ⇒ Object

Returns all workitems for the specified participant name



200
201
202
203
204
205
206
207
208
209
# File 'lib/ruote/part/storage_participant.rb', line 200

def by_participant(participant_name, opts={})

  return @context.storage.by_participant(
    'workitems', participant_name, opts
  ) if @context.storage.respond_to?(:by_participant)

  do_select(opts) do |hwi|
    hwi['participant_name'] == participant_name
  end
end

#by_wfid(wfid, opts = {}) ⇒ Object

Return all workitems for the specified wfid



189
190
191
192
193
194
195
196
# File 'lib/ruote/part/storage_participant.rb', line 189

def by_wfid(wfid, opts={})

  if @context.storage.respond_to?(:by_wfid)
    return @context.storage.by_wfid('workitems', wfid, opts)
  end

  wis(@context.storage.get_many('workitems', wfid, opts))
end

#cancel(fei, flavour) ⇒ Object

Removes the document/workitem from the storage



103
104
105
106
107
108
109
110
# File 'lib/ruote/part/storage_participant.rb', line 103

def cancel(fei, flavour)

  doc = fetch(fei)

  r = @context.storage.delete(doc)

  cancel(fei, flavour) if r != nil
end

#consume(workitem) ⇒ Object Also known as: update

This is the method called by ruote when passing a workitem to this participant.



80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/ruote/part/storage_participant.rb', line 80

def consume(workitem)

  doc = workitem.to_h

  doc.merge!(
    'type' => 'workitems',
    '_id' => to_id(doc['fei']),
    'participant_name' => doc['participant_name'],
    'wfid' => doc['fei']['wfid'])

  doc['store_name'] = @store_name if @store_name

  @context.storage.put(doc)
end

#do_not_threadObject

No need for a separate thread when delivering to this participant.



75
# File 'lib/ruote/part/storage_participant.rb', line 75

def do_not_thread; true; end

#each(&block) ⇒ Object

Iterates over the workitems stored in here.



165
166
167
168
# File 'lib/ruote/part/storage_participant.rb', line 165

def each(&block)

  all.each { |wi| block.call(wi) }
end

#firstObject

A convenience method (especially when testing), returns the first (only ?) workitem in the participant.



182
183
184
185
# File 'lib/ruote/part/storage_participant.rb', line 182

def first

  wi(fetch_all({}).first)
end

#per_participantObject

Mostly a test method. Returns a Hash were keys are participant names and values are lists of workitems.



310
311
312
313
# File 'lib/ruote/part/storage_participant.rb', line 310

def per_participant

  inject({}) { |h, wi| (h[wi.participant_name] ||= []) << wi; h }
end

#per_participant_countObject

Mostly a test method. Returns a Hash were keys are participant names and values are integers, the count of workitems for a given participant name.



319
320
321
322
# File 'lib/ruote/part/storage_participant.rb', line 319

def per_participant_count

  per_participant.inject({}) { |h, (k, v)| h[k] = v.size; h }
end

#proceed(workitem) ⇒ Object

Removes the workitem from the storage and replies to the engine.

TODO : should it raise if the workitem can’t be found ? TODO : should it accept just the fei ?

Raises:

  • (ArgumentError)


129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/ruote/part/storage_participant.rb', line 129

def proceed(workitem)

  doc = fetch(Ruote::FlowExpressionId.extract_h(workitem))

  r = @context.storage.delete(doc)

  raise ArgumentError.new('cannot proceed, workitem is gone') if r == true
  return proceed(workitem) if r != nil

  workitem.h.delete('_rev')

  reply_to_engine(workitem)
end

#purge!Object

Cleans this participant out completely



287
288
289
290
# File 'lib/ruote/part/storage_participant.rb', line 287

def purge!

  fetch_all({}).each { |hwi| @context.storage.delete(hwi) }
end

#query(criteria) ⇒ Object

Queries the store participant for workitems.

Some examples :

part.query(:wfid => @wfid).size
part.query('place' => 'nara').size
part.query('place' => 'heiankyou').size
part.query(:wfid => @wfid, :place => 'heiankyou').size

There are two ‘reserved’ criterion : ‘wfid’ and ‘participant’ (‘participant_name’ as well). The rest of the criteria are considered constraints for fields.

‘offset’ and ‘limit’ are reserved as well. They should prove useful for pagination. ‘skip’ can be used instead of ‘offset’.

Note : the criteria is AND only, you’ll have to do ORs (aggregation) by yourself.



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/ruote/part/storage_participant.rb', line 253

def query(criteria)

  cr = Ruote.keys_to_s(criteria)

  if @context.storage.respond_to?(:query_workitems)
    return @context.storage.query_workitems(cr)
  end

  opts = {}
  opts[:skip] = cr.delete('offset') || cr.delete('skip')
  opts[:limit] = cr.delete('limit')
  opts[:count] = cr.delete('count')

  wfid = cr.delete('wfid')

  count = opts[:count]

  pname = cr.delete('participant_name') || cr.delete('participant')
  opts.delete(:count) if pname

  hwis = wfid ?
    @context.storage.get_many('workitems', wfid, opts) : fetch_all(opts)

  return hwis unless hwis.is_a?(Array)

  hwis = hwis.select { |hwi|
    Ruote::StorageParticipant.matches?(hwi, pname, cr)
  }

  count ? hwis.size : wis(hwis)
end

#reply(workitem) ⇒ Object

(soon to be removed)



145
146
147
148
149
150
151
152
153
154
# File 'lib/ruote/part/storage_participant.rb', line 145

def reply(workitem)

  puts '-' * 80
  puts '*** WARNING : please use the Ruote::StorageParticipant#proceed(wi)'
  puts '              instead of #reply(wi) which is deprecated'
  #caller.each { |l| puts l }
  puts '-' * 80

  proceed(workitem)
end

#sizeObject

Returns the count of workitems stored in this participant.



158
159
160
161
# File 'lib/ruote/part/storage_participant.rb', line 158

def size

  fetch_all(:count => true)
end