Class: Ruote::Redis::RedisStorage
- Inherits:
-
Object
- Object
- Ruote::Redis::RedisStorage
- Includes:
- StorageBase
- Defined in:
- lib/ruote/redis/storage.rb
Overview
A Redis storage for ruote.
The constructor accepts two arguments, the first one is a Redis instance ( see github.com/ezmobius/redis-rb ), the second one is the classic ruote engine options ( see ruote.rubyforge.org/configuration.html#engine )
require 'redis' # gem install redis
require 'ruote' # gem install ruote
require 'ruote-redis' # gem install ruote-redis
engine = Ruote::Engine.new(
Ruote::Worker.new(
Ruote::Redis::RedisStorage.new(
::Redis.new(:db => 14, :thread_safe => true), {})))
em-redis
Not tried, but I guess, that substituting an instance of em-redis for the redis instance passed to the constructor might work. github.com/madsimian/em-redis
If you try and it works, feedback is welcome groups.google.com/group/openwferu-users
Instance Attribute Summary collapse
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
Class Method Summary collapse
Instance Method Summary collapse
-
#add_type(type) ⇒ Object
Mainly used by ruote’s test/unit/ut_17_storage.rb.
- #delete(doc) ⇒ Object
- #delete_schedule(schedule_id) ⇒ Object
- #get(type, key) ⇒ Object
- #get_many(type, key = nil, opts = {}) ⇒ Object
- #ids(type) ⇒ Object
-
#initialize(redis, options = {}) ⇒ RedisStorage
constructor
A new instance of RedisStorage.
- #purge! ⇒ Object
-
#purge_type!(type) ⇒ Object
Nukes a db type and reputs it (losing all the documents that were in it).
- #put(doc, opts = {}) ⇒ Object
- #put_msg(action, options) ⇒ Object
- #put_schedule(flavour, owner_fei, s, msg) ⇒ Object
- #reserve(doc) ⇒ Object
-
#shutdown ⇒ Object
def dump (type) @dbs.dump end.
Constructor Details
#initialize(redis, options = {}) ⇒ RedisStorage
Returns a new instance of RedisStorage.
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/ruote/redis/storage.rb', line 69 def initialize (redis, ={}) @redis = redis @options = def @redis.keys_to_a (opt) r = keys(opt) r.is_a?(Array) ? r : r.split(' ') end put_configuration end |
Instance Attribute Details
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
67 68 69 |
# File 'lib/ruote/redis/storage.rb', line 67 def redis @redis end |
Class Method Details
.keys_to_a(opt) ⇒ Object
74 75 76 77 |
# File 'lib/ruote/redis/storage.rb', line 74 def @redis.keys_to_a (opt) r = keys(opt) r.is_a?(Array) ? r : r.split(' ') end |
Instance Method Details
#add_type(type) ⇒ Object
Mainly used by ruote’s test/unit/ut_17_storage.rb
225 226 |
# File 'lib/ruote/redis/storage.rb', line 225 def add_type (type) end |
#delete(doc) ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/ruote/redis/storage.rb', line 144 def delete (doc) raise ArgumentError.new( "can't delete doc without _rev") unless doc['_rev'] r = put(doc, :delete => true) return r if r != nil @redis.keys_to_a("#{key_for(doc)}*").sort.each { |k| Thread.pass # lingering a bit... @redis.del(k) } # deleting the key_rev last and making 1 'keys' call preliminarily nil end |
#delete_schedule(schedule_id) ⇒ Object
106 107 108 109 |
# File 'lib/ruote/redis/storage.rb', line 106 def delete_schedule (schedule_id) @redis.del(key_for('schedules', schedule_id)) end |
#get(type, key) ⇒ Object
139 140 141 142 |
# File 'lib/ruote/redis/storage.rb', line 139 def get (type, key) do_get(type, key, @redis.get(key_for(type, key))) end |
#get_many(type, key = nil, opts = {}) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/ruote/redis/storage.rb', line 162 def get_many (type, key=nil, opts={}) keys = "#{type}/*" ids = if type == 'msgs' || type == 'schedules' @redis.keys_to_a(keys) else @redis.keys_to_a(keys).inject({}) { |h, k| if m = k.match(/^[^\/]+\/([^\/]+)\/(\d+)$/) if ( ! key) || m[1].match(key) o = h[m[1]] n = [ m[2].to_i, k ] h[m[1]] = [ m[2].to_i, k ] if ( ! o) || o.first < n.first end end h }.values.collect { |i| i[1] } end if l = opts[:limit] ids = ids[0, l] end ids.inject([]) do |a, i| v = @redis.get(i) a << Rufus::Json.decode(v) if v a end end |
#ids(type) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/ruote/redis/storage.rb', line 199 def ids (type) @redis.keys_to_a("#{type}/*").inject([]) { |a, k| if m = k.match(/^[^\/]+\/([^\/]+)$/) a << m[1] end a }.sort end |
#purge! ⇒ Object
211 212 213 214 |
# File 'lib/ruote/redis/storage.rb', line 211 def purge! @redis.keys_to_a('*').each { |k| @redis.del(k) } end |
#purge_type!(type) ⇒ Object
Nukes a db type and reputs it (losing all the documents that were in it).
230 231 232 233 |
# File 'lib/ruote/redis/storage.rb', line 230 def purge_type! (type) @redis.keys_to_a("#{type}/*").each { |k| @redis.del(k) } end |
#put(doc, opts = {}) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/ruote/redis/storage.rb', line 111 def put (doc, opts={}) rev = doc['_rev'].to_i key = key_for(doc) current_rev = @redis.get(key).to_i return true if current_rev == 0 && rev > 0 return do_get(doc, current_rev) if rev != current_rev nrev = rev + 1 # the setnx here is crucial in multiple workers env... r = @redis.setnx( key_rev_for(doc, nrev), to_json(doc.merge('_rev' => nrev), opts)) return get(doc['type'], doc['_id']) if r == false @redis.set(key, nrev) @redis.del(key_rev_for(doc, rev)) if rev > 0 doc['_rev'] = nrev if opts[:update_rev] nil end |
#put_msg(action, options) ⇒ Object
87 88 89 90 91 92 93 94 |
# File 'lib/ruote/redis/storage.rb', line 87 def put_msg (action, ) doc = prepare_msg_doc(action, ) @redis.set(key_for(doc), to_json(doc)) nil end |
#put_schedule(flavour, owner_fei, s, msg) ⇒ Object
96 97 98 99 100 101 102 103 104 |
# File 'lib/ruote/redis/storage.rb', line 96 def put_schedule (flavour, owner_fei, s, msg) if doc = prepare_schedule_doc(flavour, owner_fei, s, msg) @redis.set(key_for(doc), to_json(doc)) return doc['_id'] end nil end |
#reserve(doc) ⇒ Object
82 83 84 85 |
# File 'lib/ruote/redis/storage.rb', line 82 def reserve (doc) @redis.del(key_for(doc)) end |
#shutdown ⇒ Object
def dump (type)
@dbs[type].dump
end
220 221 |
# File 'lib/ruote/redis/storage.rb', line 220 def shutdown end |