Class: EventMachine::Hiredis::PubsubClient
- Inherits:
-
BaseClient
- Object
- BaseClient
- EventMachine::Hiredis::PubsubClient
- Defined in:
- lib/em-hiredis/pubsub_client.rb
Constant Summary collapse
- PUBSUB_MESSAGES =
%w{message pmessage subscribe unsubscribe psubscribe punsubscribe}.freeze
- PING_CHANNEL =
'__em-hiredis-ping'
Instance Attribute Summary
Attributes inherited from BaseClient
Instance Method Summary collapse
- #connect ⇒ Object
-
#initialize(host = 'localhost', port = '6379', password = nil, db = nil) ⇒ PubsubClient
constructor
A new instance of PubsubClient.
-
#ping ⇒ Object
Pubsub connections to not support even the PING command, but it is useful, especially with read-only connections like pubsub, to be able to check that the TCP connection is still usefully alive.
-
#psubscribe(pattern, proc = nil, &block) ⇒ Deferrable
Pattern subscribe to a pubsub channel.
-
#punsubscribe(pattern) ⇒ Deferrable
Pattern unsubscribe all callbacks for a given pattern.
-
#punsubscribe_proc(pattern, proc) ⇒ Deferrable
Unsubscribe a given callback from a pattern.
-
#subscribe(channel, proc = nil, &block) ⇒ Deferrable
Subscribe to a pubsub channel.
-
#unsubscribe(channel) ⇒ Deferrable
Unsubscribe all callbacks for a given channel.
-
#unsubscribe_proc(channel, proc) ⇒ Deferrable
Unsubscribe a given callback from a channel.
Methods inherited from BaseClient
#auth, #close_connection, #configure, #configure_inactivity_check, #connected?, #pending_commands?, #reconnect!, #reconnect_connection, #select
Methods included from EventEmitter
#emit, #listeners, #on, #remove_all_listeners, #remove_listener
Constructor Details
#initialize(host = 'localhost', port = '6379', password = nil, db = nil) ⇒ PubsubClient
Returns a new instance of PubsubClient.
7 8 9 10 11 |
# File 'lib/em-hiredis/pubsub_client.rb', line 7 def initialize(host='localhost', port='6379', password=nil, db=nil) @subs, @psubs = [], [] @pubsub_defs = Hash.new { |h,k| h[k] = [] } super end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method in the class EventMachine::Hiredis::BaseClient
Instance Method Details
#connect ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/em-hiredis/pubsub_client.rb', line 13 def connect @sub_callbacks = Hash.new { |h, k| h[k] = [] } @psub_callbacks = Hash.new { |h, k| h[k] = [] } # Resubsubscribe to channels on reconnect on(:reconnected) { raw_send_command(:subscribe, @subs) if @subs.any? raw_send_command(:psubscribe, @psubs) if @psubs.any? } super end |
#ping ⇒ Object
Pubsub connections to not support even the PING command, but it is useful, especially with read-only connections like pubsub, to be able to check that the TCP connection is still usefully alive.
This is not particularly elegant, but it’s probably the best we can do for now. Ping support for pubsub connections is being considerred: github.com/antirez/redis/issues/420
138 139 140 141 142 |
# File 'lib/em-hiredis/pubsub_client.rb', line 138 def ping subscribe(PING_CHANNEL).callback { unsubscribe(PING_CHANNEL) } end |
#psubscribe(pattern, proc = nil, &block) ⇒ Deferrable
Pattern subscribe to a pubsub channel
If an optional proc / block is provided then it will be called (with the channel name and message) when a message is received on a matching channel
86 87 88 89 90 91 92 93 |
# File 'lib/em-hiredis/pubsub_client.rb', line 86 def psubscribe(pattern, proc = nil, &block) if cb = proc || block @psub_callbacks[pattern] << cb end @psubs << pattern raw_send_command(:psubscribe, [pattern]) return pubsub_deferrable(pattern) end |
#punsubscribe(pattern) ⇒ Deferrable
Pattern unsubscribe all callbacks for a given pattern
99 100 101 102 103 104 |
# File 'lib/em-hiredis/pubsub_client.rb', line 99 def punsubscribe(pattern) @psub_callbacks.delete(pattern) @psubs.delete(pattern) raw_send_command(:punsubscribe, [pattern]) return pubsub_deferrable(pattern) end |
#punsubscribe_proc(pattern, proc) ⇒ Deferrable
Unsubscribe a given callback from a pattern. Will unsubscribe from redis if there are no remaining subscriptions on this pattern
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/em-hiredis/pubsub_client.rb', line 114 def punsubscribe_proc(pattern, proc) df = EM::DefaultDeferrable.new if @psub_callbacks[pattern].delete(proc) if @psub_callbacks[pattern].any? # Succeed deferrable immediately - no need to punsubscribe df.succeed else punsubscribe(pattern).callback { |_| df.succeed } end else df.fail end return df end |
#subscribe(channel, proc = nil, &block) ⇒ Deferrable
Subscribe to a pubsub channel
If an optional proc / block is provided then it will be called when a message is received on this channel
33 34 35 36 37 38 39 40 |
# File 'lib/em-hiredis/pubsub_client.rb', line 33 def subscribe(channel, proc = nil, &block) if cb = proc || block @sub_callbacks[channel] << cb end @subs << channel raw_send_command(:subscribe, [channel]) return pubsub_deferrable(channel) end |
#unsubscribe(channel) ⇒ Deferrable
Unsubscribe all callbacks for a given channel
46 47 48 49 50 51 |
# File 'lib/em-hiredis/pubsub_client.rb', line 46 def unsubscribe(channel) @sub_callbacks.delete(channel) @subs.delete(channel) raw_send_command(:unsubscribe, [channel]) return pubsub_deferrable(channel) end |
#unsubscribe_proc(channel, proc) ⇒ Deferrable
Unsubscribe a given callback from a channel. Will unsubscribe from redis if there are no remaining subscriptions on this channel
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/em-hiredis/pubsub_client.rb', line 61 def unsubscribe_proc(channel, proc) df = EM::DefaultDeferrable.new if @sub_callbacks[channel].delete(proc) if @sub_callbacks[channel].any? # Succeed deferrable immediately - no need to unsubscribe df.succeed else unsubscribe(channel).callback { |_| df.succeed } end else df.fail end return df end |