Class: Firehose::Server::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/firehose/server/publisher.rb

Constant Summary collapse

TTL =

Seconds that the message buffer should live before Redis expires it.

60*60*24
PAYLOAD_DELIMITER =

Delimiter used to frame different parts of a message that’s published over Firehose.

"\n"

Instance Method Summary collapse

Instance Method Details

#publish(channel_key, message, opts = {}) ⇒ Object

Publish a message to a Firehose channel via Redis.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/firehose/server/publisher.rb', line 12

# Publish a message to a Firehose channel via Redis.
#
# The message is published by evaluating a Lua script whose digest is cached
# in @publish_script_digest. When no digest is cached, the script is
# registered first and evaluated once registration reports back.
#
# @param channel_key [Object] forwarded unchanged to eval_publish_script
# @param message [Object] forwarded unchanged to eval_publish_script
# @param opts [Hash] optional settings:
#   :ttl         - lifetime for the published resource, coerced with #to_i
#                  (falls back to TTL)
#   :buffer_size - coerced with #to_i (falls back to MessageBuffer::DEFAULT_SIZE)
#   :deferrable  - deferrable to use instead of creating a new one; this method
#                  passes it to itself when retrying after a NOSCRIPT error
# @return [Object] the deferrable that was handed to eval_publish_script
def publish(channel_key, message, opts={})
  # How long to keep the resource around once it is published, and how many
  # messages to buffer; both can be overridden per call.
  buffer_ttl  = (opts[:ttl] || TTL).to_i
  buffer_size = (opts[:buffer_size] || MessageBuffer::DEFAULT_SIZE).to_i

  # TODO: the Redis client has no method_missing whitelist, so an errback has
  # to be set up for every command, even wrong ones. Consider adding a
  # whitelist to em-hiredis or switching to a different library.
  outcome = opts[:deferrable]
  if outcome.nil?
    outcome = EM::DefaultDeferrable.new
    outcome.errback do |error|
      if error.message =~ /NOSCRIPT/
        # The Lua publishing script is missing from the Redis script cache
        # (e.g. Redis restarted or someone executed SCRIPT FLUSH): forget the
        # cached digest and publish again on the next tick, reusing this
        # deferrable so no second errback gets installed.
        outcome.succeed
        EM.next_tick do
          @publish_script_digest = nil
          self.publish channel_key, message, opts.merge(:deferrable => outcome)
        end
      else
        # Any other failure is re-raised on the next reactor tick.
        EM.next_tick { raise error }
      end
    end
  end

  # Fast path: the script digest is already known, evaluate right away.
  unless @publish_script_digest.nil?
    eval_publish_script channel_key, message, buffer_ttl, buffer_size, outcome
    return outcome
  end

  # Slow path: register the script, remember its digest, then evaluate.
  # A registration failure is reported through the deferrable.
  register_publish_script
    .errback { |error| outcome.fail error }
    .callback do |digest|
      @publish_script_digest = digest
      Firehose.logger.debug "Registered Lua publishing script with Redis => #{digest}"
      eval_publish_script channel_key, message, buffer_ttl, buffer_size, outcome
    end

  outcome
end