Class: Ruote::Redis::Storage

Inherits:
Object
  • Object
show all
Includes:
StorageBase
Defined in:
lib/ruote/redis/storage.rb

Overview

A Redis storage for ruote.

The constructor accepts two arguments, the first one is a Redis instance ( see github.com/ezmobius/redis-rb ), the second one is the classic ruote engine options( see ruote.rubyforge.org/configuration.html#engine )

require 'redis' # gem install redis
require 'ruote' # gem install ruote
require 'ruote-redis' # gem install ruote-redis

engine = Ruote::Engine.new(
  Ruote::Worker.new(
    Ruote::Redis::RedisStorage.new('db'=> 14, 'thread_safe' => true)))

em-redis

Not tried, but I guess, that substituting an instance of em-redis for the redis instance passed to the constructor might work. github.com/madsimian/em-redis

If you try and it works, feedback is welcome groups.google.com/group/openwferu-users

‘pop_count’ option

By default, when the worker queries this storage for msgs to process, the storage will try to pop 28 msgs. This number can be changed thanks to the ‘pop_count’ option, like in:

engine = Ruote::Engine.new(
  Ruote::Worker.new(
    Ruote::Redis::RedisStorage.new(
      'db'=> 14, 'thread_safe' => true, 'pop_count' => 56)))

Don’t put too high a number, it increases the chance of msgs getting lost in case of the worker going down.

(if there is a need to avoid such a scenario in the future, Redis’ rpoplpush might come in handy).

Direct Known Subclasses

RedisStorage

Constant Summary collapse

REDIS_OPTIONS =

Listing the redis options to differentiate them from ruote storage options.

%w[ host port path db thread_safe logger ]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis, options = {}) ⇒ Storage

A Redis storage for ruote.

Can be initialized in two ways

Ruote::Redis::Storage.new(
  ::Redis.new(
    :host => '127.0.0.1',
    :db => 13,
    :thread_safe => true))

or

Ruote::Redis::Storage.new(
  'host' => '127.0.0.1',
  'db' => 13,
  'thread_safe' => true)

The first style is probably better avoided.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/ruote/redis/storage.rb', line 109

def initialize(redis, options={})

  if options == {} && redis.is_a?(Hash)

    redis_options, options = redis.partition { |k, v|
      REDIS_OPTIONS.include?(k.to_s)
    }

    redis_options = Hash[redis_options.collect { |k, v| [ k.to_sym, v ] }]
    options = Hash[options]

    redis = ::Redis.new(redis_options)
  end

  @redis = redis
  @options = options

  @pop_count = @options['pop_count'] || 28

  # Returns an array of the (String) keys that match the given pattern.
  #
  # Returns an empty array if anything goes wrong.
  #
  def @redis.keys_to_a(pattern)

    if (a = (keys(pattern) rescue nil)).is_a?(Array)
      a
    else
      []
    end
  end

  replace_engine_configuration(options)
end

Instance Attribute Details

#redisObject (readonly)

Returns the value of attribute redis.



83
84
85
# File 'lib/ruote/redis/storage.rb', line 83

def redis
  @redis
end

Class Method Details

.keys_to_a(pattern) ⇒ Object

Returns an array of the (String) keys that match the given pattern.

Returns an empty array if anything goes wrong.



132
133
134
135
136
137
138
139
# File 'lib/ruote/redis/storage.rb', line 132

def @redis.keys_to_a(pattern)

  if (a = (keys(pattern) rescue nil)).is_a?(Array)
    a
  else
    []
  end
end

Instance Method Details

#add_type(type) ⇒ Object

Mainly used by ruote’s test/unit/ut_17_storage.rb



361
362
363
364
# File 'lib/ruote/redis/storage.rb', line 361

def add_type(type)

  # nothing to be done
end

#closeObject

Shuts this worker down.

(This close / shutdown dichotomy has to be resolved at some point…)



345
346
347
348
# File 'lib/ruote/redis/storage.rb', line 345

def close

  @redis.quit
end

#delete(doc) ⇒ Object

Raises:

  • (ArgumentError)


238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/ruote/redis/storage.rb', line 238

def delete(doc)

  rev = doc['_rev']

  raise ArgumentError.new("can't delete doc without _rev") unless rev

  key = key_for(doc)

  lock(key) do

    current_doc = do_get(key)

    if current_doc.nil?
      #
      # document is [already] gone, delete fails (return true)
      #
      true

    elsif current_doc['_rev'] != rev
      #
      # version in storage doesn't match version to delete
      # (return version in storage)
      #
      current_doc

    else
      #
      # delete is successful (return nil)
      #
      @redis.del(key)

      nil
    end
  end
end

#delete_schedule(schedule_id) ⇒ Object



188
189
190
191
192
193
# File 'lib/ruote/redis/storage.rb', line 188

def delete_schedule(schedule_id)

  return unless schedule_id

  @redis.del(key_for('schedules', schedule_id))
end

#get(type, key) ⇒ Object



233
234
235
236
# File 'lib/ruote/redis/storage.rb', line 233

def get(type, key)

  do_get(key_for(type, key))
end

#get_many(type, key = nil, opts = {}) ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/ruote/redis/storage.rb', line 274

def get_many(type, key=nil, opts={})

  keys = key ? Array(key) : nil

  #ids = if type == 'msgs' || type == 'schedules'
  #  @redis.keys_to_a("#{type}/*")

  ids = if keys == nil

    @redis.keys_to_a("#{type}/*")

  elsif keys.first.is_a?(String)

    keys.collect { |k|
      @redis.keys_to_a("#{type}/*!#{k}#{type == 'schedules' ? '-*' : ''}")
    }.flatten

  else #if keys.first.is_a?(Regexp)

    @redis.keys_to_a("#{type}/*").select { |i|

      i = i[type.length + 1..-1]
        # removing "^type/"

      keys.find { |k| k.match(i) }
    }
  end

  ids = ids.reject { |i| i.match(LOCK_KEY) }
  ids = ids.sort
  ids = ids.reverse if opts[:descending]

  skip = opts[:skip] || 0
  limit = opts[:limit] || ids.length
  ids = ids[skip, limit]

  docs = ids.length > 0 && @redis.mget(*ids)
  docs = docs.is_a?(Array) ? docs : []

  docs = docs.each_with_object({}) do |doc, h|
    next unless doc
    doc = Rufus::Json.decode(doc)
    h[doc['_id']] = doc
  end

  return docs.size if opts[:count]

  docs = docs.values.sort_by { |d| d['_id'] }

  opts[:descending] ? docs.reverse : docs
end

#get_msgsObject

Note: the worker argument is not used in this storage implementation.



165
166
167
168
169
170
171
172
# File 'lib/ruote/redis/storage.rb', line 165

def get_msgs

  @redis.pipelined {
    @pop_count.times { @redis.rpop('msgs') }
  }.compact.collect { |d|
    from_json(d)
  }
end

#ids(type) ⇒ Object



326
327
328
329
330
331
332
333
# File 'lib/ruote/redis/storage.rb', line 326

def ids(type)

  @redis.keys_to_a("#{type}/*").reject { |i|
    i.match(LOCK_KEY)
  }.collect { |i|
    i.split('/').last
  }.sort
end

#purge!Object



335
336
337
338
339
# File 'lib/ruote/redis/storage.rb', line 335

def purge!

  2.times { @redis.flushdb rescue nil }
    # 2 times to work around Redis::ProtocolError '3'
end

#purge_type!(type) ⇒ Object

Nukes a db type and reputs it(losing all the documents that were in it).



368
369
370
371
# File 'lib/ruote/redis/storage.rb', line 368

def purge_type!(type)

  @redis.keys_to_a("#{type}/*").each { |k| (@redis.del(k) rescue nil) }
end

#put(doc, opts = {}) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/ruote/redis/storage.rb', line 195

def put(doc, opts={})

  key = key_for(doc)
  rev = doc['_rev']

  lock(key) do

    current_doc = do_get(key)
    current_rev = current_doc ? current_doc['_rev'] : nil

    if current_rev && rev != current_rev
      #
      # version in storage is newer than version being put,
      # (eturn version in storage)
      #
      current_doc

    elsif rev && current_rev.nil?
      #
      # document deleted, put fails (return true)
      #
      true

    else
      #
      # put is successful (return nil)
      #
      doc = doc.send(
        opts[:update_rev] ? :merge! : :merge,
        { '_rev' => (rev.to_i + 1).to_s, 'put_at' => Ruote.now_to_utc_s })

      @redis.set(key, Rufus::Json.encode(doc))

      nil
    end
  end
end

#put_msg(action, options) ⇒ Object



153
154
155
156
157
158
159
160
161
# File 'lib/ruote/redis/storage.rb', line 153

def put_msg(action, options)

  doc = prepare_msg_doc(action, options)
  doc['put_at'] = Ruote.now_to_utc_s

  @redis.lpush('msgs', Rufus::Json.encode(doc))

  nil
end

#put_schedule(flavour, owner_fei, s, msg) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/ruote/redis/storage.rb', line 174

def put_schedule(flavour, owner_fei, s, msg)

  doc = prepare_schedule_doc(flavour, owner_fei, s, msg)

  return nil unless doc

  doc['_rev'] = '0'
  doc['put_at'] = Ruote.now_to_utc_s

  @redis.set(key_for(doc), Rufus::Json.encode(doc))

  doc['_id']
end

#reconnectObject

Simply calls @redis.reconnect



375
376
377
378
# File 'lib/ruote/redis/storage.rb', line 375

def reconnect

  @redis.reconnect
end

#reserve(doc) ⇒ Object

Returns true if the doc is successfully deleted.



146
147
148
149
150
151
# File 'lib/ruote/redis/storage.rb', line 146

def reserve(doc)

  return true if doc['type'] == 'msgs'

  (@redis.del(key_for(doc)) == 1)
end

#shutdownObject

Shuts this worker down.

(This close / shutdown dichotomy has to be resolved at some point…)



354
355
356
357
# File 'lib/ruote/redis/storage.rb', line 354

def shutdown

  @redis.quit
end