Class: Firehose::Server::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/firehose/server/channel.rb

Overview

Connects to a specific channel on Redis and listens for messages to notify subscribers.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel_key, redis = self.class.redis, subscriber = self.class.subscriber) ⇒ Channel

Returns a new instance of Channel.



16
17
18
19
20
21
22
# File 'lib/firehose/server/channel.rb', line 16

# Builds a channel for +channel_key+.
#
# The list and sequence keys are derived once, up front, by asking
# Server::Redis.key for the +:list+ and +:sequence+ variants of the channel key.
#
# @param channel_key [Object] key identifying the channel
# @param redis [Object] connection used to issue commands; defaults to the
#   class-level connection returned by +self.class.redis+
# @param subscriber [Object] object used for subscriptions; defaults to the
#   class-level subscriber returned by +self.class.subscriber+
def initialize(channel_key, redis = self.class.redis, subscriber = self.class.subscriber)
  @channel_key = channel_key
  @redis       = redis
  @subscriber  = subscriber

  # Same call order as before: the :list key first, then the :sequence key.
  @list_key, @sequence_key = [:list, :sequence].map do |suffix|
    Server::Redis.key(channel_key, suffix)
  end
end

Instance Attribute Details

#channel_key ⇒ Object (readonly)

Returns the value of attribute channel_key.



5
6
7
# File 'lib/firehose/server/channel.rb', line 5

# Reader for the channel key this instance was constructed with.
#
# @return [Object] the +channel_key+ argument given to #initialize, unmodified
def channel_key
  @channel_key
end

#list_key ⇒ Object (readonly)

Returns the value of attribute list_key.



5
6
7
# File 'lib/firehose/server/channel.rb', line 5

# Reader for the key of this channel's message list.
#
# The value is computed once in #initialize as
# Server::Redis.key(channel_key, :list) and is the key #next_messages passes
# to LRANGE.
#
# @return [Object] whatever Server::Redis.key returned for the +:list+ variant
def list_key
  @list_key
end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



6
7
8
# File 'lib/firehose/server/channel.rb', line 6

# Reader for the connection this channel issues its commands on.
#
# @return [Object] the +redis+ argument given to #initialize (by default the
#   class-level connection from +self.class.redis+)
def redis
  @redis
end

#sequence_key ⇒ Object (readonly)

Returns the value of attribute sequence_key.



5
6
7
# File 'lib/firehose/server/channel.rb', line 5

# Reader for the key holding this channel's sequence value.
#
# The value is computed once in #initialize as
# Server::Redis.key(channel_key, :sequence) and is the key #next_messages
# passes to GET.
#
# @return [Object] whatever Server::Redis.key returned for the +:sequence+ variant
def sequence_key
  @sequence_key
end

#subscriber ⇒ Object (readonly)

Returns the value of attribute subscriber.



6
7
8
# File 'lib/firehose/server/channel.rb', line 6

# Reader for the subscriber object this channel delegates to (see #unsubscribe).
#
# @return [Object] the +subscriber+ argument given to #initialize (by default
#   the class-level subscriber from +self.class.subscriber+)
def subscriber
  @subscriber
end

Class Method Details

.redis ⇒ Object



8
9
10
# File 'lib/firehose/server/channel.rb', line 8

# Class-level connection, used as the default +redis+ argument of #initialize.
#
# The connection is requested from Firehose::Server.redis.connection on first
# use and cached in a class-level instance variable for later calls.
#
# @return [Object] the cached connection
def self.redis
  return @redis if @redis

  @redis = Firehose::Server.redis.connection
end

.subscriber ⇒ Object



12
13
14
# File 'lib/firehose/server/channel.rb', line 12

# Class-level subscriber, used as the default +subscriber+ argument of
# #initialize.
#
# On first use a Server::Subscriber is built around the result of its own
# Firehose::Server.redis.connection call, then cached for later calls.
# NOTE(review): presumably this yields a connection separate from the one
# cached by .redis -- confirm against Firehose::Server.redis.
#
# @return [Server::Subscriber] the cached subscriber
def self.subscriber
  return @subscriber if @subscriber

  @subscriber = Server::Subscriber.new(Firehose::Server.redis.connection)
end

Instance Method Details

#next_messages(consumer_sequence = nil, options = {}) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/firehose/server/channel.rb', line 24

# Asynchronously fetches the messages a consumer has not yet seen.
#
# Reads the channel's sequence value and its whole message list from Redis in
# one MULTI/EXEC transaction and hands both, together with
# +consumer_sequence+, to a MessageBuffer. If the buffer reports remaining
# messages, the returned deferrable succeeds with them; otherwise the
# deferrable is passed to +subscribe+ along with +options[:timeout]+.
#
# @param consumer_sequence [Object, nil] passed straight to MessageBuffer.new
#   (presumably the last sequence the consumer has seen -- confirm against
#   MessageBuffer)
# @param options [Hash] only +:timeout+ is read here; it is forwarded to
#   +subscribe+
# @return [EM::DefaultDeferrable] succeeds with +buffer.remaining_messages+,
#   or fails with whatever error the GET, LRANGE or EXEC call reported
def next_messages(consumer_sequence=nil, options={})
  deferrable = EM::DefaultDeferrable.new
  # Failure reasons other than :timeout and :disconnect are re-raised on the
  # next reactor tick instead of being left to the caller's errbacks alone.
  deferrable.errback {|e| EM.next_tick { raise e } unless [:timeout, :disconnect].include?(e) }

  # Queue both reads between MULTI and EXEC; each queued command forwards its
  # own failure to the deferrable.
  redis.multi
    redis.get(sequence_key).
      errback {|e| deferrable.fail e }
    # Fetch entire list: http://stackoverflow.com/questions/10703019/redis-fetch-all-value-of-list-without-iteration-and-without-popping
    redis.lrange(list_key, 0, -1).
      errback {|e| deferrable.fail e }
  # EXEC yields the two replies in queue order: the GET result, then the LRANGE result.
  redis.exec.callback do |(channel_sequence, message_list)|
    # Reverse the messages so they can be correctly processed by the MessageBuffer class. There's
    # a patch in the message-buffer-redis branch that moves this concern into the Publisher LUA
    # script. We kept it out of this for now because it represents a deployment risk and `reverse!`
    # is a cheap operation in Ruby.
    message_list.reverse!
    buffer = MessageBuffer.new(message_list, channel_sequence, consumer_sequence)
    if buffer.remaining_messages.empty?
      Firehose.logger.debug "No messages in buffer, subscribing. sequence: `#{channel_sequence}` consumer_sequence: #{consumer_sequence}"
      # Either this resource has never been seen before or we are all caught up.
      # Subscribe and hope something gets published to this end-point.
      subscribe(deferrable, options[:timeout])
    else # Either the client is under water or caught up to head.
      deferrable.succeed buffer.remaining_messages
    end
  end.errback {|e| deferrable.fail e }

  # Returned immediately; it is resolved later by the callbacks above.
  deferrable
end

#unsubscribe(deferrable) ⇒ Object



54
55
56
# File 'lib/firehose/server/channel.rb', line 54

# Asks the subscriber to stop notifying +deferrable+ for this channel.
#
# Forwards to +subscriber.unsubscribe+ with this channel's key and the given
# deferrable.
#
# @param deferrable [Object] handed through to the subscriber unchanged
#   (presumably one previously returned by #next_messages -- confirm with callers)
# @return [Object] whatever +subscriber.unsubscribe+ returns
def unsubscribe(deferrable)
  subscriber.unsubscribe(channel_key, deferrable)
end