Class: Collective::Redis::Storage
- Inherits:
-
Object
- Object
- Collective::Redis::Storage
- Defined in:
- lib/collective/redis/storage.rb
Instance Method Summary collapse
- #del(key) ⇒ Object
- #get(key) ⇒ Object
-
#initialize(redis = nil) ⇒ Storage
constructor
A new instance of Storage.
- #map_del(map_name, key) ⇒ Object
- #map_get(map_name, key) ⇒ Object
- #map_get_all_keys(map_name) ⇒ Object
-
#map_set(map_name, key, value) ⇒ Object
Maps.
- #map_size(map_name) ⇒ Object
-
#put(key, value) ⇒ Object
Simple values.
-
#queue_add(queue_name, item, score = Time.now.to_i) ⇒ Object
Priority Queue.
- #queue_del(queue_name) ⇒ Object
-
#queue_pop(queue_name, max_score = Time.now.to_i) ⇒ Object
Pops the lowest-scored item from the queue if and only if its score is <= max_score.
- #queue_pop_sync(queue_name, max_score = Time.now.to_i, options = {}) ⇒ Object
- #reconnect_after_fork ⇒ Object
- #redis ⇒ Object
- #redis=(redis_or_options) ⇒ Object
-
#set_add(set_name, value) ⇒ Object
Sets.
- #set_get_all(set_name) ⇒ Object
- #set_member?(set_name, value) ⇒ Boolean
- #set_remove(set_name, value) ⇒ Object
- #set_size(set_name) ⇒ Object
Constructor Details
#initialize(redis = nil) ⇒ Storage
Returns a new instance of Storage.
9 10 11 |
# File 'lib/collective/redis/storage.rb', line 9
# Builds a Storage, optionally wiring in a Redis client or connection options.
#
# @param redis [Object, nil] forwarded to #redis= when truthy; when omitted
#   nothing is assigned here
def initialize(redis = nil)
  return unless redis

  self.redis = redis
end
Instance Method Details
#del(key) ⇒ Object
28 29 30 |
# File 'lib/collective/redis/storage.rb', line 28
# Removes a simple value by delegating to the client's +del+.
#
# @param key [Object] key passed through unchanged
# @return [Object] whatever +redis.del+ returns
def del(key)
  redis.del(key)
end
#get(key) ⇒ Object
24 25 26 |
# File 'lib/collective/redis/storage.rb', line 24
# Reads a simple value by delegating to the client's +get+.
#
# @param key [Object] key passed through unchanged
# @return [Object] whatever +redis.get+ returns
def get(key)
  redis.get(key)
end
#map_del(map_name, key) ⇒ Object
115 116 117 |
# File 'lib/collective/redis/storage.rb', line 115
# Deletes one entry of a map by delegating to the client's +hdel+.
#
# @param map_name [Object] key of the hash
# @param key [Object] field to delete
# @return [Object] whatever +redis.hdel+ returns
def map_del(map_name, key)
  redis.hdel(map_name, key)
end
#map_get(map_name, key) ⇒ Object
103 104 105 |
# File 'lib/collective/redis/storage.rb', line 103
# Fetches one entry of a map by delegating to the client's +hget+.
#
# @param map_name [Object] key of the hash
# @param key [Object] field to read
# @return [Object] whatever +redis.hget+ returns
def map_get(map_name, key)
  redis.hget(map_name, key)
end
#map_get_all_keys(map_name) ⇒ Object
107 108 109 |
# File 'lib/collective/redis/storage.rb', line 107
# Lists the fields of a map by delegating to the client's +hkeys+.
#
# @param map_name [Object] key of the hash
# @return [Object] whatever +redis.hkeys+ returns
def map_get_all_keys(map_name)
  redis.hkeys(map_name)
end
#map_set(map_name, key, value) ⇒ Object
Maps
99 100 101 |
# File 'lib/collective/redis/storage.rb', line 99
# Maps: writes one entry of a map by delegating to the client's +hset+.
#
# @param map_name [Object] key of the hash
# @param key [Object] field to write
# @param value [Object] value stored under the field
# @return [Object] whatever +redis.hset+ returns
def map_set(map_name, key, value)
  redis.hset(map_name, key, value)
end
#map_size(map_name) ⇒ Object
111 112 113 |
# File 'lib/collective/redis/storage.rb', line 111
# Reports the number of entries in a map by delegating to the client's +hlen+.
#
# @param map_name [Object] key of the hash
# @return [Object] whatever +redis.hlen+ returns
def map_size(map_name)
  redis.hlen(map_name)
end
#put(key, value) ⇒ Object
Simple values
20 21 22 |
# File 'lib/collective/redis/storage.rb', line 20
# Simple values: stores +value+ under +key+ by delegating to the client's +set+.
#
# @param key [Object] key passed through unchanged
# @param value [Object] value passed through unchanged
# @return [Object] whatever +redis.set+ returns
def put(key, value)
  redis.set(key, value)
end
#queue_add(queue_name, item, score = Time.now.to_i) ⇒ Object
Priority Queue
56 57 58 59 60 61 62 63 |
# File 'lib/collective/redis/storage.rb', line 56
# Priority Queue: adds +item+ to the sorted set +queue_name+ with the given score.
#
# The score is coerced with +to_f+ before being handed to +zadd+.
#
# @param queue_name [Object] key of the sorted set
# @param item [Object] member to add
# @param score [#to_f] priority; defaults to the current epoch second
# @return [Object] whatever +redis.zadd+ returns
# @raise [StandardError] the error raised by +zadd+, re-raised as the same
#   class with a message describing the failed call and the original backtrace
def queue_add( queue_name, item, score = Time.now.to_i )
  score = score.to_f
  begin
    redis.zadd( queue_name, score, item )
  rescue StandardError => x
    # StandardError only: signals, SystemExit and the like propagate untouched
    # instead of being re-labelled as a zadd failure.
    raise x, "Failed zadd( #{queue_name.inspect}, #{score.inspect}, #{item.inspect} ) because of an error: #{x.message}", x.backtrace
  end
end
#queue_del(queue_name) ⇒ Object
93 94 95 |
# File 'lib/collective/redis/storage.rb', line 93
# Drops an entire queue by delegating to the client's +del+.
#
# @param queue_name [Object] key of the queue
# @return [Object] whatever +redis.del+ returns
def queue_del(queue_name)
  redis.del(queue_name)
end
#queue_pop(queue_name, max_score = Time.now.to_i) ⇒ Object
Pops the lowest-scored item from the queue if and only if its score is <= max_score.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/collective/redis/storage.rb', line 66
# Pops the lowest-scored item from the sorted set +queue_name+ if and only if
# its score is <= +max_score+.
#
# Approach: +zrangebyscore+ with a limit of one to find the candidate, then
# +zremrangebyrank+ inside +multi+ to remove it, with the key watched first.
#
# @param queue_name [Object] key of the sorted set
# @param max_score [Numeric] highest score eligible for popping; defaults to
#   the current epoch second
# @return [Object, nil] the popped item, or nil when no item qualifies or when
#   +multi+ returns a falsy result (treated as "transaction did not commit")
def queue_pop( queue_name, max_score = Time.now.to_i )
  redis.watch( queue_name )

  # Lower bound is "-inf", not 0: removal below is by rank 0, so the candidate
  # must be the rank-0 member even if a negative score is present. Otherwise
  # one item would be returned while a different one is deleted.
  item = redis.zrangebyscore( queue_name, "-inf", max_score, limit: [0, 1] ).first

  unless item
    redis.unwatch
    return nil
  end

  committed = redis.multi { |r| r.zremrangebyrank( queue_name, 0, 0 ) }
  committed ? item : nil
end
#queue_pop_sync(queue_name, max_score = Time.now.to_i, options = {}) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/collective/redis/storage.rb', line 81
# Repeatedly calls #queue_pop until it yields an item or the timeout elapses.
#
# The loop polls back-to-back with no sleep between attempts.
#
# @param queue_name [Object] key of the sorted set
# @param max_score [Numeric] forwarded to #queue_pop; defaults to the current
#   epoch second, evaluated once when this method is called
# @param options [Hash] +:timeout+ gives the number of seconds to keep trying
#   (defaults to 1)
# @return [Object] the first truthy result of #queue_pop
# @raise [Timeout::Error] when no item was popped before the deadline
def queue_pop_sync( queue_name, max_score = Time.now.to_i, options = {} )
  timeout  = options[:timeout] || 1
  # Monotonic clock: the deadline is a duration, so it must not move when the
  # wall clock is adjusted.
  deadline = Process.clock_gettime( Process::CLOCK_MONOTONIC ) + timeout

  loop do
    result = queue_pop( queue_name, max_score )
    return result if result
    raise Timeout::Error if Process.clock_gettime( Process::CLOCK_MONOTONIC ) > deadline
  end
end
#reconnect_after_fork ⇒ Object
13 14 15 |
# File 'lib/collective/redis/storage.rb', line 13
# Disconnects the underlying client object of the current Redis handle.
#
# NOTE(review): presumably the client reconnects lazily on next use in the
# forked process — confirm against the redis client version in use.
#
# @return [Object] whatever +disconnect+ returns
def reconnect_after_fork
  connection = redis.client
  connection.disconnect
end
#redis ⇒ Object
138 139 140 |
# File 'lib/collective/redis/storage.rb', line 138
# Returns the Redis handle, connecting on first use when none was configured.
#
# The fallback connection targets redis://127.0.0.1:6379/1.
#
# @return [Object] the memoized handle
def redis
  return @redis if @redis

  @redis = ::Redis.connect(url: "redis://127.0.0.1:6379/1")
end
#redis=(redis_or_options) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/collective/redis/storage.rb', line 124
# Configures the Redis handle; it may be set only once.
#
# @param redis_or_options [Hash, Object] a Hash is treated as connection
#   options: its +:namespace+ entry (if any) is removed from a copy, the rest
#   is passed to +::Redis.connect+, and the connection is wrapped in
#   +::Redis::Namespace+ when a namespace was given. Anything else is stored
#   as the handle unchanged.
# @raise [Collective::ConfigurationError] when a handle is already set
def redis=( redis_or_options )
  raise Collective::ConfigurationError if @redis

  case redis_or_options
  when Hash
    # Work on a copy so the caller's hash keeps its :namespace entry.
    options   = redis_or_options.dup
    namespace = options.delete(:namespace)
    # Fully qualified like the #redis reader, so the constant cannot resolve
    # to the enclosing Collective::Redis namespace.
    @redis = ::Redis.connect( options )
    @redis = ::Redis::Namespace.new( namespace, redis: @redis ) if namespace
  else
    @redis = redis_or_options
  end
end
#set_add(set_name, value) ⇒ Object
Sets
34 35 36 |
# File 'lib/collective/redis/storage.rb', line 34
# Sets: adds +value+ to a set by delegating to the client's +sadd+.
#
# @param set_name [Object] key of the set
# @param value [Object] member to add
# @return [Object] whatever +redis.sadd+ returns
def set_add(set_name, value)
  redis.sadd(set_name, value)
end
#set_get_all(set_name) ⇒ Object
46 47 48 |
# File 'lib/collective/redis/storage.rb', line 46
# Lists the members of a set by delegating to the client's +smembers+.
#
# @param set_name [Object] key of the set
# @return [Object] whatever +redis.smembers+ returns
def set_get_all(set_name)
  redis.smembers(set_name)
end
#set_member?(set_name, value) ⇒ Boolean
50 51 52 |
# File 'lib/collective/redis/storage.rb', line 50
# Membership check for a set, delegated to the client's +sismember+.
#
# @param set_name [Object] key of the set
# @param value [Object] candidate member
# @return [Object] whatever +redis.sismember+ returns
def set_member?(set_name, value)
  redis.sismember(set_name, value)
end
#set_remove(set_name, value) ⇒ Object
42 43 44 |
# File 'lib/collective/redis/storage.rb', line 42
# Removes +value+ from a set by delegating to the client's +srem+.
#
# @param set_name [Object] key of the set
# @param value [Object] member to remove
# @return [Object] whatever +redis.srem+ returns
def set_remove(set_name, value)
  redis.srem(set_name, value)
end
#set_size(set_name) ⇒ Object
38 39 40 |
# File 'lib/collective/redis/storage.rb', line 38
# Reports the cardinality of a set by delegating to the client's +scard+.
#
# @param set_name [Object] key of the set
# @return [Object] whatever +redis.scard+ returns
def set_size(set_name)
  redis.scard(set_name)
end