Class: Ruote::Sequel::Storage

Inherits:
Object
  • Object
show all
Includes:
Ruote::StorageBase
Defined in:
lib/ruote/sequel/storage.rb

Overview

A Sequel storage implementation for ruote >= 2.2.0.

require 'rubygems'
require 'json' # gem install json
require 'ruote'
require 'ruote-sequel' # gem install ruote-sequel

# Open the database connection the storage will work with (PostgreSQL here).
sequel = Sequel.connect('postgres://localhost/ruote_test')
# Alternative connection URL, for a MySQL database:
#sequel = Sequel.connect('mysql://root:root@localhost/ruote_test')

# Options hash handed to the storage constructor.
opts = { 'remote_definition_allowed' => true }

# The storage is wrapped in a worker, which in turn is handed to a new engine.
engine = Ruote::Engine.new(
  Ruote::Worker.new(
    Ruote::Sequel::Storage.new(sequel, opts)))

# ...

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sequel, options = {}) ⇒ Storage

Returns a new instance of Storage.



80
81
82
83
84
85
86
# File 'lib/ruote/sequel/storage.rb', line 80

# Builds a storage on top of the given Sequel database handle.
#
# sequel  - the database handle, kept in @sequel
# options - a Hash of options, kept untouched in @options (defaults to {})
#
# Once both are remembered, put_configuration is called (that method is not
# defined in this block).
def initialize(sequel, options={})

  @sequel, @options = sequel, options

  put_configuration
end

Instance Attribute Details

#sequel ⇒ Object (readonly)

The underlying Sequel::Database instance



78
79
80
# File 'lib/ruote/sequel/storage.rb', line 78

# Reader for @sequel, the database handle this storage was built with.
def sequel
  @sequel
end

Instance Method Details

#add_type(type) ⇒ Object

Mainly used by ruote’s test/unit/ut_17_storage.rb



231
232
233
234
# File 'lib/ruote/sequel/storage.rb', line 231

# No-op kept so that callers can invoke it on this storage: there is nothing
# to prepare per type here.
#
# type - the document type name (ignored)
#
# Returns nil.
def add_type(type)

  # does nothing, types are differentiated by the 'typ' column
end

#by_field(type, field, value = nil) ⇒ Object

Querying workitems by field (warning, goes deep into the JSON structure)

Raises:

  • (NotImplementedError)


258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/ruote/sequel/storage.rb', line 258

# Returns the workitems whose JSON document mentions the given field,
# optionally narrowed down to a given value.
#
# The lookup is a SQL LIKE on the serialized document, so the `"field":`
# text is matched wherever it appears in the JSON (nested hashes included).
# LIKE wildcards (% and _) found in field are not escaped.
#
# type  - must be 'workitems'
# field - the name of the field to look for
# value - nil (the default) means "any value"; any other value, false
#         included, is JSON-encoded and has to follow the field name
#
# Returns the decoded documents (latest revisions as picked by
# select_last_revs).
# Raises NotImplementedError for any type other than 'workitems'.
def by_field(type, field, value=nil)

  raise NotImplementedError if type != 'workitems'

  pattern = [ '%"', field, '":' ]
  pattern.push(Rufus::Json.encode(value)) unless value.nil?
    # only nil means "any value", false is a legitimate value to search for
  pattern.push('%')

  docs = @sequel[:documents].where(:typ => type).filter(:doc.like(pattern.join))

  select_last_revs(docs).collect { |d| Rufus::Json.decode(d[:doc]) }
end

#by_participant(type, participant_name, opts) ⇒ Object

A provision made for workitems: allows querying them directly by participant name.

Raises:

  • (NotImplementedError)


246
247
248
249
250
251
252
253
254
# File 'lib/ruote/sequel/storage.rb', line 246

# Returns the workitems whose participant_name column equals the given
# name; the JSON documents themselves are not searched.
#
# type             - must be 'workitems'
# participant_name - value compared with the participant_name column
# opts             - accepted but not read by this implementation
#
# Returns the decoded documents (latest revisions as picked by
# select_last_revs).
# Raises NotImplementedError for any type other than 'workitems'.
def by_participant(type, participant_name, opts)

  raise NotImplementedError unless type == 'workitems'

  conditions = { :typ => type, :participant_name => participant_name }
  rows = @sequel[:documents].where(conditions)

  select_last_revs(rows).map { |row| Rufus::Json.decode(row[:doc]) }
end

#close ⇒ Object

Grrr… I should sort the mess between close and shutdown… Tests vs production :-(



224
225
226
227
# File 'lib/ruote/sequel/storage.rb', line 224

# Calls #disconnect on the Sequel database handle (the body is the same as
# the one of #shutdown).
def close

  @sequel.disconnect
end

#delete(doc) ⇒ Object

Raises:

  • (ArgumentError)


150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/ruote/sequel/storage.rb', line 150

# Removes a document from the storage.
#
# doc - a Hash holding at least 'type', '_id' and '_rev'
#
# Returns nil when do_delete reports exactly 1 deletion (success). In any
# other case (failure) returns the document as currently stored, or true
# when it cannot be found.
# Raises ArgumentError when doc carries no '_rev'.
def delete(doc)

  raise ArgumentError, 'no _rev for doc' unless doc['_rev']

  if do_delete(doc) == 1
    nil
      # success
  else
    get(doc['type'], doc['_id']) || true
      # failure
  end
end

#dump(type) ⇒ Object

Returns a string representation of the current content of the storage for a given type.



207
208
209
210
211
# File 'lib/ruote/sequel/storage.rb', line 207

# Builds a human-readable listing of the documents of a given type: a
# "=== type ===" header line, then one "  _id => inspect" line per document
# returned by get_many(type).
#
# Returns a String.
def dump(type)

  header = "=== #{type} ===\n"
  lines = get_many(type).map { |doc| "  #{doc['_id']} => #{doc.inspect}" }

  header + lines.join("\n")
end

#get(type, key) ⇒ Object



143
144
145
146
147
148
# File 'lib/ruote/sequel/storage.rb', line 143

# Fetches a single document.
#
# type - the document type
# key  - the document id
#
# Returns the JSON-decoded :doc value of the row handed back by do_get, or
# nil when do_get finds nothing.
def get(type, key)

  row = do_get(type, key)

  return nil unless row

  Rufus::Json.decode(row[:doc])
end

#get_many(type, key = nil, opts = {}) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/ruote/sequel/storage.rb', line 163

# Returns the documents of a given type.
#
# type - the document type (value of the typ column)
# key  - nil (no filtering), a String or Array of Strings matched against
#        the wfid column by the database, or a Regexp or Array of Regexps
#        matched against the '_id' of each decoded document once fetched
# opts - :count       when truthy, only the number of matching rows is
#                     returned (Regexp keys are not applied to that count)
#        :descending  when truthy, order by ide and rev descending instead
#                     of ascending
#        :limit, :skip  handed to the dataset's #limit
#
# Returns an Integer when opts[:count] is set, else the decoded documents
# (latest revisions as picked by select_last_revs).
def get_many(type, key=nil, opts={})

  ds = @sequel[:documents].where(:typ => type)

  keys = key ? Array(key) : nil
  ds = ds.filter(:wfid => keys) if keys && keys.first.is_a?(String)

  return ds.count if opts[:count]
    # let the database do the counting instead of loading every row

  ds = ds.order(
    *(opts[:descending] ? [ :ide.desc, :rev.desc ] : [ :ide.asc, :rev.asc ])
  )

  ds = ds.limit(opts[:limit], opts[:skip])

  docs = select_last_revs(ds.all, opts[:descending])
  docs = docs.collect { |d| Rufus::Json.decode(d[:doc]) }

  return docs unless keys && keys.first.is_a?(Regexp)

  # (pass on the dataset.filter(:wfid => /regexp/) for now
  # since we have potentially multiple keys)

  docs.select { |doc| keys.find { |rx| rx.match(doc['_id']) } }
end

#ids(type) ⇒ Object

Returns all the ids of the documents of a given type.



192
193
194
195
# File 'lib/ruote/sequel/storage.rb', line 192

# Returns the sorted, duplicate-free list of the ids (ide column) of the
# documents of the given type.
#
# type - the document type (value of the typ column)
def ids(type)

  rows = @sequel[:documents].where(:typ => type).select(:ide)
    # only the ide column is read, leave the JSON documents in the database

  rows.collect { |row| row[:ide] }.uniq.sort
end

#purge! ⇒ Object

Nukes all the documents in this storage.



199
200
201
202
# File 'lib/ruote/sequel/storage.rb', line 199

# Deletes every row of the documents table, whatever its type.
#
# Returns whatever the dataset's #delete returns.
def purge!

  documents = @sequel[:documents]

  documents.delete
end

#purge_type!(type) ⇒ Object

Nukes a db type and reputs it (losing all the documents that were in it).



238
239
240
241
# File 'lib/ruote/sequel/storage.rb', line 238

# Deletes every row of the documents table whose typ column equals the
# given type.
#
# Returns whatever the dataset's #delete returns.
def purge_type!(type)

  of_that_type = @sequel[:documents].where(:typ => type)

  of_that_type.delete
end

#put(doc, opts = {}) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/ruote/sequel/storage.rb', line 110

# Stores a document, its '_rev' being used to detect concurrent updates.
#
# doc  - the document Hash ('type', '_id' and, when updating, '_rev')
# opts - :update_rev, when truthy doc['_rev'] is set to the new revision
#        once the document is stored
#
# Returns nil on success. On failure returns
#   * true when doc has a '_rev' but no stored document is found,
#   * the stored document when its '_rev' differs from doc's,
#   * the stored document (or true if none) when the insertion raises a
#     Sequel::DatabaseError.
def put(doc, opts={})

  if doc['_rev']

    current = get(doc['type'], doc['_id'])

    return true unless current
    return current unless current['_rev'] == doc['_rev']
      # failures
  end

  next_rev = doc['_rev'].to_i + 1
    # a doc without '_rev' gets revision 1

  begin

    do_insert(doc, next_rev)

  rescue ::Sequel::DatabaseError

    return get(*doc.values_at('type', '_id')) || true
      # failure
  end

  # the new revision is in, drop the previous ones

  older = @sequel[:documents].where(:typ => doc['type'], :ide => doc['_id'])
  older = older.filter { rev < next_rev }
  older.delete

  doc['_rev'] = next_rev if opts[:update_rev]

  nil
    # success
end

#put_msg(action, options) ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/ruote/sequel/storage.rb', line 88

# Stores a message document at revision 1.
#
# The document comes from prepare_msg_doc(action, options) and is inserted
# directly, without going through the revision checks of #put.
#
# Returns nil.
def put_msg(action, options)

  msg_doc = prepare_msg_doc(action, options)

  # put_msg is a unique action, no need for all the complexity of put

  do_insert(msg_doc, 1)

  nil
end

#put_schedule(flavour, owner_fei, s, msg) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/ruote/sequel/storage.rb', line 97

# Stores a schedule document at revision 1.
#
# The document comes from prepare_schedule_doc(flavour, owner_fei, s, msg);
# when that helper hands back nothing, nothing is stored.
#
# Returns the '_id' of the stored document, or nil when nothing was stored.
def put_schedule(flavour, owner_fei, s, msg)

  schedule = prepare_schedule_doc(flavour, owner_fei, s, msg)

  # put_schedule is a unique action, no need for all the complexity of put

  if schedule
    do_insert(schedule, 1)
    schedule['_id']
  end
end

#query_workitems(criteria) ⇒ Object



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/ruote/sequel/storage.rb', line 271

# Queries the workitems.
#
# criteria - a Hash with String keys (it is not modified), in which
#            'count'    when truthy, the number of matching workitems is
#                       returned instead of the workitems themselves
#            'limit', 'offset'  are handed to the dataset's #limit
#            'wfid'     keeps the rows whose ide ends with "!" + wfid
#            'participant_name' (or 'participant')  is compared with the
#                       participant_name column
#            any other key/value pair is looked up in the JSON document via
#            a LIKE on `"key":<JSON-encoded value>`
#
# Returns an Integer when 'count' is set, else an Array of Ruote::Workitem.
def query_workitems(criteria)

  criteria = criteria.dup
    # work on a copy, the caller's hash has to stay intact

  count = criteria.delete('count')
  limit = criteria.delete('limit')
  offset = criteria.delete('offset')

  wfid =
    criteria.delete('wfid')
  pname =
    criteria.delete('participant_name') || criteria.delete('participant')

  ds = @sequel[:documents].where(:typ => 'workitems')

  ds = ds.filter(:ide.like("%!#{wfid}")) if wfid
  ds = ds.filter(:participant_name => pname) if pname

  criteria.each do |k, v|
    ds = ds.filter(:doc.like("%\"#{k}\":#{Rufus::Json.encode(v)}%"))
  end

  return select_last_revs(ds.all).size if count
    # counting happens once the criteria are applied (and ignores the limit)

  ds = ds.limit(limit, offset)

  select_last_revs(ds.all).collect { |d|
    Ruote::Workitem.new(Rufus::Json.decode(d[:doc]))
  }
end

#shutdown ⇒ Object

Calls #disconnect on the db. According to Sequel’s doc, it closes all the idle connections in the pool (not the active ones).



216
217
218
219
# File 'lib/ruote/sequel/storage.rb', line 216

# Calls #disconnect on the Sequel database handle (the body is the same as
# the one of #close).
def shutdown

  @sequel.disconnect
end