Class: Firehose::Server::Publisher
- Inherits:
-
Object
- Object
- Firehose::Server::Publisher
- Defined in:
- lib/firehose/server/publisher.rb
Constant Summary collapse
- TTL =
Seconds that the message buffer should live before Redis expires it.
60*60*24
- PAYLOAD_DELIMITER =
Delimiter used to frame different parts of a message that’s published over Firehose.
"\n"
Instance Method Summary collapse
-
#publish(channel_key, message, opts = {}) ⇒ Object
Publish a message to a Firehose channel via Redis.
Instance Method Details
#publish(channel_key, message, opts = {}) ⇒ Object
Publish a message to a Firehose channel via Redis.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/firehose/server/publisher.rb', line 12

# Publish a message to a Firehose channel via Redis.
#
# The publish is performed by a Lua script whose digest is cached in
# @publish_script_digest. When no digest is cached, the script is registered
# first and the publish runs once registration calls back.
#
# @param channel_key key identifying the channel; passed through unchanged to
#   eval_publish_script.
# @param message the payload to publish; passed through unchanged to
#   eval_publish_script.
# @param opts [Hash] optional settings:
#   - :ttl         — converted with #to_i; defaults to TTL.
#   - :buffer_size — converted with #to_i; defaults to MessageBuffer::DEFAULT_SIZE.
#   - :deferrable  — deferrable to report through. When omitted, a new
#     EM::DefaultDeferrable is created and given the NOSCRIPT-retry errback
#     below; a caller-supplied deferrable gets no errback installed here.
# @return the deferrable (either the one supplied in opts or the newly created one).
def publish(channel_key, message, opts={})
  # How long should we hang on to the resource once the message is published?
  ttl = (opts[:ttl] || TTL).to_i
  buffer_size = (opts[:buffer_size] || MessageBuffer::DEFAULT_SIZE).to_i

  # TODO hi-redis isn't that awesome... we have to set up an errback even for wrong
  # commands because of the lack of a method_missing whitelist. Perhaps implement a
  # whitelist in em-hiredis or use a different lib?
  if (deferrable = opts[:deferrable]).nil?
    deferrable = EM::DefaultDeferrable.new
    deferrable.errback do |e|
      # Handle missing Lua publishing script in cache
      # (such as Redis restarting or someone executing SCRIPT FLUSH)
      if e.message =~ /NOSCRIPT/
        deferrable.succeed
        EM.next_tick do
          # Drop the stale digest so the retry re-registers the script, and
          # reuse the same deferrable so this errback is not installed twice.
          @publish_script_digest = nil
          combined_opts = opts.merge :deferrable => deferrable
          self.publish channel_key, message, combined_opts
        end
      else
        # Any other failure is re-raised on the next reactor tick.
        EM.next_tick { raise e }
      end
    end
  end

  if @publish_script_digest.nil?
    register_publish_script.errback do |e|
      deferrable.fail e
    end.callback do |digest|
      @publish_script_digest = digest
      Firehose.logger.debug "Registered Lua publishing script with Redis => #{digest}"
      eval_publish_script channel_key, message, ttl, buffer_size, deferrable
    end
  else
    eval_publish_script channel_key, message, ttl, buffer_size, deferrable
  end

  deferrable
end