Class: Resque::RedisComposite

Inherits:
Object
  • Object
show all
Defined in:
lib/resque/redis_composite.rb

Defined Under Namespace

Classes: NoDefaultRedisServerError

Constant Summary collapse

DEFAULT_SERVER_NAME =
'default'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ RedisComposite

Returns a new instance of RedisComposite.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/resque/redis_composite.rb', line 22

def initialize(config)
  config = { DEFAULT_SERVER_NAME => config } unless config.kind_of?(Hash)

  config = HashWithIndifferentAccess.new(config)
  raise NoDefaultRedisServerError, "No default server defined in configuration : #{config.inspect}" unless config.key?(DEFAULT_SERVER_NAME)

  # quick backup for what's in Resque
  original_resque_server = Resque.redis

  @mapping = config.inject(HashWithIndifferentAccess.new) do |hash, (queue_name, server)|
    # leaving Resque create Redis::Namespace instances
    Resque.redis = server

    hash[queue_name] = Resque.redis
    hash
  end

  Resque.redis = original_resque_server
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method_name, *args, &block) ⇒ Object



42
43
44
45
# File 'lib/resque/redis_composite.rb', line 42

def method_missing(method_name, *args, &block)
  # most redis command's first parameter is the key, so it should work most of the time
  server_for(args.first).send(method_name, *args, &block)
end

Instance Attribute Details

#mappingObject (readonly)

Returns the value of attribute mapping.



7
8
9
# File 'lib/resque/redis_composite.rb', line 7

def mapping
  @mapping
end

Class Method Details

.create(config) ⇒ Object



11
12
13
# File 'lib/resque/redis_composite.rb', line 11

def create(config)
  Redis::Namespace.new(nil, :redis => RedisComposite.new(config))
end

.reconnect_all(job = nil) ⇒ Object



15
16
17
18
# File 'lib/resque/redis_composite.rb', line 15

def reconnect_all(job = nil)
  return unless Resque.redis.redis.kind_of?(Resque::RedisComposite)
  Resque.redis.mapping.each { |_, server| server.redis.client.reconnect }
end

Instance Method Details

#client(queue = DEFAULT_SERVER_NAME) ⇒ Object



47
48
49
50
# File 'lib/resque/redis_composite.rb', line 47

def client(queue = DEFAULT_SERVER_NAME)
  # TODO properly handle Resque.redis_id, probably need to return all our client ids
  server_for(queue).client
end

#rpush(key, value) ⇒ Object

Sometimes we’re pushing onto the ‘failed’ queue, and we want to make sure the failures are pushed into the same Redis server as the queue is hosted on.



81
82
83
84
85
86
87
88
# File 'lib/resque/redis_composite.rb', line 81

def rpush(key, value)
  if failed?(key)
    queue_with_failure = Resque.decode(value)["queue"]
    server_for(queue_with_failure).rpush(key, value)
  else
    server_for(key).rpush(key, value)
  end
end

#sadd(key, value) ⇒ Object

This is used to create a set of queue names, so needs some special treatment



53
54
55
56
57
58
59
# File 'lib/resque/redis_composite.rb', line 53

def sadd(key, value)
  if queues?(key)
    server_for(value).sadd(key, value)
  else
    server_for(key).sadd(key, value)
  end
end

#smembers(key) ⇒ Object

If we’re using smembers to get queue names, we aggregate across all servers



62
63
64
65
66
67
68
# File 'lib/resque/redis_composite.rb', line 62

def smembers(key)
  if queues?(key)
    servers.inject([]) { |a, s| a + s.smembers(key) }.uniq
  else
    server_for(key).smembers(key)
  end
end

#srem(key, value) ⇒ Object

This is used to completely delete a queue, so needs some special treatment



71
72
73
74
75
76
77
# File 'lib/resque/redis_composite.rb', line 71

def srem(key, value)
  if queues?(key)
    server_for(value).srem(key, value)
  else
    server_for(key).srem(key, value)
  end
end