Class: Faye::Engine::Redis
- Inherits:
-
Base
- Object
- Base
- Faye::Engine::Redis
show all
- Defined in:
- lib/faye/engines/redis.rb
Constant Summary
collapse
- DEFAULT_HOST =
'localhost'
- DEFAULT_PORT =
6379
- DEFAULT_DATABASE =
0
- DEFAULT_GC =
60
- LOCK_TIMEOUT =
120
Constants included
from Logging
Logging::DEFAULT_LOG_LEVEL, Logging::LOG_LEVELS
Instance Attribute Summary
Attributes inherited from Base
#interval, #timeout
Attributes included from Logging
#log_level
Instance Method Summary
collapse
Methods inherited from Base
#close_connection, #connect, #connection, #flush, #initialize
Methods included from Logging
#log
Instance Method Details
#client_exists(client_id, &callback) ⇒ Object
83
84
85
86
87
88
|
# File 'lib/faye/engines/redis.rb', line 83
# Reports whether +client_id+ has an entry in the "<namespace>/clients"
# sorted set.
#
# client_id - the client identifier to look up.
# callback  - block that receives +true+ when Redis returns a score for the
#             member, +false+ when it returns nil.
def client_exists(client_id, &callback)
  init
  clients_key = @ns + '/clients'
  @redis.zscore(clients_key, client_id) do |score|
    callback.call(!score.nil?)
  end
end
|
#create_client(&callback) ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/faye/engines/redis.rb', line 45
# Generates a fresh client id, registers it in the "<namespace>/clients"
# sorted set with score 0, pings it and yields it to the callback.
#
# callback - block that receives the newly created client id.
#
# When the ZADD reply is 0 the whole procedure is retried with a new
# random id (presumably 0 means the id was already in the set — see the
# Redis ZADD documentation).
def create_client(&callback)
  init
  candidate = Faye.random
  @redis.zadd(@ns + '/clients', 0, candidate) do |added|
    next create_client(&callback) if added == 0

    debug 'Created new client ?', candidate
    ping(candidate)
    callback.call(candidate)
  end
end
|
#destroy_client(client_id, &callback) ⇒ Object
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/faye/engines/redis.rb', line 59
# Removes every trace of a client from Redis.
#
# client_id - the client identifier to remove.
# callback  - optional block invoked (with no arguments) once the client
#             has been unsubscribed from all of its channels.
#
# The client is dropped from the "<namespace>/clients" set and its message
# queue key is deleted. Its channel set is then read and each channel is
# unsubscribed; completion is signalled after the last unsubscribe
# callback fires, or immediately when the client had no channels.
def destroy_client(client_id, &callback)
  init
  @redis.zrem(@ns + '/clients', client_id)
  @redis.del(@ns + "/clients/#{client_id}/messages")

  @redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
    # Number of unsubscribe callbacks still outstanding.
    remaining = channels.size
    finish = lambda do
      debug 'Destroyed client ?', client_id
      callback.call if callback
    end

    if remaining.zero?
      finish.call
    else
      channels.each do |channel|
        unsubscribe(client_id, channel) do
          remaining -= 1
          finish.call if remaining.zero?
        end
      end
    end
  end
end
|
#disconnect ⇒ Object
40
41
42
43
|
# File 'lib/faye/engines/redis.rb', line 40
# Shuts the engine down: unsubscribes the subscriber connection from the
# "<namespace>/notifications" channel and cancels the periodic GC timer.
#
# The connections, namespace and timer are only created lazily by #init,
# so when #init has never run there is nothing to tear down and this is a
# no-op (previously it raised NoMethodError on the nil subscriber).
#
# Returns nil when the engine was never initialised, otherwise whatever
# EventMachine.cancel_timer returns.
def disconnect
  return unless @redis

  @subscriber.unsubscribe(@ns + '/notifications')
  EventMachine.cancel_timer(@gc)
end
|
#init ⇒ Object
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/faye/engines/redis.rb', line 11
# Lazily sets up the engine's Redis connections; does nothing when @redis
# is already set.
#
# Reads :host, :port, :database, :password, :gc and :namespace from
# @options, falling back to DEFAULT_HOST, DEFAULT_PORT, DEFAULT_DATABASE,
# DEFAULT_GC and an empty namespace. Opens two connections (@redis for
# commands, @subscriber for the "<namespace>/notifications" channel),
# authenticates both when a password is given, selects the database on
# both, and starts a periodic timer that calls #gc.
def init
  return if @redis

  # Required here rather than at file level so em-hiredis is only loaded
  # once the engine is actually used.
  require 'em-hiredis'

  host        = @options[:host]     || DEFAULT_HOST
  port        = @options[:port]     || DEFAULT_PORT
  # Use the declared constant instead of a literal 0, matching the other
  # defaults.
  db          = @options[:database] || DEFAULT_DATABASE
  auth        = @options[:password]
  # Named gc_interval so it does not share a name with the #gc method
  # handed to the timer below.
  gc_interval = @options[:gc]       || DEFAULT_GC
  @ns         = @options[:namespace] || ''

  @redis      = EventMachine::Hiredis::Client.connect(host, port)
  @subscriber = EventMachine::Hiredis::Client.connect(host, port)

  if auth
    @redis.auth(auth)
    @subscriber.auth(auth)
  end

  @redis.select(db)
  @subscriber.select(db)

  @subscriber.subscribe(@ns + '/notifications')
  @subscriber.on(:message) do |topic, message|
    # The message payload is passed straight to empty_queue.
    empty_queue(message) if topic == @ns + '/notifications'
  end

  @gc = EventMachine.add_periodic_timer(gc_interval, &method(:gc))
end
|
#ping(client_id) ⇒ Object
90
91
92
93
94
95
96
97
|
# File 'lib/faye/engines/redis.rb', line 90
# Stamps +client_id+ in the "<namespace>/clients" sorted set with the
# current Unix time (whole seconds) as its score.
#
# client_id - the client identifier to refresh.
#
# Does nothing (and returns nil) unless @timeout is a Numeric; otherwise
# returns whatever the ZADD call returns.
def ping(client_id)
  init
  return unless @timeout.is_a?(Numeric)

  now = Time.now.to_i
  debug 'Ping ?, ?', client_id, now
  @redis.zadd(@ns + '/clients', now, client_id)
end
|
#publish(message) ⇒ Object
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# File 'lib/faye/engines/redis.rb', line 117
# Queues +message+ for every client subscribed to a matching channel.
#
# message - a Hash-like message; its 'channel' entry is expanded with
#           Channel.expand and the whole message is serialised via
#           JSON.dump.
#
# The union of the "<namespace>/channels<name>" sets for each expanded
# channel name gives the recipients. For each one the JSON payload is
# appended to "<namespace>/clients/<id>/messages" and the client id is
# published on "<namespace>/notifications".
def publish(message)
  init
  debug 'Publishing message ?', message

  payload       = JSON.dump(message)
  channel_keys  = Channel.expand(message['channel']).map { |name| @ns + "/channels#{name}" }
  notifications = @ns + '/notifications'

  @redis.sunion(*channel_keys) do |recipients|
    recipients.each do |client_id|
      debug 'Queueing for client ?: ?', client_id, message
      @redis.rpush(@ns + "/clients/#{client_id}/messages", payload)
      @redis.publish(notifications, client_id)
    end
  end
end
|
#subscribe(client_id, channel, &callback) ⇒ Object
99
100
101
102
103
104
105
106
|
# File 'lib/faye/engines/redis.rb', line 99
# Records a subscription of +client_id+ to +channel+ in both directions.
#
# client_id - the subscribing client's identifier.
# channel   - the channel name (appended directly after "/channels").
# callback  - optional block invoked with no arguments once the second
#             SADD has completed.
#
# The channel is added to "<namespace>/clients/<id>/channels" and the
# client id to "<namespace>/channels<channel>".
def subscribe(client_id, channel, &callback)
  init
  client_channels_key = @ns + "/clients/#{client_id}/channels"
  channel_clients_key = @ns + "/channels#{channel}"

  @redis.sadd(client_channels_key, channel)
  @redis.sadd(channel_clients_key, client_id) do
    debug 'Subscribed client ? to channel ?', client_id, channel
    callback.call if callback
  end
end
|
#unsubscribe(client_id, channel, &callback) ⇒ Object
108
109
110
111
112
113
114
115
|
# File 'lib/faye/engines/redis.rb', line 108
# Removes the subscription of +client_id+ to +channel+ in both directions.
#
# client_id - the client's identifier.
# channel   - the channel name (appended directly after "/channels").
# callback  - optional block invoked with no arguments once the second
#             SREM has completed.
#
# The channel is removed from "<namespace>/clients/<id>/channels" and the
# client id from "<namespace>/channels<channel>".
def unsubscribe(client_id, channel, &callback)
  init
  client_channels_key = @ns + "/clients/#{client_id}/channels"
  channel_clients_key = @ns + "/channels#{channel}"

  @redis.srem(client_channels_key, channel)
  @redis.srem(channel_clients_key, client_id) do
    debug 'Unsubscribed client ? from channel ?', client_id, channel
    callback.call if callback
  end
end
|