Class: Collective::Redis::Storage

Inherits:
Object
  • Object
show all
Defined in:
lib/collective/redis/storage.rb

Instance Method Summary collapse

Constructor Details

#initialize(redis = nil) ⇒ Storage

Returns a new instance of Storage.



9
10
11
# File 'lib/collective/redis/storage.rb', line 9

# Builds a Storage, optionally configuring its Redis client up front.
#
# When +redis+ is truthy it is handed to the +redis=+ writer; otherwise
# nothing is configured here.
#
# @param redis value forwarded to +redis=+ when given
def initialize(redis = nil)
  return unless redis

  self.redis = redis
end

Instance Method Details

#del(key) ⇒ Object



28
29
30
# File 'lib/collective/redis/storage.rb', line 28

# Deletes +key+ by delegating to the client's +del+ call.
#
# @param key the key to remove
# @return whatever the client's +del+ call returns
def del(key)
  client = redis
  client.del(key)
end

#get(key) ⇒ Object



24
25
26
# File 'lib/collective/redis/storage.rb', line 24

# Reads the value stored under +key+ by delegating to the client's +get+ call.
#
# @param key the key to read
# @return whatever the client's +get+ call returns
def get(key)
  client = redis
  client.get(key)
end

#map_del(map_name, key) ⇒ Object



115
116
117
# File 'lib/collective/redis/storage.rb', line 115

# Removes +key+ from the hash +map_name+ by delegating to the client's
# +hdel+ call.
#
# @param map_name name of the hash
# @param key field to remove
# @return whatever the client's +hdel+ call returns
def map_del(map_name, key)
  client = redis
  client.hdel(map_name, key)
end

#map_get(map_name, key) ⇒ Object



103
104
105
# File 'lib/collective/redis/storage.rb', line 103

# Reads field +key+ of the hash +map_name+ by delegating to the client's
# +hget+ call.
#
# @param map_name name of the hash
# @param key field to read
# @return whatever the client's +hget+ call returns
def map_get(map_name, key)
  client = redis
  client.hget(map_name, key)
end

#map_get_all_keys(map_name) ⇒ Object



107
108
109
# File 'lib/collective/redis/storage.rb', line 107

# Lists the fields of the hash +map_name+ by delegating to the client's
# +hkeys+ call.
#
# @param map_name name of the hash
# @return whatever the client's +hkeys+ call returns
def map_get_all_keys(map_name)
  client = redis
  client.hkeys(map_name)
end

#map_set(map_name, key, value) ⇒ Object

Maps



99
100
101
# File 'lib/collective/redis/storage.rb', line 99

# Stores +value+ under field +key+ of the hash +map_name+ by delegating to
# the client's +hset+ call.
#
# @param map_name name of the hash
# @param key field to write
# @param value value to store
# @return whatever the client's +hset+ call returns
def map_set(map_name, key, value)
  client = redis
  client.hset(map_name, key, value)
end

#map_size(map_name) ⇒ Object



111
112
113
# File 'lib/collective/redis/storage.rb', line 111

# Reports the number of fields in the hash +map_name+ by delegating to the
# client's +hlen+ call.
#
# @param map_name name of the hash
# @return whatever the client's +hlen+ call returns
def map_size(map_name)
  client = redis
  client.hlen(map_name)
end

#put(key, value) ⇒ Object

Simple values



20
21
22
# File 'lib/collective/redis/storage.rb', line 20

# Stores +value+ under +key+ by delegating to the client's +set+ call.
#
# @param key the key to write
# @param value the value to store
# @return whatever the client's +set+ call returns
def put(key, value)
  client = redis
  client.set(key, value)
end

#queue_add(queue_name, item, score = Time.now.to_i) ⇒ Object

Priority Queue



56
57
58
59
60
61
62
63
# File 'lib/collective/redis/storage.rb', line 56

# Adds +item+ to the sorted set +queue_name+ with the given +score+.
#
# The score is converted with +to_f+ before it is passed to the client's
# +zadd+ call. If that call raises a StandardError, the same exception class
# is re-raised with the zadd arguments prepended to the message and the
# original backtrace preserved.
#
# Only StandardError is intercepted: exceptions such as Interrupt or
# SystemExit propagate untouched instead of being rewritten.
#
# @param queue_name name of the sorted set
# @param item member to add
# @param score ordering score; defaults to the current time in whole seconds
# @return whatever the client's +zadd+ call returns
def queue_add( queue_name, item, score = Time.now.to_i )
  score = score.to_f
  begin
    redis.zadd( queue_name, score, item )
  rescue StandardError => x
    raise x, "Failed zadd( #{queue_name.inspect}, #{score.inspect}, #{item.inspect} ) because of an error: #{x.message}", x.backtrace
  end
end

#queue_del(queue_name) ⇒ Object



93
94
95
# File 'lib/collective/redis/storage.rb', line 93

# Deletes the whole queue +queue_name+ by delegating to the client's +del+
# call.
#
# @param queue_name name of the queue to remove
# @return whatever the client's +del+ call returns
def queue_del(queue_name)
  client = redis
  client.del(queue_name)
end

#queue_pop(queue_name, max_score = Time.now.to_i) ⇒ Object

pop the lowest item from the queue IFF it scores <= max_score



66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/collective/redis/storage.rb', line 66

# Pops the lowest-scored item from +queue_name+ if and only if its score is
# <= +max_score+.
#
# The key is WATCHed, the candidate is looked up with +zrangebyscore+, and it
# is then removed by rank inside a MULTI block. The lower bound of the lookup
# is "-inf" so that the candidate is always the member at rank 0, which is
# exactly what +zremrangebyrank( queue_name, 0, 0 )+ removes. With a lower
# bound of 0, a negative-scored member would be removed while a different
# member was returned.
#
# @param queue_name name of the sorted set
# @param max_score highest score eligible for popping; defaults to the
#   current time in whole seconds
# @return the popped item, or nil when nothing is eligible or when the MULTI
#   block returns a falsy value (presumably because the watched key changed —
#   confirm against the client library in use)
def queue_pop( queue_name, max_score = Time.now.to_i )
  # Option 1: zrange, check score, accept or discard
  # Option 2: zrangebyscore with limit, then zremrangebyrank

  redis.watch( queue_name )
  item = redis.zrangebyscore( queue_name, "-inf", max_score, limit: [0, 1] ).first

  unless item
    # Nothing eligible: release the watch taken above.
    redis.unwatch
    return nil
  end

  committed = redis.multi { |r| r.zremrangebyrank( queue_name, 0, 0 ) }
  committed ? item : nil
end

#queue_pop_sync(queue_name, max_score = Time.now.to_i, options = {}) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
# File 'lib/collective/redis/storage.rb', line 81

# Repeatedly calls +queue_pop+ until it returns an item or the timeout
# elapses.
#
# At least one pop is always attempted. Between attempts the method sleeps
# for +options[:interval]+ seconds, capped to the time remaining, so it does
# not spin against Redis. +max_score+ is evaluated once when the method is
# called and passed unchanged to every attempt.
#
# @param queue_name name of the queue to pop from
# @param max_score highest score eligible for popping; defaults to the
#   current time in whole seconds
# @param options [Hash]
#   :timeout  seconds to keep trying (default 1)
#   :interval seconds to pause between attempts (default 0.01; 0 disables)
# @return the item returned by +queue_pop+
# @raise [Timeout::Error] when the deadline passes without an item
def queue_pop_sync( queue_name, max_score = Time.now.to_i, options = {} )
  timeout  = options[:timeout] || 1
  interval = options[:interval] || 0.01
  deadline = Time.now.to_f + timeout

  loop do
    result = queue_pop( queue_name, max_score )
    return result if result

    remaining = deadline - Time.now.to_f
    raise Timeout::Error if remaining < 0

    # Never sleep past the deadline.
    sleep( [interval, remaining].min ) if interval > 0
  end
end

#reconnect_after_fork ⇒ Object



13
14
15
# File 'lib/collective/redis/storage.rb', line 13

# Disconnects the Redis client's underlying connection.
#
# NOTE(review): presumably the client reconnects on its next command, which
# is what makes this safe to call in a forked child — confirm against the
# client library version in use.
#
# @return whatever the connection's +disconnect+ call returns
def reconnect_after_fork
  connection = redis.client
  connection.disconnect
end

#redis ⇒ Object



138
139
140
# File 'lib/collective/redis/storage.rb', line 138

# Returns the Redis client, creating it on first use.
#
# When no client has been set, one is created with +::Redis.connect+ using
# the URL below and then reused on later calls.
#
# @return the memoized client
def redis
  @redis = ::Redis.connect( url: "redis://127.0.0.1:6379/1" ) unless @redis
  @redis
end

#redis=(redis_or_options) ⇒ Object

Parameters:

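  • redis_or_options

    a Redis client, or a Hash of connection options (an optional :namespace key wraps the connection in Redis::Namespace); can only be set once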

Raises:

  • (Collective::ConfigurationError) — if a Redis client has already been set



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/collective/redis/storage.rb', line 124

# Sets the Redis client. It can only be set once.
#
# A Hash is treated as connection options: it is copied, an optional
# :namespace key is removed from the copy, and the rest is passed to
# +::Redis.connect+. When a namespace was given, the connection is wrapped in
# +::Redis::Namespace+. Any other value is stored as the client as-is.
#
# Constants are referenced as +::Redis+ so they resolve to the top-level
# Redis library regardless of lexical nesting. The instance variable is
# assigned only after the client has been fully built, so a failure while
# connecting or wrapping does not leave a half-configured instance that can
# no longer be set.
#
# @param redis_or_options a client object, or a Hash of connection options
# @raise [Collective::ConfigurationError] if a client has already been set
def redis=( redis_or_options )
  raise Collective::ConfigurationError if @redis

  @redis =
    case redis_or_options
    when Hash
      options    = redis_or_options.dup
      namespace  = options.delete(:namespace)
      connection = ::Redis.connect(options)
      namespace ? ::Redis::Namespace.new( namespace, redis: connection ) : connection
    else
      redis_or_options
    end
end

#set_add(set_name, value) ⇒ Object

Sets



34
35
36
# File 'lib/collective/redis/storage.rb', line 34

# Adds +value+ to the set +set_name+ by delegating to the client's +sadd+
# call.
#
# @param set_name name of the set
# @param value member to add
# @return whatever the client's +sadd+ call returns
def set_add(set_name, value)
  client = redis
  client.sadd(set_name, value)
end

#set_get_all(set_name) ⇒ Object



46
47
48
# File 'lib/collective/redis/storage.rb', line 46

# Lists the members of the set +set_name+ by delegating to the client's
# +smembers+ call.
#
# @param set_name name of the set
# @return whatever the client's +smembers+ call returns
def set_get_all(set_name)
  client = redis
  client.smembers(set_name)
end

#set_member?(set_name, value) ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/collective/redis/storage.rb', line 50

# Asks whether +value+ belongs to the set +set_name+ by delegating to the
# client's +sismember+ call.
#
# @param set_name name of the set
# @param value candidate member
# @return whatever the client's +sismember+ call returns
def set_member?(set_name, value)
  client = redis
  client.sismember(set_name, value)
end

#set_remove(set_name, value) ⇒ Object



42
43
44
# File 'lib/collective/redis/storage.rb', line 42

# Removes +value+ from the set +set_name+ by delegating to the client's
# +srem+ call.
#
# @param set_name name of the set
# @param value member to remove
# @return whatever the client's +srem+ call returns
def set_remove(set_name, value)
  client = redis
  client.srem(set_name, value)
end

#set_size(set_name) ⇒ Object



38
39
40
# File 'lib/collective/redis/storage.rb', line 38

# Reports the number of members in the set +set_name+ by delegating to the
# client's +scard+ call.
#
# @param set_name name of the set
# @return whatever the client's +scard+ call returns
def set_size(set_name)
  client = redis
  client.scard(set_name)
end