Class: Firehose::Server::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/firehose/server/subscriber.rb

Overview

Setups a connetion to Redis to listen for new resources…

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis) ⇒ Subscriber

Returns a new instance of Subscriber.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/firehose/server/subscriber.rb', line 7

def initialize(redis)
  @pubsub = redis.pubsub
  # TODO: Instead of just raising an exception, it would probably be better
  #       for the errback to set some sort of 'disconnected' state. Then
  #       whenever a deferrable was 'subscribed' we could instantly fail
  #       the deferrable with whatever connection error we had.
  #       An alternative which would have a similar result would be to
  #       subscribe lazily (i.e. not until we have a deferrable to subscribe).
  #       Then, if connecting failed, it'd be super easy to fail the deferrable
  #       with the same error.
  #       The final goal is to allow the failed deferrable bubble back up
  #       so we can send back a nice, clean 500 error to the client.
  channel_updates_key = Server::Redis.key('channel_updates')
  pubsub.subscribe(channel_updates_key).
    errback{|e| EM.next_tick { raise e } }.
    callback { Firehose.logger.debug "Redis subscribed to `#{channel_updates_key}`" }
  pubsub.on(:message) do |_, payload|
    channel_key, channel_sequence, message = Server::Publisher.from_payload(payload)
    messages = [ MessageBuffer::Message.new(message, channel_sequence.to_i) ]
    if deferrables = subscriptions.delete(channel_key)
      Firehose.logger.debug "Redis notifying #{deferrables.count} deferrable(s) at `#{channel_key}` with channel_sequence `#{channel_sequence}` and message `#{message}`"
      deferrables.each do |deferrable|
        Firehose.logger.debug "Sending message #{message} and channel_sequence #{channel_sequence} to client from subscriber"
        deferrable.succeed messages
      end
    end
  end
end

Instance Attribute Details

#pubsubObject (readonly)

Returns the value of attribute pubsub.



5
6
7
# File 'lib/firehose/server/subscriber.rb', line 5

def pubsub
  @pubsub
end

Instance Method Details

#subscribe(channel_key, deferrable) ⇒ Object



36
37
38
# File 'lib/firehose/server/subscriber.rb', line 36

def subscribe(channel_key, deferrable)
  subscriptions[channel_key].push deferrable
end

#unsubscribe(channel_key, deferrable) ⇒ Object



40
41
42
43
# File 'lib/firehose/server/subscriber.rb', line 40

def unsubscribe(channel_key, deferrable)
  subscriptions[channel_key].delete deferrable
  subscriptions.delete(channel_key) if subscriptions[channel_key].empty?
end