Class: Ruote::Redis::Storage
- Inherits:
- Object
- Includes:
- StorageBase
- Defined in:
- lib/ruote/redis/storage.rb
Overview
A Redis storage for ruote.
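The constructor accepts two arguments: the first is a Redis instance (see github.com/ezmobius/redis-rb), the second is the classic ruote engine options hash (see ruote.rubyforge.org/configuration.html#engine). Alternatively, a single Hash may be passed: the keys listed in REDIS_OPTIONS are used to build the Redis instance and the remaining keys are treated as ruote options.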
require 'redis' # gem install redis
require 'ruote' # gem install ruote
require 'ruote-redis' # gem install ruote-redis

engine = Ruote::Engine.new(
  Ruote::Worker.new(
    Ruote::Redis::RedisStorage.new('db' => 14, 'thread_safe' => true)))
em-redis
Not tried, but substituting an em-redis instance (github.com/madsimian/em-redis) for the redis instance passed to the constructor might work.
If you try it and it works, feedback is welcome at groups.google.com/group/openwferu-users
‘pop_count’ option
By default, when the worker queries this storage for msgs to process, the storage tries to pop 28 msgs at once. This number can be changed with the ‘pop_count’ option:
engine = Ruote::Engine.new(
  Ruote::Worker.new(
    Ruote::Redis::RedisStorage.new(
      'db' => 14, 'thread_safe' => true, 'pop_count' => 56)))
Don’t set this number too high: the more msgs are popped at once, the more msgs can get lost if the worker goes down before processing them.
(If such a scenario needs to be avoided in the future, Redis’ rpoplpush might come in handy.)
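The trade-off can be illustrated without a Redis server. The sketch below uses plain Ruby arrays as stand-ins for Redis lists; `pop_batch`, `rpoplpush` and the `processing` list are hypothetical names chosen for illustration and are not part of ruote-redis. It contrasts popping a whole batch with an rpoplpush-style hand-off, where each msg is parked in a second list until it is acknowledged.

```ruby
# Plain arrays stand in for Redis lists
# (assumption: head is index 0, tail is the last element).

# Batch pop, as with 'pop_count': up to n msgs leave the queue at once.
def pop_batch(queue, n)
  Array.new(n) { queue.pop }.compact
end

# rpoplpush-style hand-off: move one msg from the tail of +source+ to the
# head of +destination+, so it stays recoverable until acknowledged.
def rpoplpush(source, destination)
  msg = source.pop
  destination.unshift(msg) unless msg.nil?
  msg
end

queue = %w[ m1 m2 m3 m4 ]
batch = pop_batch(queue, 3)
# a worker crash at this point would lose every msg held in +batch+

queue2 = %w[ m1 m2 m3 m4 ]
processing = []
msg = rpoplpush(queue2, processing)
parked = processing.dup
# a worker crash at this point would leave +msg+ findable in +processing+

processing.delete(msg) # acknowledge once the msg has been handled
```

With the batch pop, the size of the loss window grows with `n`; with the hand-off, at most the msg currently being handled is in flight, and it is still stored.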
Direct Known Subclasses
Constant Summary
- REDIS_OPTIONS =
Listing the redis options to differentiate them from ruote storage options.
%w[ host port path db thread_safe logger ]
Instance Attribute Summary
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
Class Method Summary
-
.keys_to_a(pattern) ⇒ Object
Returns an array of the (String) keys that match the given pattern.
Instance Method Summary
-
#add_type(type) ⇒ Object
Mainly used by ruote’s test/unit/ut_17_storage.rb.
-
#close ⇒ Object
Shuts this storage down by closing the Redis connection.
- #delete(doc) ⇒ Object
- #delete_schedule(schedule_id) ⇒ Object
- #get(type, key) ⇒ Object
- #get_many(type, key = nil, opts = {}) ⇒ Object
-
#get_msgs ⇒ Object
Note: the worker argument is not used in this storage implementation.
- #ids(type) ⇒ Object
-
#initialize(redis, options = {}) ⇒ Storage
constructor
A Redis storage for ruote.
- #purge! ⇒ Object
-
#purge_type!(type) ⇒ Object
Nukes a db type and re-puts it (losing all the documents that were in it).
- #put(doc, opts = {}) ⇒ Object
- #put_msg(action, options) ⇒ Object
- #put_schedule(flavour, owner_fei, s, msg) ⇒ Object
-
#reconnect ⇒ Object
Simply calls @redis.reconnect.
-
#reserve(doc) ⇒ Object
Returns true if the doc is successfully deleted.
-
#shutdown ⇒ Object
Shuts this storage down by closing the Redis connection.
Constructor Details
#initialize(redis, options = {}) ⇒ Storage
# File 'lib/ruote/redis/storage.rb', line 109
def initialize(redis, options={})

  if options == {} && redis.is_a?(Hash)

    redis_options, options =
      redis.partition { |k, v| REDIS_OPTIONS.include?(k.to_s) }

    redis_options = Hash[redis_options.collect { |k, v| [ k.to_sym, v ] }]
    options = Hash[options]

    redis = ::Redis.new(redis_options)
  end

  @redis = redis
  @options = options

  @pop_count = @options['pop_count'] || 28

  # Returns an array of the (String) keys that match the given pattern.
  #
  # Returns an empty array if anything goes wrong.
  #
  def @redis.keys_to_a(pattern)
    if (a = (keys(pattern) rescue nil)).is_a?(Array)
      a
    else
      []
    end
  end

  replace_engine_configuration(options)
end
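When a single Hash is passed, the constructor splits it into Redis options and ruote options. The following stand-alone sketch reproduces only that splitting step so it can be run without Redis; `split_options` is a hypothetical helper name, and the REDIS_OPTIONS list is copied from the constant documented above.

```ruby
# Copied from the constant summary of this class.
REDIS_OPTIONS = %w[ host port path db thread_safe logger ]

# Hypothetical helper: returns [ redis_options, ruote_options ].
# Redis option keys are symbolized, ruote option keys are left untouched.
def split_options(hash)
  redis_pairs, ruote_pairs =
    hash.partition { |k, _v| REDIS_OPTIONS.include?(k.to_s) }

  [ Hash[redis_pairs.collect { |k, v| [ k.to_sym, v ] }], Hash[ruote_pairs] ]
end

redis_options, ruote_options =
  split_options('db' => 14, 'thread_safe' => true, 'pop_count' => 56)
```

Here `redis_options` holds the `db` and `thread_safe` entries (with Symbol keys), while `ruote_options` keeps `'pop_count'` under its String key, which is how the constructor later looks it up.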
Instance Attribute Details
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
# File 'lib/ruote/redis/storage.rb', line 83
def redis
  @redis
end
Class Method Details
.keys_to_a(pattern) ⇒ Object
Returns an array of the (String) keys that match the given pattern.
Returns an empty array if anything goes wrong.
# File 'lib/ruote/redis/storage.rb', line 132
def @redis.keys_to_a(pattern)
  if (a = (keys(pattern) rescue nil)).is_a?(Array)
    a
  else
    []
  end
end
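Note that the method is defined on the @redis object itself (a singleton method), not on the storage class. A minimal sketch of the same pattern, using a hypothetical `FakeClient` in place of a real Redis client, shows the "empty array if anything goes wrong" behaviour:

```ruby
# Hypothetical stand-in for a Redis client; +keys+ either returns the
# canned result or raises, to simulate a broken connection.
class FakeClient
  def initialize(result)
    @result = result
  end

  def keys(_pattern)
    raise IOError, 'connection lost' if @result == :fail
    @result
  end
end

# Adds a keys_to_a singleton method to the given client object only.
def add_keys_to_a(client)
  def client.keys_to_a(pattern)
    a = (keys(pattern) rescue nil)
    a.is_a?(Array) ? a : []
  end
  client
end

healthy = add_keys_to_a(FakeClient.new([ 'msgs/a', 'msgs/b' ]))
broken = add_keys_to_a(FakeClient.new(:fail))
odd = add_keys_to_a(FakeClient.new('not an array'))
```

Callers can therefore chain Array methods on the result without checking for nil or rescuing connection errors, at the cost of not being able to tell "no keys" from "lookup failed".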
Instance Method Details
#add_type(type) ⇒ Object
Mainly used by ruote’s test/unit/ut_17_storage.rb
# File 'lib/ruote/redis/storage.rb', line 361
def add_type(type)
  # nothing to be done
end
#close ⇒ Object
Shuts this storage down by closing the Redis connection.
(This close / shutdown dichotomy has to be resolved at some point…)
# File 'lib/ruote/redis/storage.rb', line 345
def close
  @redis.quit
end
#delete(doc) ⇒ Object
# File 'lib/ruote/redis/storage.rb', line 238
def delete(doc)

  rev = doc['_rev']

  raise ArgumentError.new("can't delete doc without _rev") unless rev

  key = key_for(doc)

  lock(key) do

    current_doc = do_get(key)

    if current_doc.nil?
      #
      # document is [already] gone, delete fails (return true)
      #
      true

    elsif current_doc['_rev'] != rev
      #
      # version in storage doesn't match version to delete
      # (return version in storage)
      #
      current_doc

    else
      #
      # delete is successful (return nil)
      #
      @redis.del(key)

      nil
    end
  end
end
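The three possible return values (nil, true, or the stored document) are easy to mix up, since nil means success. The sketch below mimics the same decision logic against a plain Hash; `delete_doc` and the `"type/_id"` key layout are assumptions made for illustration, and the locking done by the real method is left out.

```ruby
# Hypothetical in-memory version of the delete protocol.
# +store+ is a Hash mapping "type/_id" keys to documents.
def delete_doc(store, doc)
  rev = doc['_rev']
  raise ArgumentError, "can't delete doc without _rev" unless rev

  key = "#{doc['type']}/#{doc['_id']}" # assumed key layout
  current = store[key]

  if current.nil?
    true      # already gone: delete fails
  elsif current['_rev'] != rev
    current   # stale revision: hand back the stored version
  else
    store.delete(key)
    nil       # success
  end
end

store = {
  'expressions/a' => { 'type' => 'expressions', '_id' => 'a', '_rev' => '2' }
}

stale = delete_doc(store, { 'type' => 'expressions', '_id' => 'a', '_rev' => '1' })
ok = delete_doc(store, { 'type' => 'expressions', '_id' => 'a', '_rev' => '2' })
gone = delete_doc(store, { 'type' => 'expressions', '_id' => 'a', '_rev' => '2' })
```

A caller that receives a document back would typically re-apply its change to that newer version and try again.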
#delete_schedule(schedule_id) ⇒ Object
# File 'lib/ruote/redis/storage.rb', line 188
def delete_schedule(schedule_id)

  return unless schedule_id

  @redis.del(key_for('schedules', schedule_id))
end
#get(type, key) ⇒ Object
# File 'lib/ruote/redis/storage.rb', line 233
def get(type, key)

  do_get(key_for(type, key))
end
#get_many(type, key = nil, opts = {}) ⇒ Object
# File 'lib/ruote/redis/storage.rb', line 274
def get_many(type, key=nil, opts={})

  keys = key ? Array(key) : nil

  #ids = if type == 'msgs' || type == 'schedules'
  #  @redis.keys_to_a("#{type}/*")

  ids = if keys == nil

    @redis.keys_to_a("#{type}/*")

  elsif keys.first.is_a?(String)

    keys.collect { |k|
      @redis.keys_to_a("#{type}/*!#{k}#{type == 'schedules' ? '-*' : ''}")
    }.flatten

  else #if keys.first.is_a?(Regexp)

    @redis.keys_to_a("#{type}/*").select { |i|

      i = i[type.length + 1..-1]
        # removing "^type/"

      keys.find { |k| k.match(i) }
    }
  end

  ids = ids.reject { |i| i.match(LOCK_KEY) }
  ids = ids.sort
  ids = ids.reverse if opts[:descending]

  skip = opts[:skip] || 0
  limit = opts[:limit] || ids.length
  ids = ids[skip, limit]

  docs = ids.length > 0 && @redis.mget(*ids)
  docs = docs.is_a?(Array) ? docs : []

  docs = docs.each_with_object({}) do |doc, h|
    next unless doc
    doc = Rufus::Json.decode(doc)
    h[doc['_id']] = doc
  end

  return docs.size if opts[:count]

  docs = docs.values.sort_by { |d| d['_id'] }

  opts[:descending] ? docs.reverse : docs
end
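The :descending, :skip and :limit options are applied to the sorted list of keys before any document is fetched. A small sketch of just that paging step, with a hypothetical `page_ids` helper and made-up keys, may help when reasoning about which documents a given call returns:

```ruby
# Hypothetical helper isolating the sort / reverse / skip / limit steps
# that get_many applies to the matching keys.
def page_ids(ids, opts={})
  ids = ids.sort
  ids = ids.reverse if opts[:descending]

  skip = opts[:skip] || 0
  limit = opts[:limit] || ids.length

  ids[skip, limit]
end

keys = %w[ expressions/c expressions/a expressions/d expressions/b ]

all = page_ids(keys)
second_page = page_ids(keys, :skip => 2, :limit => 2)
latest = page_ids(keys, :descending => true, :limit => 1)
```

Because paging happens on keys, only the documents for the selected page are read from Redis.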
#get_msgs ⇒ Object
Note: the worker argument is not used in this storage implementation.
# File 'lib/ruote/redis/storage.rb', line 165
def get_msgs

  @redis.pipelined {
    @pop_count.times { @redis.rpop('msgs') }
  }.compact.collect { |d|
    from_json(d)
  }
end
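Since #put_msg pushes at the head of the 'msgs' list (lpush) and #get_msgs pops from its tail (rpop), msgs come out in the order they went in. The sketch below simulates this with a plain Array standing in for the Redis list and the standard json library standing in for Rufus::Json; both substitutions are assumptions made to keep the example self-contained.

```ruby
require 'json'

queue = [] # stand-in for the Redis list 'msgs' (head is index 0)

# put_msg side: lpush adds each encoded msg at the head of the list
%w[ launch apply reply ].each do |action|
  queue.unshift(JSON.generate('action' => action))
end

# get_msgs side: rpop is issued pop_count times; popping an empty list
# yields nil, hence the compact before decoding
pop_count = 5
raw = Array.new(pop_count) { queue.pop }
msgs = raw.compact.collect { |d| JSON.parse(d) }

actions = msgs.collect { |m| m['action'] }
```

Asking for more msgs than are queued is harmless: the surplus pops return nil and are dropped.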
#ids(type) ⇒ Object
# File 'lib/ruote/redis/storage.rb', line 326
def ids(type)

  @redis.keys_to_a("#{type}/*").reject { |i|
    i.match(LOCK_KEY)
  }.collect { |i|
    i.split('/').last
  }.sort
end
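As a rough illustration of the key-to-id transformation, here is a stand-alone version operating on an array of key strings. The value of LOCK_KEY is not shown on this page, so the regular expression used below is an assumption, as are the helper name `ids_from_keys` and the sample keys.

```ruby
# Assumption: lock entries are recognizable by a "-lock" suffix.
# The real LOCK_KEY constant is not shown on this page.
LOCK_KEY = /-lock$/

# Hypothetical helper: drops lock entries, strips the "type/" prefix
# and returns the remaining ids sorted.
def ids_from_keys(keys)
  keys
    .reject { |k| k.match(LOCK_KEY) }
    .collect { |k| k.split('/').last }
    .sort
end

found = ids_from_keys(
  %w[ expressions/b expressions/a expressions/a-lock ])
```

Note that `split('/').last` keeps only the part after the final slash, so ids are assumed not to contain a slash themselves.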
#purge! ⇒ Object
# File 'lib/ruote/redis/storage.rb', line 335
def purge!

  2.times { @redis.flushdb rescue nil }
    # 2 times to work around Redis::ProtocolError '3'
end
#purge_type!(type) ⇒ Object
Nukes a db type and re-puts it (losing all the documents that were in it).
# File 'lib/ruote/redis/storage.rb', line 368
def purge_type!(type)

  @redis.keys_to_a("#{type}/*").each { |k| (@redis.del(k) rescue nil) }
end
#put(doc, opts = {}) ⇒ Object
# File 'lib/ruote/redis/storage.rb', line 195
def put(doc, opts={})

  key = key_for(doc)
  rev = doc['_rev']

  lock(key) do

    current_doc = do_get(key)
    current_rev = current_doc ? current_doc['_rev'] : nil

    if current_rev && rev != current_rev
      #
      # version in storage is newer than version being put,
      # (return version in storage)
      #
      current_doc

    elsif rev && current_rev.nil?
      #
      # document deleted, put fails (return true)
      #
      true

    else
      #
      # put is successful (return nil)
      #
      doc = doc.send(
        opts[:update_rev] ? :merge! : :merge,
        { '_rev' => (rev.to_i + 1).to_s, 'put_at' => Ruote.now_to_utc_s })

      @redis.set(key, Rufus::Json.encode(doc))

      nil
    end
  end
end
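The revision check can be tried out without Redis. The following is a minimal in-memory sketch of the same put protocol, not the library's implementation: `MemoryStore` is a hypothetical class, the `"type/_id"` key layout is assumed, the standard json library replaces Rufus::Json, and locking as well as the 'put_at' timestamp are omitted.

```ruby
require 'json'

# Hypothetical in-memory store following the put protocol shown above:
# nil on success, true if the doc was deleted, the stored doc on conflict.
class MemoryStore
  def initialize
    @docs = {}
  end

  def get(type, id)
    raw = @docs["#{type}/#{id}"]
    raw && JSON.parse(raw)
  end

  def put(doc)
    rev = doc['_rev']
    current = get(doc['type'], doc['_id'])
    current_rev = current && current['_rev']

    if current_rev && rev != current_rev
      current # conflict: caller holds an outdated (or no) revision
    elsif rev && current_rev.nil?
      true    # caller holds a revision but the doc is gone
    else
      key = "#{doc['type']}/#{doc['_id']}" # assumed key layout
      @docs[key] = JSON.generate(doc.merge('_rev' => (rev.to_i + 1).to_s))
      nil
    end
  end
end

store = MemoryStore.new

first = store.put({ 'type' => 'variables', '_id' => 'v' })
conflict = store.put({ 'type' => 'variables', '_id' => 'v', '_rev' => '0' })
update = store.put(store.get('variables', 'v').merge('x' => 1))
deleted = store.put({ 'type' => 'variables', '_id' => 'other', '_rev' => '4' })
```

A first put carries no '_rev'; since `nil.to_i` is 0, the stored document starts at revision '1', and every successful put increments it by one.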
#put_msg(action, options) ⇒ Object
# File 'lib/ruote/redis/storage.rb', line 153
def put_msg(action, options)

  doc = prepare_msg_doc(action, options)
  doc['put_at'] = Ruote.now_to_utc_s

  @redis.lpush('msgs', Rufus::Json.encode(doc))

  nil
end
#put_schedule(flavour, owner_fei, s, msg) ⇒ Object
# File 'lib/ruote/redis/storage.rb', line 174
def put_schedule(flavour, owner_fei, s, msg)

  doc = prepare_schedule_doc(flavour, owner_fei, s, msg)

  return nil unless doc

  doc['_rev'] = '0'
  doc['put_at'] = Ruote.now_to_utc_s

  @redis.set(key_for(doc), Rufus::Json.encode(doc))

  doc['_id']
end
#reconnect ⇒ Object
Simply calls @redis.reconnect
# File 'lib/ruote/redis/storage.rb', line 375
def reconnect
  @redis.reconnect
end
#reserve(doc) ⇒ Object
Returns true if the doc is successfully deleted.
# File 'lib/ruote/redis/storage.rb', line 146
def reserve(doc)

  return true if doc['type'] == 'msgs'

  (@redis.del(key_for(doc)) == 1)
end
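The method relies on DEL returning the number of keys it removed: when several workers try to reserve the same document, only the one whose DEL actually removed the key gets true. The sketch below demonstrates this with a hypothetical `FakeRedis` class and an assumed `"type/_id"` key layout; msgs are always considered reserved because they have already been popped from the list.

```ruby
# Hypothetical stand-in for a Redis client: +del+ returns the number of
# keys removed (1 or 0), like the Redis DEL command for a single key.
class FakeRedis
  def initialize(keys)
    @data = Hash[keys.collect { |k| [ k, '{}' ] }]
  end

  def del(key)
    return 0 unless @data.key?(key)
    @data.delete(key)
    1
  end
end

# Same logic as the method above, with the client passed in explicitly.
def reserve(redis, doc)
  return true if doc['type'] == 'msgs'
  redis.del("#{doc['type']}/#{doc['_id']}") == 1 # assumed key layout
end

redis = FakeRedis.new([ 'schedules/s0' ])
schedule = { 'type' => 'schedules', '_id' => 's0' }

worker_a = reserve(redis, schedule)
worker_b = reserve(redis, schedule)
msg = reserve(redis, { 'type' => 'msgs', '_id' => 'm0' })
```

Only `worker_a` wins the reservation here; `worker_b` finds the key already deleted and should skip the document.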
#shutdown ⇒ Object
Shuts this storage down by closing the Redis connection.
(This close / shutdown dichotomy has to be resolved at some point…)
# File 'lib/ruote/redis/storage.rb', line 354
def shutdown
  @redis.quit
end