Class: EventMachine::Hiredis::PubsubClient
- Inherits: BaseClient
  - Object
  - BaseClient
  - EventMachine::Hiredis::PubsubClient
- Defined in: lib/em-hiredis/pubsub_client.rb
Constant Summary
- PUBSUB_MESSAGES = %w{message pmessage subscribe unsubscribe psubscribe punsubscribe}.freeze
Instance Attribute Summary
Attributes inherited from BaseClient
Instance Method Summary
- #connect ⇒ Object
- #initialize(host = 'localhost', port = '6379', password = nil, db = nil) ⇒ PubsubClient (constructor)
  A new instance of PubsubClient.
- #psubscribe(pattern, proc = nil, &block) ⇒ Deferrable
  Pattern subscribe to a pubsub channel.
- #punsubscribe(pattern) ⇒ Deferrable
  Pattern unsubscribe all callbacks for a given pattern.
- #punsubscribe_proc(pattern, proc) ⇒ Deferrable
  Unsubscribe a given callback from a pattern.
- #subscribe(channel, proc = nil, &block) ⇒ Deferrable
  Subscribe to a pubsub channel.
- #unsubscribe(channel) ⇒ Deferrable
  Unsubscribe all callbacks for a given channel.
- #unsubscribe_proc(channel, proc) ⇒ Deferrable
  Unsubscribe a given callback from a channel.
Methods inherited from BaseClient
#auth, #close_connection, #configure, #connected?, #pending_commands?, #reconnect_connection, #select
Methods included from EventEmitter
#emit, #listeners, #on, #remove_all_listeners, #remove_listener
Constructor Details
#initialize(host = 'localhost', port = '6379', password = nil, db = nil) ⇒ PubsubClient
Returns a new instance of PubsubClient.
    # File 'lib/em-hiredis/pubsub_client.rb', line 5
    def initialize(host='localhost', port='6379', password=nil, db=nil)
      @subs, @psubs = [], []
      @pubsub_defs = Hash.new { |h,k| h[k] = [] }
      super
    end
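The constructor builds `@pubsub_defs` with a default block, so looking up an unseen key creates and stores an empty array. A minimal plain-Ruby sketch of that idiom follows; the `build_registry` helper is hypothetical and not part of em-hiredis.

```ruby
# Hypothetical helper mirroring the Hash.new default-block idiom
# used in the constructor; it is not part of em-hiredis.
def build_registry
  Hash.new { |hash, key| hash[key] = [] }
end

registry = build_registry
registry['news'] << :first_entry   # unseen key: the block stores [] before the append
registry['news'] << :second_entry  # existing key: appends to the same array

puts registry['news'].length
puts registry.key?('sports')       # key? does not trigger the default block
```

One consequence of this idiom is that a plain read such as `registry['sports']` also inserts an empty entry, which matters when code later checks which keys exist.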
Dynamic Method Handling
This class handles dynamic methods through the method_missing method defined in EventMachine::Hiredis::BaseClient.
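This page does not show what BaseClient does inside `method_missing`, so the following is only a sketch of the general Ruby mechanism. The `CommandForwarder` class and its `COMMANDS` list are hypothetical, and the sketch assumes unknown method names are treated as command names.

```ruby
# Hypothetical stand-in, not the BaseClient implementation: selected
# method names are recorded as commands together with their arguments.
class CommandForwarder
  # Assumed whitelist for illustration only.
  COMMANDS = %i[get set publish].freeze

  attr_reader :sent

  def initialize
    @sent = []
  end

  def method_missing(name, *args)
    return super unless COMMANDS.include?(name)

    @sent << [name, *args]
    self
  end

  # Keep respond_to? consistent with method_missing.
  def respond_to_missing?(name, include_private = false)
    COMMANDS.include?(name) || super
  end
end

client = CommandForwarder.new
client.publish('news', 'hello')
p client.sent
```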
Instance Method Details
#connect ⇒ Object
    # File 'lib/em-hiredis/pubsub_client.rb', line 11
    def connect
      @sub_callbacks = Hash.new { |h, k| h[k] = [] }
      @psub_callbacks = Hash.new { |h, k| h[k] = [] }

      # Resubscribe to channels on reconnect
      on(:reconnected) {
        raw_send_command(:subscribe, *@subs) if @subs.any?
        raw_send_command(:psubscribe, *@psubs) if @psubs.any?
      }

      super
    end
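The `:reconnected` handler replays every remembered channel and pattern in a single command per kind. The sketch below models that bookkeeping without a Redis connection; `ReplayingClient` and `handle_reconnected` are hypothetical names, and commands are recorded in an array rather than sent.

```ruby
# Hypothetical stand-in client: records commands instead of talking to Redis.
class ReplayingClient
  attr_reader :sent

  def initialize
    @subs = []
    @psubs = []
    @sent = []
  end

  def subscribe(channel)
    @subs << channel
    @sent << [:subscribe, channel]
  end

  def psubscribe(pattern)
    @psubs << pattern
    @sent << [:psubscribe, pattern]
  end

  # Plays the role of the on(:reconnected) block in #connect:
  # one command per kind, carrying every remembered name.
  def handle_reconnected
    @sent << [:subscribe, *@subs] if @subs.any?
    @sent << [:psubscribe, *@psubs] if @psubs.any?
  end
end

client = ReplayingClient.new
client.subscribe('news')
client.subscribe('sports')
client.handle_reconnected
p client.sent.last
```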
#psubscribe(pattern, proc = nil, &block) ⇒ Deferrable
Subscribe to all pubsub channels matching a pattern.
If a proc or block is given, it is called with the channel name and the message whenever a message arrives on a matching channel.
    # File 'lib/em-hiredis/pubsub_client.rb', line 84
    def psubscribe(pattern, proc = nil, &block)
      if cb = proc || block
        @psub_callbacks[pattern] << cb
      end
      @psubs << pattern
      raw_send_command(:psubscribe, pattern)
      return pubsub_deferrable(pattern)
    end
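The dispatch code is not listed on this page, so the following is a sketch of how callbacks registered per pattern could be invoked with the channel name and message. `PatternDispatcher` and `deliver` are hypothetical; the sketch assumes the server reports which pattern matched, as Redis does in its `pmessage` replies.

```ruby
# Hypothetical dispatcher: maps each pattern to its callbacks and invokes
# them with (channel, message), as the psubscribe description states.
class PatternDispatcher
  def initialize
    @psub_callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  def psubscribe(pattern, proc = nil, &block)
    callback = proc || block
    @psub_callbacks[pattern] << callback if callback
  end

  # No glob matching happens here; the matched pattern is passed in.
  # Returns the number of callbacks that were invoked.
  def deliver(pattern, channel, message)
    return 0 unless @psub_callbacks.key?(pattern)

    @psub_callbacks[pattern].each { |callback| callback.call(channel, message) }
    @psub_callbacks[pattern].size
  end
end

dispatcher = PatternDispatcher.new
dispatcher.psubscribe('news.*') { |channel, message| puts "#{channel}: #{message}" }
dispatcher.deliver('news.*', 'news.tech', 'hello')
```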
#punsubscribe(pattern) ⇒ Deferrable
Pattern unsubscribe all callbacks for a given pattern.
    # File 'lib/em-hiredis/pubsub_client.rb', line 97
    def punsubscribe(pattern)
      @psub_callbacks.delete(pattern)
      @psubs.delete(pattern)
      raw_send_command(:punsubscribe, pattern)
      return pubsub_deferrable(pattern)
    end
#punsubscribe_proc(pattern, proc) ⇒ Deferrable
Unsubscribe a given callback from a pattern. Also unsubscribes from Redis if no callbacks remain for this pattern. The returned deferrable fails if the callback was not registered for the pattern.
    # File 'lib/em-hiredis/pubsub_client.rb', line 112
    def punsubscribe_proc(pattern, proc)
      df = EM::DefaultDeferrable.new
      if @psub_callbacks[pattern].delete(proc)
        if @psub_callbacks[pattern].any?
          # Succeed deferrable immediately - no need to punsubscribe
          df.succeed
        else
          punsubscribe(pattern).callback { |_|
            df.succeed
          }
        end
      else
        df.fail
      end
      return df
    end
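The listing removes the callback with `Array#delete`, which compares with `==`. Two separately written procs are not equal even when their bodies are identical, so a caller has to keep the exact object it registered. A plain-Ruby sketch of that behaviour follows; the `remove_callback` helper is hypothetical.

```ruby
# Hypothetical helper: true if the callback was in the list and has been removed.
def remove_callback(callbacks, callback)
  # Array#delete returns nil when nothing matched.
  !callbacks.delete(callback).nil?
end

handler = proc { |channel, message| puts "#{channel}: #{message}" }
lookalike = proc { |channel, message| puts "#{channel}: #{message}" }
callbacks = [handler]

puts remove_callback(callbacks, lookalike)  # different object, nothing removed
puts remove_callback(callbacks, handler)    # the registered object is removed
puts callbacks.empty?
```

In practice this means a callback passed as a bare block cannot later be removed individually unless the caller also holds a reference to it as a proc.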
#subscribe(channel, proc = nil, &block) ⇒ Deferrable
Subscribe to a pubsub channel.
If a proc or block is given, it is called when a message is received on this channel.
    # File 'lib/em-hiredis/pubsub_client.rb', line 31
    def subscribe(channel, proc = nil, &block)
      if cb = proc || block
        @sub_callbacks[channel] << cb
      end
      @subs << channel
      raw_send_command(:subscribe, channel)
      return pubsub_deferrable(channel)
    end
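In the listing, `subscribe` appends to `@subs` on every call with no duplicate check, and `unsubscribe` removes names with `@subs.delete(channel)`. The sketch below models only those two array operations for a channel subscribed twice; the `ChannelList` class is hypothetical and involves no Redis connection.

```ruby
# Hypothetical model of the @subs bookkeeping only.
class ChannelList
  attr_reader :subs

  def initialize
    @subs = []
  end

  def subscribe(channel)
    @subs << channel        # no duplicate check, as in the listing
  end

  def unsubscribe(channel)
    @subs.delete(channel)   # Array#delete removes every equal element
  end
end

list = ChannelList.new
list.subscribe('news')
list.subscribe('news')
puts list.subs.length
list.unsubscribe('news')
puts list.subs.length
```

Read this way, a single `unsubscribe` clears the channel however many times it was subscribed, while a reconnect before that point would replay the name once per `subscribe` call.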
#unsubscribe(channel) ⇒ Deferrable
Unsubscribe all callbacks for a given channel.
    # File 'lib/em-hiredis/pubsub_client.rb', line 44
    def unsubscribe(channel)
      @sub_callbacks.delete(channel)
      @subs.delete(channel)
      raw_send_command(:unsubscribe, channel)
      return pubsub_deferrable(channel)
    end
#unsubscribe_proc(channel, proc) ⇒ Deferrable
Unsubscribe a given callback from a channel. Also unsubscribes from Redis if no callbacks remain for this channel. The returned deferrable fails if the callback was not registered for the channel.
    # File 'lib/em-hiredis/pubsub_client.rb', line 59
    def unsubscribe_proc(channel, proc)
      df = EM::DefaultDeferrable.new
      if @sub_callbacks[channel].delete(proc)
        if @sub_callbacks[channel].any?
          # Succeed deferrable immediately - no need to unsubscribe
          df.succeed
        else
          unsubscribe(channel).callback { |_|
            df.succeed
          }
        end
      else
        df.fail
      end
      return df
    end
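The method has three outcomes: the callback was not registered, the callback was removed while others remain, or the last callback was removed and the channel is dropped. The sketch below models that branch logic in plain Ruby. `CallbackRegistry` is hypothetical and returns a symbol where the real method succeeds or fails a deferrable.

```ruby
# Hypothetical model of the branch logic in unsubscribe_proc.
class CallbackRegistry
  def initialize
    @sub_callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(channel, callback)
    @sub_callbacks[channel] << callback
  end

  def unsubscribe_proc(channel, callback)
    return :failed unless @sub_callbacks[channel].delete(callback)

    if @sub_callbacks[channel].any?
      :succeeded_without_unsubscribe   # other callbacks still need the channel
    else
      @sub_callbacks.delete(channel)   # stands in for unsubscribe(channel)
      :succeeded_after_unsubscribe
    end
  end
end

registry = CallbackRegistry.new
first = proc { |message| puts message }
second = proc { |message| puts message.upcase }
registry.subscribe('news', first)
registry.subscribe('news', second)
p registry.unsubscribe_proc('news', first)
p registry.unsubscribe_proc('news', second)
p registry.unsubscribe_proc('news', second)
```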