Class: Resque::RedisComposite
- Inherits:
-
Object
- Object
- Resque::RedisComposite
- Defined in:
- lib/resque/redis_composite.rb
Defined Under Namespace
Classes: NoDefaultRedisServerError
Constant Summary collapse
- DEFAULT_SERVER_NAME =
'default'
Instance Attribute Summary collapse
-
#mapping ⇒ Object
readonly
Returns the value of attribute mapping.
Class Method Summary collapse
Instance Method Summary collapse
- #client(queue = DEFAULT_SERVER_NAME) ⇒ Object
-
#initialize(config) ⇒ RedisComposite
constructor
A new instance of RedisComposite.
- #method_missing(method_name, *args, &block) ⇒ Object
-
#rpush(key, value) ⇒ Object
Sometimes we’re pushing onto the ‘failed’ queue, and we want to make sure the failures are pushed into the same Redis server as the queue is hosted on.
-
#sadd(key, value) ⇒ Object
This is used to create a set of queue names, so needs some special treatment.
-
#smembers(key) ⇒ Object
If we’re using smembers to get queue names, we aggregate across all servers.
-
#srem(key, value) ⇒ Object
This is used to completely delete a queue, so needs some special treatment.
Constructor Details
#initialize(config) ⇒ RedisComposite
Returns a new instance of RedisComposite.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/resque/redis_composite.rb', line 22 def initialize(config) config = { DEFAULT_SERVER_NAME => config } unless config.kind_of?(Hash) config = HashWithIndifferentAccess.new(config) raise NoDefaultRedisServerError, "No default server defined in configuration : #{config.inspect}" unless config.key?(DEFAULT_SERVER_NAME) # quick backup for what's in Resque original_resque_server = Resque.redis @mapping = config.inject(HashWithIndifferentAccess.new) do |hash, (queue_name, server)| # leaving Resque create Redis::Namespace instances Resque.redis = server hash[queue_name] = Resque.redis hash end Resque.redis = original_resque_server end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method_name, *args, &block) ⇒ Object
42 43 44 45 |
# File 'lib/resque/redis_composite.rb', line 42 def method_missing(method_name, *args, &block) # most redis command's first parameter is the key, so it should work most of the time server_for(args.first).send(method_name, *args, &block) end |
Instance Attribute Details
#mapping ⇒ Object (readonly)
Returns the value of attribute mapping.
7 8 9 |
# File 'lib/resque/redis_composite.rb', line 7 def mapping @mapping end |
Class Method Details
.create(config) ⇒ Object
11 12 13 |
# File 'lib/resque/redis_composite.rb', line 11 def create(config) Redis::Namespace.new(nil, :redis => RedisComposite.new(config)) end |
.reconnect_all(job = nil) ⇒ Object
15 16 17 18 |
# File 'lib/resque/redis_composite.rb', line 15 def reconnect_all(job = nil) return unless Resque.redis.redis.kind_of?(Resque::RedisComposite) Resque.redis.mapping.each { |_, server| server.redis.client.reconnect } end |
Instance Method Details
#client(queue = DEFAULT_SERVER_NAME) ⇒ Object
47 48 49 50 |
# File 'lib/resque/redis_composite.rb', line 47 def client(queue = DEFAULT_SERVER_NAME) # TODO properly handle Resque.redis_id, probably need to return all our client ids server_for(queue).client end |
#rpush(key, value) ⇒ Object
Sometimes we’re pushing onto the ‘failed’ queue, and we want to make sure the failures are pushed into the same Redis server as the queue is hosted on.
81 82 83 84 85 86 87 88 |
# File 'lib/resque/redis_composite.rb', line 81 def rpush(key, value) if failed?(key) queue_with_failure = Resque.decode(value)["queue"] server_for(queue_with_failure).rpush(key, value) else server_for(key).rpush(key, value) end end |
#sadd(key, value) ⇒ Object
This is used to create a set of queue names, so needs some special treatment
53 54 55 56 57 58 59 |
# File 'lib/resque/redis_composite.rb', line 53 def sadd(key, value) if queues?(key) server_for(value).sadd(key, value) else server_for(key).sadd(key, value) end end |
#smembers(key) ⇒ Object
If we’re using smembers to get queue names, we aggregate across all servers
62 63 64 65 66 67 68 |
# File 'lib/resque/redis_composite.rb', line 62 def smembers(key) if queues?(key) servers.inject([]) { |a, s| a + s.smembers(key) }.uniq else server_for(key).smembers(key) end end |
#srem(key, value) ⇒ Object
This is used to completely delete a queue, so needs some special treatment
71 72 73 74 75 76 77 |
# File 'lib/resque/redis_composite.rb', line 71 def srem(key, value) if queues?(key) server_for(value).srem(key, value) else server_for(key).srem(key, value) end end |