Class: NSQ::Reader
- Inherits: Object
  - Object
    - NSQ::Reader
- Defined in: lib/nsq/reader.rb
Overview
Maintains a collection of subscribers to topics and channels.
Instance Attribute Summary
- #long_id ⇒ Object (readonly)
  Returns the value of attribute long_id.
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
- #options ⇒ Object (readonly)
  Returns the value of attribute options.
- #selector ⇒ Object (readonly)
  Returns the value of attribute selector.
- #short_id ⇒ Object (readonly)
  Returns the value of attribute short_id.
Instance Method Summary
- #add_timeout(interval, &block) ⇒ Object
  Call the given block from within the #run thread when the given interval has passed.
- #initialize(options = {}) ⇒ Reader (constructor)
  Create a new NSQ Reader.
- #run ⇒ Object
  Processes all the messages from the subscribed connections.
- #stop ⇒ Object
  Stops this reader; the #run method exits gracefully after all current messages are processed.
- #subscribe(topic, channel, options = {}, &block) ⇒ Object
  Subscribes to a given topic and channel.
- #to_s ⇒ Object
  Returns the reader's name (marked :nodoc: in the source).
- #unsubscribe(topic, channel) ⇒ Object
  Unsubscribes from a given topic and channel.
Constructor Details
#initialize(options = {}) ⇒ Reader
Create a new NSQ Reader
Options (refer to NSQ::Subscriber::new for additional options, which are passed on to each subscriber):
  :nsqd_tcp_addresses [String or Array of Strings]
    Array of nsqd servers to connect to, with port numbers
    ['server1:4150', 'server2:4150']
  :lookupd_tcp_addresses [String or Array of Strings] (Not implemented)
    Array of nsq_lookupd servers to connect to, with port numbers
    ['server1:4160', 'server2:4160']
  :lookupd_poll_interval [Float] (Not implemented)
    How often to poll the lookupd_tcp_addresses for new nsqd servers
    Default: 120
  :long_id [String]
    The identifier used as a long-form descriptor
    Default: fully-qualified hostname
  :short_id [String]
    The identifier used as a short-form descriptor
    Default: short hostname
# File 'lib/nsq/reader.rb', line 34
def initialize(options={})
  @options = options
  @nsqd_tcp_addresses = s_to_a(options[:nsqd_tcp_addresses])
  @lookupd_tcp_addresses = s_to_a(options[:lookupd_tcp_addresses])
  @lookupd_poll_interval = options[:lookupd_poll_interval] || 120
  @long_id = options[:long_id] || Socket.gethostname
  @short_id = options[:short_id] || @long_id.split('.')[0]
  NSQ.logger = options[:logger] if options[:logger]
  NSQ.logger.level = options[:logger_level] if options[:logger_level]

  @selector = ::NIO::Selector.new
  @timer = Timer.new(@selector)
  @topic_count = Hash.new(0)
  @subscribers = {}
  @subscriber_mutex = Monitor.new
  @name = "#{@long_id}:#{@short_id}"

  raise 'Must pass either option :nsqd_tcp_addresses or :lookupd_http_addresses' if @nsqd_tcp_addresses.empty? && @lookupd_http_addresses.empty?

  @conns = {}
  @last_lookup = nil

  @logger.info("starting reader for topic '%s'..." % self.topic) if @logger
end
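The way the constructor normalizes its options can be illustrated with a small standalone sketch. The body of s_to_a is not shown in this listing, so the version below is an assumption (it simply wraps its argument with Array()), and reader_ids is a hypothetical helper that mirrors only the identifier defaults.

```ruby
require 'socket'

# Assumed behaviour of s_to_a: accept nil, a String, or an Array of Strings
# and always return an Array. The real helper is not shown in the listing.
def s_to_a(value)
  Array(value)
end

# Hypothetical helper mirroring how the constructor derives its identifiers.
def reader_ids(options = {})
  long_id  = options[:long_id] || Socket.gethostname
  short_id = options[:short_id] || long_id.split('.')[0]
  { long_id: long_id, short_id: short_id, name: "#{long_id}:#{short_id}" }
end

s_to_a('server1:4150')                 # => ["server1:4150"]
s_to_a(nil)                            # => []
reader_ids(long_id: 'web1.example.com')
# => {:long_id=>"web1.example.com", :short_id=>"web1", :name=>"web1.example.com:web1"}
```

With this reading, a single address string and an array of addresses are handled the same way, and the short identifier falls back to the first dot-separated label of the long one.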
Instance Attribute Details
#long_id ⇒ Object (readonly)
Returns the value of attribute long_id.
# File 'lib/nsq/reader.rb', line 9
def long_id
  @long_id
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/nsq/reader.rb', line 9
def name
  @name
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/nsq/reader.rb', line 9
def options
  @options
end
#selector ⇒ Object (readonly)
Returns the value of attribute selector.
# File 'lib/nsq/reader.rb', line 9
def selector
  @selector
end
#short_id ⇒ Object (readonly)
Returns the value of attribute short_id.
# File 'lib/nsq/reader.rb', line 9
def short_id
  @short_id
end
Instance Method Details
#add_timeout(interval, &block) ⇒ Object
Call the given block from within the #run thread when the given interval has passed.
# File 'lib/nsq/reader.rb', line 120
def add_timeout(interval, &block)
  @timer.add(interval, &block)
end
#run ⇒ Object
Processes all the messages from the subscribed connections. This will not return until #stop has been called in a separate thread.
# File 'lib/nsq/reader.rb', line 99
def run
  @stopped = false
  until @stopped do
    if (Time.now.to_i - @last_lookup.to_i) > @lookupd_poll_interval
      # Do lookupd
    end
    @selector.select(@timer.next_interval) { |m| m.value.call }
  end
end
#stop ⇒ Object
Stops this reader; the #run method exits gracefully after all current messages are processed.
# File 'lib/nsq/reader.rb', line 110
def stop
  NSQ.logger.info("#{self}: Reader stopping...")
  @stopped = true
  @selector.wakeup
  @subscriber_mutex.synchronize do
    @subscribers.each_value {|subscriber| subscriber.stop}
  end
end
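The interplay of #run, #stop and #add_timeout can be sketched without the NSQ or nio4r dependencies. In the minimal stand-in below, IO.select over a pipe plays the role of the selector's select and wakeup calls. MiniLoop and all of its methods are hypothetical names, and unlike the listing above, add_timeout here also wakes the loop so that a newly added deadline is picked up.

```ruby
# Hypothetical stand-in for the reader's event loop (not the library's code).
class MiniLoop
  def initialize
    @wake_r, @wake_w = IO.pipe   # writing to the pipe interrupts IO.select
    @timeouts = []               # pairs of [deadline, block]
    @lock = Mutex.new
    @stopped = false
  end

  # Schedule the block to be called from the thread that executes #run.
  def add_timeout(interval, &block)
    @lock.synchronize { @timeouts << [now + interval, block] }
    @wake_w.write('.')
  end

  # Blocks until #stop is called, firing due timeouts along the way.
  def run
    until @stopped
      ready, = IO.select([@wake_r], nil, nil, next_interval)
      @wake_r.read_nonblock(64, exception: false) if ready
      fire_due_timeouts
    end
  end

  # Flag the loop as stopped, then wake it so it notices the flag.
  def stop
    @stopped = true
    @wake_w.write('.')
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Seconds until the nearest deadline, or nil to wait indefinitely.
  def next_interval
    @lock.synchronize do
      deadline = @timeouts.map(&:first).min
      deadline && [deadline - now, 0].max
    end
  end

  def fire_due_timeouts
    due = @lock.synchronize do
      ready, @timeouts = @timeouts.partition { |deadline, _| deadline <= now }
      ready
    end
    due.each { |_, block| block.call }
  end
end

event_loop = MiniLoop.new
event_loop.add_timeout(0.05) { event_loop.stop }  # block runs inside the loop thread
worker = Thread.new { event_loop.run }
worker.join                                       # returns once the loop has stopped
```

The ordering in stop matters in this sketch: the flag is set before the wakeup, so the loop always sees the updated flag when it returns from its wait.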
#subscribe(topic, channel, options = {}, &block) ⇒ Object
Subscribes to a given topic and channel.
If a block is passed, then within NSQ::Reader#run that block will be run synchronously whenever a message is received for this channel.
If a block is not passed, the QueueSubscriber returned from this method should have its QueueSubscriber#run method executed in one or more separate threads to process the messages.
Refer to Subscriber::new for the options that can be passed to this method.
# File 'lib/nsq/reader.rb', line 69
def subscribe(topic, channel, options={}, &block)
  Util.assert_topic_and_channel_valid(topic, channel)
  subscriber = nil
  name = "#{topic}:#{channel}"
  @subscriber_mutex.synchronize do
    raise "Already subscribed to #{name}" if @subscribers[name]
    subscriber_class = block_given? ? Subscriber : QueueSubscriber
    subscriber = @subscribers[name] = subscriber_class.new(self, topic, channel, options, &block)
  end

  @nsqd_tcp_addresses.each do |addr|
    address, port = addr.split(':')
    subscriber.add_connection(address, port.to_i)
  end
  subscriber
end
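The address handling in #subscribe relies on String#split and to_i, which has a consequence worth seeing in isolation. The helper below is a hypothetical extraction of that one step, not part of the library.

```ruby
# Hypothetical helper mirroring the split performed for each nsqd address.
def split_address(addr)
  address, port = addr.split(':')
  [address, port.to_i]
end

split_address('server1:4150')  # => ["server1", 4150]
split_address('server1')       # => ["server1", 0], because nil.to_i is 0
```

An address given without a port therefore does not raise at this step; it yields port 0, so it seems safest to always include the port in :nsqd_tcp_addresses.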
#to_s ⇒ Object
Returns the reader's name (marked :nodoc: in the source).
# File 'lib/nsq/reader.rb', line 124
def to_s #:nodoc:
  @name
end
#unsubscribe(topic, channel) ⇒ Object
Unsubscribes from a given topic and channel.
# File 'lib/nsq/reader.rb', line 87
def unsubscribe(topic, channel)
  name = "#{topic}:#{channel}"
  @subscriber_mutex.synchronize do
    subscriber = @subscribers[name]
    return unless subscriber
    subscriber.stop
    @subscribers.delete(name)
  end
end
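The bookkeeping shared by #subscribe and #unsubscribe (one entry per "topic:channel" key, guarded by a Monitor) can be sketched in isolation. MiniRegistry and Entry are hypothetical names; the Entry struct stands in for a real subscriber and only records whether it has been stopped.

```ruby
require 'monitor'

# Hypothetical stand-in for the reader's subscriber table (not the library's code).
class MiniRegistry
  Entry = Struct.new(:name, :handler, :stopped)

  def initialize
    @subscribers = {}
    @mutex = Monitor.new
  end

  # Registers one entry per topic/channel pair; a duplicate raises.
  def subscribe(topic, channel, &block)
    name = "#{topic}:#{channel}"
    @mutex.synchronize do
      raise "Already subscribed to #{name}" if @subscribers[name]
      @subscribers[name] = Entry.new(name, block, false)
    end
  end

  # Stops and removes the entry; returns nil when nothing was subscribed.
  def unsubscribe(topic, channel)
    name = "#{topic}:#{channel}"
    @mutex.synchronize do
      entry = @subscribers.delete(name)
      entry.stopped = true if entry
      entry
    end
  end
end

registry = MiniRegistry.new
registry.subscribe('orders', 'billing') { |message| message }
registry.unsubscribe('orders', 'billing')   # entry is stopped and removed
registry.unsubscribe('orders', 'billing')   # => nil, a no-op the second time
```

As in the listing above, unsubscribing a pair that was never subscribed is silently ignored, while subscribing the same pair twice is treated as an error.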