Class: Faye::Engine::Redis
- Inherits:
-
Base
- Object
- Base
- Faye::Engine::Redis
show all
- Defined in:
- lib/faye/engines/redis.rb
Constant Summary
collapse
- DEFAULT_HOST =
'localhost'
- DEFAULT_PORT =
6379
Constants included
from Logging
Logging::DEFAULT_LOG_LEVEL, Logging::LOG_LEVELS
Instance Attribute Summary
Attributes inherited from Base
#interval, #timeout
Attributes included from Logging
#log_level
Instance Method Summary
collapse
Methods inherited from Base
#close_connection, #connect, #connection, #flush, #initialize
Methods included from Timeouts
#add_timeout, #remove_timeout
Methods included from Logging
#log
Instance Method Details
#client_exists(client_id, &callback) ⇒ Object
79
80
81
82
83
84
|
# File 'lib/faye/engines/redis.rb', line 79
def client_exists(client_id, &callback)
init
@redis.sismember(@ns + '/clients', client_id) do |exists|
callback.call(exists != 0)
end
end
|
#create_client(&callback) ⇒ Object
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# File 'lib/faye/engines/redis.rb', line 38
def create_client(&callback)
init
client_id = Faye.random
@redis.sadd(@ns + '/clients', client_id) do |added|
if added == 0
create_client(&callback)
else
debug 'Created new client ?', client_id
ping(client_id)
callback.call(client_id)
end
end
end
|
#destroy_client(client_id, &callback) ⇒ Object
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/faye/engines/redis.rb', line 52
def destroy_client(client_id, &callback)
init
@redis.srem(@ns + '/clients', client_id)
@redis.del(@ns + "/clients/#{client_id}/messages")
remove_timeout(client_id)
@redis.del(@ns + "/clients/#{client_id}/ping")
@redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
n, i = channels.size, 0
if n == 0
debug 'Destroyed client ?', client_id
callback.call if callback
else
channels.each do |channel|
unsubscribe(client_id, channel) do
i += 1
if i == n
debug 'Destroyed client ?', client_id
callback.call if callback
end
end
end
end
end
end
|
#disconnect ⇒ Object
34
35
36
|
# File 'lib/faye/engines/redis.rb', line 34
def disconnect
@subscriber.unsubscribe(@ns + '/notifications')
end
|
#init ⇒ Object
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
# File 'lib/faye/engines/redis.rb', line 8
def init
return if @redis
require 'em-hiredis'
host = @options[:host] || DEFAULT_HOST
port = @options[:port] || DEFAULT_PORT
db = @options[:database] || 0
auth = @options[:password]
@ns = @options[:namespace] || ''
@redis = EventMachine::Hiredis::Client.connect(host, port)
@subscriber = EventMachine::Hiredis::Client.connect(host, port)
if auth
@redis.auth(auth)
@subscriber.auth(auth)
end
@redis.select(db)
@subscriber.select(db)
@subscriber.subscribe(@ns + '/notifications')
@subscriber.on(:message) do |topic, message|
empty_queue(message) if topic == @ns + '/notifications'
end
end
|
#ping(client_id) ⇒ Object
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/faye/engines/redis.rb', line 86
def ping(client_id)
timeout = @options[:timeout]
time = Time.now.to_i.to_s
return unless Numeric === timeout
debug 'Ping ?, ?', client_id, timeout
remove_timeout(client_id)
@redis.set(@ns + "/clients/#{client_id}/ping", time)
add_timeout(client_id, 2 * timeout) do
@redis.get(@ns + "/clients/#{client_id}/ping") do |ping|
destroy_client(client_id) if ping == time
end
end
end
|
#publish(message) ⇒ Object
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
# File 'lib/faye/engines/redis.rb', line 120
def publish(message)
init
debug 'Publishing message ?', message
json_message = JSON.dump(message)
channels = Channel.expand(message['channel'])
channels.each do |channel|
@redis.smembers(@ns + "/channels#{channel}") do |clients|
clients.each do |client_id|
debug 'Queueing for client ?: ?', client_id, message
@redis.sadd(@ns + "/clients/#{client_id}/messages", json_message)
@redis.publish(@ns + '/notifications', client_id)
end
end
end
end
|
#subscribe(client_id, channel, &callback) ⇒ Object
102
103
104
105
106
107
108
109
|
# File 'lib/faye/engines/redis.rb', line 102
def subscribe(client_id, channel, &callback)
init
@redis.sadd(@ns + "/clients/#{client_id}/channels", channel)
@redis.sadd(@ns + "/channels#{channel}", client_id) do
debug 'Subscribed client ? to channel ?', client_id, channel
callback.call if callback
end
end
|
#unsubscribe(client_id, channel, &callback) ⇒ Object
111
112
113
114
115
116
117
118
|
# File 'lib/faye/engines/redis.rb', line 111
def unsubscribe(client_id, channel, &callback)
init
@redis.srem(@ns + "/clients/#{client_id}/channels", channel)
@redis.srem(@ns + "/channels#{channel}", client_id) do
debug 'Unsubscribed client ? from channel ?', client_id, channel
callback.call if callback
end
end
|