Class: Firehose::Server::Subscriber
- Inherits: Object
  - Object
  - Firehose::Server::Subscriber
- Defined in: lib/firehose/server/subscriber.rb
Overview
Sets up a connection to Redis to listen for new resources…
Instance Attribute Summary
- #pubsub ⇒ Object (readonly): Returns the value of attribute pubsub.
Instance Method Summary
- #initialize(redis) ⇒ Subscriber (constructor): A new instance of Subscriber.
- #subscribe(channel_key, deferrable) ⇒ Object
- #unsubscribe(channel_key, deferrable) ⇒ Object
Constructor Details
#initialize(redis) ⇒ Subscriber
Returns a new instance of Subscriber.
    # File 'lib/firehose/server/subscriber.rb', line 7
    def initialize(redis)
      @pubsub = redis.pubsub
      # TODO: Instead of just raising an exception, it would probably be better
      #       for the errback to set some sort of 'disconnected' state. Then
      #       whenever a deferrable was 'subscribed' we could instantly fail
      #       the deferrable with whatever connection error we had.
      #       An alternative which would have a similar result would be to
      #       subscribe lazily (i.e. not until we have a deferrable to subscribe).
      #       Then, if connecting failed, it'd be super easy to fail the deferrable
      #       with the same error.
      #       The final goal is to allow the failed deferrable bubble back up
      #       so we can send back a nice, clean 500 error to the client.
      channel_updates_key = Server::Redis.key('channel_updates')
      pubsub.subscribe(channel_updates_key).
        errback{|e| EM.next_tick { raise e } }.
        callback { Firehose.logger.debug "Redis subscribed to `#{channel_updates_key}`" }
      pubsub.on(:message) do |_, payload|
        channel_key, channel_sequence, message = Server::Publisher.from_payload(payload)
        messages = [ MessageBuffer::Message.new(message, channel_sequence.to_i) ]
        if deferrables = subscriptions.delete(channel_key)
          Firehose.logger.debug "Redis notifying #{deferrables.count} deferrable(s) at `#{channel_key}` with channel_sequence `#{channel_sequence}` and message `#{message}`"
          deferrables.each do |deferrable|
            Firehose.logger.debug "Sending message #{message} and channel_sequence #{channel_sequence} to client from subscriber"
            deferrable.succeed messages
          end
        end
      end
    end
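The message handler in the constructor is one-shot: it removes every deferrable waiting on a channel and then succeeds each of them, so a client has to subscribe again to receive the next message. The following is a minimal sketch of that flow without Redis or EventMachine. `FakeDeferrable`, `DispatchSketch` and `dispatch` are hypothetical names used only for illustration, and the hash-of-arrays shape of the subscriptions store is an assumption, since that helper is not shown on this page.

```ruby
# Hypothetical stand-in for an EventMachine deferrable: it only records the
# value it was succeeded with. Not part of Firehose.
class FakeDeferrable
  attr_reader :result

  def succeed(value)
    @result = value
  end
end

# Hypothetical model of the subscriber's bookkeeping. The real class is driven
# by redis.pubsub; here `dispatch` is called by hand.
class DispatchSketch
  def initialize
    # Assumed shape: channel key => list of waiting deferrables.
    @subscriptions = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(channel_key, deferrable)
    @subscriptions[channel_key].push(deferrable)
  end

  # Modeled on the on(:message) handler: remove all waiters for the channel,
  # then succeed each one (here with a one-element list of [message, sequence]).
  # Returns the number of deferrables that were notified.
  def dispatch(channel_key, channel_sequence, message)
    messages = [[message, channel_sequence.to_i]]
    deferrables = @subscriptions.delete(channel_key)
    return 0 unless deferrables

    deferrables.each { |deferrable| deferrable.succeed(messages) }
    deferrables.count
  end
end

sketch = DispatchSketch.new
first = FakeDeferrable.new
second = FakeDeferrable.new
sketch.subscribe("/chat", first)
sketch.subscribe("/chat", second)

notified = sketch.dispatch("/chat", "7", "hello")
repeat = sketch.dispatch("/chat", "8", "again")
```

In this sketch the second `dispatch` finds no waiters, because the first call already removed the channel's entry. `Hash#delete` returns `nil` for a missing key without invoking the hash's default block, which is why the handler can use it as the condition of an `if`.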
Instance Attribute Details
#pubsub ⇒ Object (readonly)
Returns the value of attribute pubsub.
    # File 'lib/firehose/server/subscriber.rb', line 5
    def pubsub
      @pubsub
    end
Instance Method Details
#subscribe(channel_key, deferrable) ⇒ Object
    # File 'lib/firehose/server/subscriber.rb', line 36
    def subscribe(channel_key, deferrable)
      subscriptions[channel_key].push deferrable
    end
#unsubscribe(channel_key, deferrable) ⇒ Object
    # File 'lib/firehose/server/subscriber.rb', line 40
    def unsubscribe(channel_key, deferrable)
      subscriptions[channel_key].delete deferrable
      subscriptions.delete(channel_key) if subscriptions[channel_key].empty?
    end
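Both methods rely on a `subscriptions` helper that this page does not list. The sketch below assumes it is a Hash whose default block creates an empty array per channel key, which is what would let `subscribe` push without checking for the key first. `RegistrySketch` is a hypothetical class that copies the two method bodies so the pruning behaviour can be tried in isolation.

```ruby
# Hypothetical registry that copies the subscribe/unsubscribe bodies above.
# The default block is an assumption about how `subscriptions` is built.
class RegistrySketch
  attr_reader :subscriptions

  def initialize
    @subscriptions = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(channel_key, deferrable)
    subscriptions[channel_key].push(deferrable)
  end

  def unsubscribe(channel_key, deferrable)
    subscriptions[channel_key].delete(deferrable)
    # Drop the channel entry once nobody is waiting on it.
    subscriptions.delete(channel_key) if subscriptions[channel_key].empty?
  end
end

registry = RegistrySketch.new
waiter_a = Object.new
waiter_b = Object.new

registry.subscribe("/news", waiter_a)
registry.subscribe("/news", waiter_b)
registry.unsubscribe("/news", waiter_a)
remaining = registry.subscriptions["/news"].dup

registry.unsubscribe("/news", waiter_b)
pruned = !registry.subscriptions.key?("/news")

# Unsubscribing from an unknown channel creates an empty list via the default
# block and removes it again on the next line, so no stale key is left behind.
registry.unsubscribe("/missing", waiter_a)
still_clean = registry.subscriptions.empty?
```

Under that assumption, the second line of `unsubscribe` is what keeps the hash from accumulating empty arrays for channels that no longer have waiting clients.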