Class: Faye::Engine::Redis

Inherits:
Base
  • Object
show all
Defined in:
lib/faye/engines/redis.rb

Constant Summary collapse

DEFAULT_HOST =
'localhost'
DEFAULT_PORT =
6379

Constants included from Logging

Logging::DEFAULT_LOG_LEVEL, Logging::LOG_LEVELS

Instance Attribute Summary

Attributes inherited from Base

#interval, #timeout

Attributes included from Logging

#log_level

Instance Method Summary collapse

Methods inherited from Base

#close_connection, #connect, #connection, #flush, #initialize

Methods included from Timeouts

#add_timeout, #remove_timeout

Methods included from Logging

#log

Constructor Details

This class inherits a constructor from Faye::Engine::Base

Instance Method Details

#client_exists(client_id, &callback) ⇒ Object



79
80
81
82
83
84
# File 'lib/faye/engines/redis.rb', line 79

def client_exists(client_id, &callback)
  init
  @redis.sismember(@ns + '/clients', client_id) do |exists|
    callback.call(exists != 0)
  end
end

#create_client(&callback) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/faye/engines/redis.rb', line 38

def create_client(&callback)
  init
  client_id = Faye.random
  @redis.sadd(@ns + '/clients', client_id) do |added|
    if added == 0
      create_client(&callback)
    else
      debug 'Created new client ?', client_id
      ping(client_id)
      callback.call(client_id)
    end
  end
end

#destroy_client(client_id, &callback) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/faye/engines/redis.rb', line 52

def destroy_client(client_id, &callback)
  init
  @redis.srem(@ns + '/clients', client_id)
  @redis.del(@ns + "/clients/#{client_id}/messages")
  
  remove_timeout(client_id)
  @redis.del(@ns + "/clients/#{client_id}/ping")
  
  @redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
    n, i = channels.size, 0
    if n == 0
      debug 'Destroyed client ?', client_id
      callback.call if callback
    else
      channels.each do |channel|
        unsubscribe(client_id, channel) do
          i += 1
          if i == n
            debug 'Destroyed client ?', client_id
            callback.call if callback
          end
        end
      end
    end
  end
end

#disconnectObject



34
35
36
# File 'lib/faye/engines/redis.rb', line 34

def disconnect
  @subscriber.unsubscribe(@ns + '/notifications')
end

#initObject



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/faye/engines/redis.rb', line 8

def init
  return if @redis
  require 'em-hiredis'
  
  host = @options[:host] || DEFAULT_HOST
  port = @options[:port] || DEFAULT_PORT
  db   = @options[:database] || 0
  auth = @options[:password]
  @ns  = @options[:namespace] || ''
  
  @redis      = EventMachine::Hiredis::Client.connect(host, port)
  @subscriber = EventMachine::Hiredis::Client.connect(host, port)
  
  if auth
    @redis.auth(auth)
    @subscriber.auth(auth)
  end
  @redis.select(db)
  @subscriber.select(db)
  
  @subscriber.subscribe(@ns + '/notifications')
  @subscriber.on(:message) do |topic, message|
    empty_queue(message) if topic == @ns + '/notifications'
  end
end

#ping(client_id) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/faye/engines/redis.rb', line 86

def ping(client_id)
  timeout = @options[:timeout]
  time    = Time.now.to_i.to_s
  
  return unless Numeric === timeout
  
  debug 'Ping ?, ?', client_id, timeout
  remove_timeout(client_id)
  @redis.set(@ns + "/clients/#{client_id}/ping", time)
  add_timeout(client_id, 2 * timeout) do
    @redis.get(@ns + "/clients/#{client_id}/ping") do |ping|
      destroy_client(client_id) if ping == time
    end
  end
end

#publish(message) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/faye/engines/redis.rb', line 120

def publish(message)
  init
  debug 'Publishing message ?', message
  json_message = JSON.dump(message)
  channels = Channel.expand(message['channel'])
  channels.each do |channel|
    @redis.smembers(@ns + "/channels#{channel}") do |clients|
      clients.each do |client_id|
        debug 'Queueing for client ?: ?', client_id, message
        @redis.sadd(@ns + "/clients/#{client_id}/messages", json_message)
        @redis.publish(@ns + '/notifications', client_id)
      end
    end
  end
end

#subscribe(client_id, channel, &callback) ⇒ Object



102
103
104
105
106
107
108
109
# File 'lib/faye/engines/redis.rb', line 102

def subscribe(client_id, channel, &callback)
  init
  @redis.sadd(@ns + "/clients/#{client_id}/channels", channel)
  @redis.sadd(@ns + "/channels#{channel}", client_id) do
    debug 'Subscribed client ? to channel ?', client_id, channel
    callback.call if callback
  end
end

#unsubscribe(client_id, channel, &callback) ⇒ Object



111
112
113
114
115
116
117
118
# File 'lib/faye/engines/redis.rb', line 111

def unsubscribe(client_id, channel, &callback)
  init
  @redis.srem(@ns + "/clients/#{client_id}/channels", channel)
  @redis.srem(@ns + "/channels#{channel}", client_id) do
    debug 'Unsubscribed client ? from channel ?', client_id, channel
    callback.call if callback
  end
end