Class: Ruote::Sequel::Storage

Inherits:
Object
  • Object
show all
Includes:
Ruote::StorageBase
Defined in:
lib/ruote/sequel/storage.rb

Overview

A Sequel storage implementation for ruote >= 2.2.0.

require 'rubygems'
require 'json' # gem install json
require 'ruote'
require 'ruote-sequel' # gem install ruote-sequel

# connect to the database that will hold the ruote documents
# (PostgreSQL here, a MySQL alternative is left commented out)
sequel = Sequel.connect('postgres://localhost/ruote_test')
#sequel = Sequel.connect('mysql://root:root@localhost/ruote_test')

# options handed to the storage as second argument
opts = { 'remote_definition_allowed' => true }

# the engine wraps a worker, which in turn wraps the Sequel storage
engine = Ruote::Engine.new(
  Ruote::Worker.new(
    Ruote::Sequel::Storage.new(sequel, opts)))

# ...

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sequel, options = {}) ⇒ Storage

Returns a new instance of Storage.



89
90
91
92
93
94
95
96
# File 'lib/ruote/sequel/storage.rb', line 89

# Sets up a storage on top of the given Sequel database handle.
#
# sequel  - the database handle, kept in @sequel
# options - configuration hash; 'sequel_table_name' (optional) names the
#           table to use, it defaults to :documents
#
def initialize(sequel, options={})

  @sequel = sequel

  table_name = options['sequel_table_name'] || :documents
  @table = table_name.to_sym

  # the whole options hash is passed on as the engine configuration
  replace_engine_configuration(options)
end

Instance Attribute Details

#sequel ⇒ Object (readonly)

The underlying Sequel::Database instance



87
88
89
# File 'lib/ruote/sequel/storage.rb', line 87

# Reader: returns the database handle stored in @sequel.
def sequel

  @sequel
end

Instance Method Details

#add_type(type) ⇒ Object

Mainly used by ruote’s test/unit/ut_17_storage.rb



250
251
252
253
# File 'lib/ruote/sequel/storage.rb', line 250

# Intentionally empty: nothing has to be created for a new type.
#
# type - the type name, ignored
#
# Returns nil.
#
def add_type(type)

  # no-op, the 'typ' column is what tells document types apart
end

#begin_step ⇒ Object

Used by the worker to indicate a new step begins. For ruote-sequel, it means the cache can be prepared (a unique select yielding all the info necessary for one worker step (expressions excluded)).



343
344
345
346
# File 'lib/ruote/sequel/storage.rb', line 343

# Hook for the start of a new step: simply delegates to #prepare_cache
# and returns whatever it returns.
def begin_step

  prepare_cache
end

#by_field(type, field, value, opts = {}) ⇒ Object

Querying workitems by field (warning, goes deep into the JSON structure)

Raises:

  • (NotImplementedError)


286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/ruote/sequel/storage.rb', line 286

# Returns the workitems that carry a given field, optionally restricted to
# a given value for that field. The lookup is a LIKE on the serialized JSON
# document, so the field name is matched wherever it appears in the
# structure.
#
# type  - only 'workitems' is supported
# field - name of the field to look for
# value - when not nil, the value the field must have (it gets JSON-encoded
#         for the comparison); nil means "any value"
# opts  - :count  => true returns the number of matching rows instead
#         :limit  => max number of rows
#         :offset => (or :skip) number of rows to skip
#
# Raises NotImplementedError for any type other than 'workitems'.
#
def by_field(type, field, value, opts={})

  raise NotImplementedError if type != 'workitems'

  # builds %"field":<json value>% (or %"field":% when any value will do)
  pattern = [ '%"', field, '":' ]
  pattern.push(Rufus::Json.encode(value)) unless value.nil?
    # only nil means "any value", false is a legit value to look for
  pattern.push('%')

  docs = @sequel[@table].where(
    :typ => type
  ).filter(
    :doc.like(pattern.join)
  )

  return docs.count if opts[:count]

  docs = docs.order(
    :ide.asc, :rev.desc
  ).limit(
    opts[:limit], opts[:offset] || opts[:skip]
  )

  select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
end

#by_participant(type, participant_name, opts = {}) ⇒ Object

A provision made for workitems: allows querying them directly by participant name.

Raises:

  • (NotImplementedError)


265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/ruote/sequel/storage.rb', line 265

# Fetches the workitems held for a given participant.
#
# type             - only 'workitems' is supported
# participant_name - value matched against the participant_name column
# opts             - :count => true returns the number of matching rows,
#                    :limit and :offset (or :skip) page through the result
#
# Raises NotImplementedError for any type other than 'workitems'.
#
def by_participant(type, participant_name, opts={})

  raise NotImplementedError unless type == 'workitems'

  dataset = @sequel[@table].where(
    :typ => type, :participant_name => participant_name)

  if opts[:count]

    dataset.count

  else

    offset = opts[:offset] || opts[:skip]
    page = dataset.order(:ide.asc, :rev.desc).limit(opts[:limit], offset)

    select_last_revs(page).collect { |row|
      Ruote::Workitem.from_json(row[:doc])
    }
  end
end

#close ⇒ Object

Grrr… I should sort the mess between close and shutdown… Tests vs production :-(



243
244
245
246
# File 'lib/ruote/sequel/storage.rb', line 243

# Calls #disconnect on the database handle and returns its result
# (same thing as #shutdown).
def close

  @sequel.disconnect
end

#delete(doc) ⇒ Object

Raises:

  • (ArgumentError)


169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/ruote/sequel/storage.rb', line 169

# Removes a document, provided the given revision is still the stored one.
#
# doc - the document (a hash with 'type', '_id' and '_rev')
#
# Returns nil on success. On failure (no row deleted), returns the current
# version of the document as given by #get, or true if there is none.
#
# Raises ArgumentError if the doc has no '_rev'.
#
def delete(doc)

  raise ArgumentError, 'no _rev for doc' unless doc['_rev']

  # usually not necessary, but cheap and safe
  cache_clear(doc)

  target = @sequel[@table].where(
    :typ => doc['type'], :ide => doc['_id'], :rev => doc['_rev'].to_i)

  deleted = target.delete

  # success
  return nil unless deleted < 1

  # failure
  get(doc['type'], doc['_id']) || true
end

#get(type, key) ⇒ Object



164
165
166
167
# File 'lib/ruote/sequel/storage.rb', line 164

# Returns the document with the given type and key: the cache is asked
# first (#cache_get), #do_get is only called when the cache yields nothing.
def get(type, key)

  cached = cache_get(type, key)

  cached || do_get(type, key)
end

#get_many(type, key = nil, opts = {}) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/ruote/sequel/storage.rb', line 187

# Returns the documents of a given type (decoded from JSON), or their
# count.
#
# type - the document type
# key  - nil (no restriction), or one or many keys. When the first key is
#        a String, the keys are matched against the wfid column; when it
#        is a Regexp, the fetched documents are kept if any of the keys
#        matches their '_id'
# opts - :count => true returns a row count, :descending => true reverses
#        the ordering on ide, :limit and :skip (or :offset) page through
#        the rows
#
# The cache (#cache_get_many) is given a chance to answer first.
#
# NOTE(review): Regexp keys are applied after :limit/:skip and are not
# taken into account by :count -- confirm this is what callers expect.
#
def get_many(type, key=nil, opts={})

  cached = cache_get_many(type, key, opts)
  return cached if cached

  keys = key ? Array(key) : nil
  first_key = keys && keys.first

  dataset = @sequel[@table].where(:typ => type)
  dataset = dataset.filter(:wfid => keys) if first_key.is_a?(String)

  return dataset.count if opts[:count]

  id_order = opts[:descending] ? :ide.desc : :ide.asc

  dataset = dataset.order(
    id_order, :rev.desc
  ).limit(
    opts[:limit], opts[:skip] || opts[:offset]
  )

  docs = select_last_revs(dataset).collect { |row|
    Rufus::Json.decode(row[:doc])
  }

  return docs unless first_key.is_a?(Regexp)

  # regular expressions are matched in Ruby, not in the database, since
  # there are potentially multiple keys
  docs.select { |doc| keys.find { |rex| rex.match(doc['_id']) } }
end

#ids(type) ⇒ Object

Returns all the ids of the documents of a given type.



220
221
222
223
# File 'lib/ruote/sequel/storage.rb', line 220

# Lists the ids (ide column) of all the documents of the given type,
# without duplicates and sorted.
def ids(type)

  rows = @sequel[@table].where(:typ => type)

  rows.collect { |row| row[:ide] }.uniq.sort
end

#purge! ⇒ Object

Nukes all the documents in this storage.



227
228
229
230
# File 'lib/ruote/sequel/storage.rb', line 227

# Deletes every row of the storage table, whatever its type. Returns what
# the dataset's #delete returns.
def purge!

  @sequel[@table].delete
end

#purge_type!(type) ⇒ Object

Nukes a db type and reputs it (losing all the documents that were in it).



257
258
259
260
# File 'lib/ruote/sequel/storage.rb', line 257

# Deletes all the rows whose 'typ' column equals the given type. Returns
# what the dataset's #delete returns.
def purge_type!(type)

  of_type = @sequel[@table].where(:typ => type)

  of_type.delete
end

#put(doc, opts = {}) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/ruote/sequel/storage.rb', line 131

# Stores a document as a new revision.
#
# doc  - the document (a hash with 'type', '_id' and, when updating, '_rev')
# opts - :update_rev is passed as third argument to #do_insert
#
# Returns nil on success. On failure, returns true when the document is
# gone, or the currently stored document when the revisions differ or
# when the insert raised a Sequel::DatabaseError.
#
def put(doc, opts={})

  cache_clear(doc)

  if doc['_rev']

    current = get(doc['type'], doc['_id'])

    # failures: the doc is gone, or somebody else updated it meanwhile
    return true unless current
    return current if current['_rev'] != doc['_rev']
  end

  nrev = doc['_rev'].to_i + 1

  begin

    do_insert(doc, nrev, opts[:update_rev])

  rescue ::Sequel::DatabaseError

    # failure: the insert was refused by the database
    return get(doc['type'], doc['_id']) || true
  end

  # the new revision is in, get rid of the older ones
  older = @sequel[@table].where(
    :typ => doc['type'], :ide => doc['_id']
  ).filter { rev < nrev }

  older.delete

  # success
  nil
end

#put_msg(action, options) ⇒ Object



98
99
100
101
102
103
104
105
# File 'lib/ruote/sequel/storage.rb', line 98

# Stores a message: the document built by #prepare_msg_doc is inserted
# directly at revision 1, without going through #put.
#
# Always returns nil.
#
def put_msg(action, options)

  msg_doc = prepare_msg_doc(action, options)

  # a msg is only ever inserted once, hence the fixed revision
  do_insert(msg_doc, 1)

  nil
end

#put_schedule(flavour, owner_fei, s, msg) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/ruote/sequel/storage.rb', line 118

# Stores a schedule: the document built by #prepare_schedule_doc is
# inserted directly at revision 1, without going through #put.
#
# Returns the '_id' of the schedule document, or nil when
# #prepare_schedule_doc yielded no document (nothing is inserted then).
#
def put_schedule(flavour, owner_fei, s, msg)

  schedule = prepare_schedule_doc(flavour, owner_fei, s, msg)

  if schedule

    do_insert(schedule, 1)

    schedule['_id']
  end
end

#query_workitems(criteria) ⇒ Object



311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/ruote/sequel/storage.rb', line 311

# Queries workitems with a hash of criteria (string keys).
#
# Special keys:
#
#   'count'            - when set, the number of matching rows is returned
#   'limit'            - max number of rows
#   'offset' / 'skip'  - number of rows to skip ('offset' wins if both
#                        are given)
#   'wfid'             - the ide column must end with "!#{wfid}"
#   'participant_name' / 'participant'
#                      - value for the participant_name column
#                        ('participant_name' wins if both are given)
#
# Any other key/value pair is a field criterion: the serialized document
# must contain "key":<JSON-encoded value>.
#
# The criteria hash passed by the caller is left untouched.
#
# Returns a row count or an array of workitems.
#
def query_workitems(criteria)

  criteria = criteria.dup
    # the special keys get removed below, work on a copy

  count = criteria.delete('count')
  limit = criteria.delete('limit')

  # both spellings are always removed, else the leftover one would be
  # mistaken for a field criterion
  offset, skip = criteria.delete('offset'), criteria.delete('skip')
  offset ||= skip

  wfid = criteria.delete('wfid')

  pname, participant =
    criteria.delete('participant_name'), criteria.delete('participant')
  pname ||= participant

  ds = @sequel[@table].where(:typ => 'workitems')

  ds = ds.filter(:ide.like("%!#{wfid}")) if wfid
  ds = ds.filter(:participant_name => pname) if pname

  # what remains are field criteria
  criteria.each do |k, v|
    ds = ds.filter(:doc.like("%\"#{k}\":#{Rufus::Json.encode(v)}%"))
  end

  return ds.count if count

  ds = ds.order(:ide.asc, :rev.desc).limit(limit, offset)

  select_last_revs(ds).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
end

#reserve(doc) ⇒ Object

Used to reserve ‘msgs’ and ‘schedules’. Simply deletes the document and returns true if the delete was successful (i.e. if the reservation is valid).



111
112
113
114
115
116
# File 'lib/ruote/sequel/storage.rb', line 111

# Reserves a document by deleting its row at revision 1.
#
# doc - the document (a hash with 'type' and '_id')
#
# Returns true if at least one row got deleted, false otherwise.
#
def reserve(doc)

  deleted = @sequel[@table].where(
    :typ => doc['type'], :ide => doc['_id'], :rev => 1
  ).delete

  deleted > 0
end

#shutdown ⇒ Object

Calls #disconnect on the db. According to Sequel’s doc, it closes all the idle connections in the pool (not the active ones).



235
236
237
238
# File 'lib/ruote/sequel/storage.rb', line 235

# Calls #disconnect on the database handle and returns its result
# (same thing as #close).
def shutdown

  @sequel.disconnect
end