Class: Firehose::Server::Channel
- Inherits:
- Object
- Defined in:
- lib/firehose/server/channel.rb
Overview
Connects to a specific channel on Redis and listens for messages to notify subscribers.
Instance Attribute Summary
- #channel_key ⇒ Object (readonly)
  Returns the value of attribute channel_key.
- #list_key ⇒ Object (readonly)
  Returns the value of attribute list_key.
- #redis ⇒ Object (readonly)
  Returns the value of attribute redis.
- #sequence_key ⇒ Object (readonly)
  Returns the value of attribute sequence_key.
- #subscriber ⇒ Object (readonly)
  Returns the value of attribute subscriber.
Instance Method Summary
- #initialize(channel_key, redis = self.class.redis, subscriber = self.class.subscriber) ⇒ Channel (constructor)
  A new instance of Channel.
- #next_messages(consumer_sequence = nil, options = {}) ⇒ Object
- #unsubscribe(deferrable) ⇒ Object
Constructor Details
#initialize(channel_key, redis = self.class.redis, subscriber = self.class.subscriber) ⇒ Channel
Returns a new instance of Channel.
# File 'lib/firehose/server/channel.rb', line 16
def initialize(channel_key, redis=self.class.redis, subscriber=self.class.subscriber)
  @redis        = redis
  @subscriber   = subscriber
  @channel_key  = channel_key
  @list_key     = Server::Redis.key(channel_key, :list)
  @sequence_key = Server::Redis.key(channel_key, :sequence)
end
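The constructor derives two Redis keys from a single channel key: one for the message list and one for the sequence counter. The sketch below only illustrates that one-to-two mapping. `sketch_key` is a hypothetical stand-in for `Server::Redis.key`, and the `firehose:` prefix and `:` separator are assumptions, not confirmed by this page.

```ruby
# Hypothetical stand-in for Server::Redis.key. The real key format is an
# assumption here; only the "one channel, two keys" idea comes from the source.
def sketch_key(*segments)
  [:firehose, *segments].join(":")
end

channel_key  = "/chat/room/1"
list_key     = sketch_key(channel_key, :list)      # where published messages would be stored
sequence_key = sketch_key(channel_key, :sequence)  # where the latest sequence number would be stored

puts list_key
puts sequence_key
```

Keeping both keys as read-only attributes lets `#next_messages` read the sequence and the list without rebuilding the key names on each call.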
Instance Attribute Details
#channel_key ⇒ Object (readonly)
Returns the value of attribute channel_key.
# File 'lib/firehose/server/channel.rb', line 5
def channel_key
  @channel_key
end
#list_key ⇒ Object (readonly)
Returns the value of attribute list_key.
# File 'lib/firehose/server/channel.rb', line 5
def list_key
  @list_key
end
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
# File 'lib/firehose/server/channel.rb', line 6
def redis
  @redis
end
#sequence_key ⇒ Object (readonly)
Returns the value of attribute sequence_key.
# File 'lib/firehose/server/channel.rb', line 5
def sequence_key
  @sequence_key
end
#subscriber ⇒ Object (readonly)
Returns the value of attribute subscriber.
# File 'lib/firehose/server/channel.rb', line 6
def subscriber
  @subscriber
end
Instance Method Details
#next_messages(consumer_sequence = nil, options = {}) ⇒ Object
# File 'lib/firehose/server/channel.rb', line 24
def next_messages(consumer_sequence=nil, options={})
  deferrable = EM::DefaultDeferrable.new
  deferrable.errback {|e| EM.next_tick { raise e } unless [:timeout, :disconnect].include?(e) }

  redis.multi
    redis.get(sequence_key).
      errback {|e| deferrable.fail e }
    # Fetch entire list: http://stackoverflow.com/questions/10703019/redis-fetch-all-value-of-list-without-iteration-and-without-popping
    redis.lrange(list_key, 0, -1).
      errback {|e| deferrable.fail e }
  redis.exec.callback do |(channel_sequence, message_list)|
    # Reverse the messages so they can be correctly processed by the MessageBuffer class. There's
    # a patch in the message-buffer-redis branch that moves this concern into the Publisher LUA
    # script. We kept it out of this for now because it represents a deployment risk and `reverse!`
    # is a cheap operation in Ruby.
    message_list.reverse!
    buffer = MessageBuffer.new(message_list, channel_sequence, consumer_sequence)
    if buffer.remaining_messages.empty?
      Firehose.logger.debug "No messages in buffer, subscribing. sequence: `#{channel_sequence}` consumer_sequence: #{consumer_sequence}"
      # Either this resource has never been seen before or we are all caught up.
      # Subscribe and hope something gets published to this end-point.
      subscribe(deferrable, options[:timeout])
    else
      # Either the client is under water or caught up to head.
      deferrable.succeed buffer.remaining_messages
    end
  end.errback {|e| deferrable.fail e }

  deferrable
end
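`#next_messages` returns a deferrable: it succeeds with the messages the consumer has not yet seen, or fails with an error. Per the errback in the listing above, `:timeout` and `:disconnect` are treated as expected failures and are not re-raised. The sketch below mimics that contract in plain Ruby, without EventMachine or Redis. `SketchDeferrable` and `expected_failure?` are hypothetical names introduced for illustration, and the stand-in is much simpler than `EM::DefaultDeferrable` (it does not track state or fire handlers only once).

```ruby
# Simplified stand-in for EM::DefaultDeferrable (hypothetical, illustration only).
class SketchDeferrable
  def initialize
    @callbacks = []
    @errbacks  = []
  end

  # Register a success handler; returns self so calls can be chained.
  def callback(&block)
    @callbacks << block
    self
  end

  # Register a failure handler; returns self so calls can be chained.
  def errback(&block)
    @errbacks << block
    self
  end

  # Invoke every success handler with the given arguments.
  def succeed(*args)
    @callbacks.each { |handler| handler.call(*args) }
  end

  # Invoke every failure handler with the given arguments.
  def fail(*args)
    @errbacks.each { |handler| handler.call(*args) }
  end
end

# Mirrors the check in #next_messages: these symbols mark the normal end of a
# wait rather than a bug, so they are not escalated.
def expected_failure?(error)
  [:timeout, :disconnect].include?(error)
end

received = []
failures = []

deferrable = SketchDeferrable.new
deferrable.callback { |messages| received.concat(messages) }
deferrable.errback  { |error| failures << error unless expected_failure?(error) }

deferrable.succeed(["hello", "world"])           # consumer receives unseen messages
deferrable.fail(:timeout)                        # expected failure, ignored
deferrable.fail(RuntimeError.new("redis down"))  # unexpected failure, recorded
```

A caller of the real method would attach `callback` and `errback` handlers to the returned deferrable in the same way and treat `:timeout` as a cue to poll again.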
#unsubscribe(deferrable) ⇒ Object
# File 'lib/firehose/server/channel.rb', line 54
def unsubscribe(deferrable)
  subscriber.unsubscribe channel_key, deferrable
end
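`#unsubscribe` delegates to the subscriber, passing the channel key together with the deferrable to remove. This page does not show the subscriber's internals, so the following is only a sketch of one way such bookkeeping could work. `SketchSubscriber` and its `subscriptions` hash are hypothetical and are not the Firehose implementation.

```ruby
# Hypothetical registry mapping a channel key to the deferrables waiting on it.
class SketchSubscriber
  attr_reader :subscriptions

  def initialize
    # channel_key => array of waiting deferrables
    @subscriptions = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(channel_key, deferrable)
    subscriptions[channel_key] << deferrable
  end

  # Remove one waiting deferrable; drop the channel entry once nobody is waiting.
  def unsubscribe(channel_key, deferrable)
    return unless subscriptions.key?(channel_key)
    subscriptions[channel_key].delete(deferrable)
    subscriptions.delete(channel_key) if subscriptions[channel_key].empty?
  end
end

subscriber = SketchSubscriber.new
first  = Object.new  # stands in for a deferrable
second = Object.new

subscriber.subscribe("/chat", first)
subscriber.subscribe("/chat", second)

subscriber.unsubscribe("/chat", first)
remaining_after_first = subscriber.subscriptions["/chat"].dup

subscriber.unsubscribe("/chat", second)
channel_still_tracked = subscriber.subscriptions.key?("/chat")
```

Passing the deferrable itself, rather than only the channel key, lets one consumer stop waiting without affecting others subscribed to the same channel.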