Class: MQ
- Inherits:
-
Object
- Object
- MQ
- Includes:
- AMQP, EM::Deferrable
- Defined in:
- lib/mq.rb,
lib/mq/rpc.rb,
lib/mq/queue.rb,
lib/mq/header.rb,
lib/mq/logger.rb,
lib/mq/exchange.rb
Overview
– convenience wrapper (read: HACK) for thread-local MQ object
Defined Under Namespace
Classes: Error, Exchange, Header, Logger, Queue, RPC
Constant Summary
Constants included from AMQP
AMQP::FIELDS, AMQP::HEADER, AMQP::PORT, AMQP::RESPONSES, AMQP::VERSION, AMQP::VERSION_MAJOR, AMQP::VERSION_MINOR
Class Attribute Summary
-
.logging ⇒ Object
Returns the value of attribute logging.
Instance Attribute Summary
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
Class Method Summary
-
.basic_return(data = nil, &blk) ⇒ Object
Define a data hashmap and callback block to be executed on all basic.return messages received.
- .default ⇒ Object
-
.error(msg = nil, &blk) ⇒ Object
Define a message and callback block to be executed on all errors.
-
.id ⇒ Object
unique identifier.
-
.method_missing(meth, *args, &blk) ⇒ Object
Allows for calls to all MQ instance methods.
Instance Method Summary
- #close ⇒ Object
-
#consumers ⇒ Object
Queue objects keyed on their consumer tags.
-
#direct(name = 'amq.direct', opts = {}) ⇒ Object
Defines, initializes and returns an Exchange to act as an ingress point for all published messages.
-
#exchanges ⇒ Object
Returns a hash of all the exchange proxy objects.
-
#fanout(name = 'amq.fanout', opts = {}) ⇒ Object
Defines, initializes and returns an Exchange to act as an ingress point for all published messages.
- #get_queue ⇒ Object
-
#headers(name = 'amq.match', opts = {}) ⇒ Object
Defines, initializes and returns an Exchange to act as an ingress point for all published messages.
-
#initialize(connection = nil) ⇒ MQ
constructor
Returns a new channel.
- #prefetch(size) ⇒ Object
-
#process_frame(frame) ⇒ Object
May raise a MQ::Error exception when the frame payload contains a Protocol::Channel::Close object.
-
#queue(name, opts = {}) ⇒ Object
Queues store and forward messages.
-
#queues ⇒ Object
Returns a hash of all the queue proxy objects.
-
#recover(requeue = false) ⇒ Object
Asks the broker to redeliver all unacknowledged messages on this channel.
- #reset ⇒ Object
-
#rpc(name, obj = nil) ⇒ Object
Takes a channel, queue and optional object.
-
#rpcs ⇒ Object
Returns a hash of all rpc proxy objects.
- #send(*args) ⇒ Object
-
#topic(name = 'amq.topic', opts = {}) ⇒ Object
Defines, initializes and returns an Exchange to act as an ingress point for all published messages.
Methods included from AMQP
client, client=, connect, fork, settings, start, stop, vstart
Constructor Details
#initialize(connection = nil) ⇒ MQ
Returns a new channel. A channel is a bidirectional virtual connection between the client and the AMQP server. Elsewhere in the library the channel is referred to in parameter lists as mq.
Optionally takes the result from calling AMQP::connect.
Rarely called directly by client code. This is implicitly called by most instance methods. See #method_missing.
EM.run do
  channel = MQ.new
end

EM.run do
  channel = MQ.new AMQP::connect
end
# File 'lib/mq.rb', line 138
def initialize connection = nil
  raise 'MQ can only be used from within EM.run{}' unless EM.reactor_running?

  @connection = connection || AMQP.start

  conn.callback{ |c|
    @channel = c.add_channel(self)
    send Protocol::Channel::Open.new
  }
end
Class Attribute Details
.logging ⇒ Object
Returns the value of attribute logging.
# File 'lib/mq.rb', line 14
def logging
  @logging
end
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
# File 'lib/mq.rb', line 148
def channel
  @channel
end
Class Method Details
.basic_return(data = nil, &blk) ⇒ Object
Define a data hashmap and callback block to be executed on all basic.return messages received. This is the same as the ReturnListener in the official RabbitMQ Java API but is not scoped to a channel. Data is a hashmap equivalent to the Java listener's method signature plus the channel number.
# File 'lib/mq.rb', line 783
def self.basic_return data = nil, &blk
  if blk
    @return_callback = blk
  else
    @return_callback.call(data) if @return_callback and data
  end
end
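The listing above uses one method both to register a callback (when a block is given) and to fire it (when data is given). A minimal sketch of that pattern, using a hypothetical stand-in module named ReturnHooks so it runs without a broker; the reply code and text are illustrative values only:

```ruby
# ReturnHooks is a hypothetical stand-in, not part of the library.
module ReturnHooks
  def self.basic_return(data = nil, &blk)
    if blk
      # A block was given: remember it as the listener.
      @return_callback = blk
    elsif @return_callback && data
      # Data was given: hand it to the registered listener, if any.
      @return_callback.call(data)
    end
  end
end

returned = []

# Register the listener once.
ReturnHooks.basic_return { |data| returned << data[:reply_text] }

# Later, a returned message is reported with a data hash.
ReturnHooks.basic_return(:channel => 1, :reply_code => 312, :reply_text => 'NO_ROUTE')

p returned
```

Because the callback is stored on the module rather than on a channel instance, a single listener sees returns from every channel, which is why the data hash carries the channel number.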
.default ⇒ Object
# File 'lib/mq.rb', line 860
def MQ.default
  #-- XXX clear this when connection is closed
  Thread.current[:mq] ||= MQ.new
end
.error(msg = nil, &blk) ⇒ Object
Define a message and callback block to be executed on all errors.
# File 'lib/mq.rb', line 751
def self.error msg = nil, &blk
  if blk
    @error_callback = blk
  else
    @error_callback.call(msg) if @error_callback and msg
  end
end
.id ⇒ Object
unique identifier
# File 'lib/mq.rb', line 874
def MQ.id
  Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
end
.method_missing(meth, *args, &blk) ⇒ Object
Allows for calls to all MQ instance methods. This implicitly calls MQ.default, which allocates a channel with MQ.new the first time it is used in the current thread and reuses that channel for subsequent operations.
# File 'lib/mq.rb', line 867
def MQ.method_missing meth, *args, &blk
  MQ.default.__send__(meth, *args, &blk)
end
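The delegation in .default and .method_missing can be tried without a broker. The following is a minimal sketch that uses a hypothetical stand-in class, ChannelLike, in place of MQ; the respond_to_missing? hook is an addition for illustration and does not appear in the listings above.

```ruby
# ChannelLike is a hypothetical stand-in, not part of the library.
class ChannelLike
  # One instance per thread, created on first use.
  def self.default
    Thread.current[:channel_like] ||= new
  end

  # Forward unknown class-level calls to the thread's default instance.
  def self.method_missing(meth, *args, &blk)
    default.__send__(meth, *args, &blk)
  end

  # Keep respond_to? consistent with method_missing (illustrative addition).
  def self.respond_to_missing?(meth, include_private = false)
    public_method_defined?(meth) || super
  end

  def queue(name)
    "queue #{name} on channel object #{object_id}"
  end
end

# Both calls reach the same per-thread instance.
puts ChannelLike.queue('a')
puts ChannelLike.queue('b')
```

A class-level call therefore always lands on the calling thread's own instance, so two threads never share one through this shortcut.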
Instance Method Details
#close ⇒ Object
# File 'lib/mq.rb', line 738
def close
  if @deferred_status == :succeeded
    send Protocol::Channel::Close.new(:reply_code => 200,
                                      :reply_text => 'bye',
                                      :method_id => 0,
                                      :class_id => 0)
  else
    @closing = true
  end
end
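The listing above closes immediately only when the channel has finished opening; otherwise it sets a flag that #process_frame acts on once the open handshake completes. A minimal sketch of that deferred-close pattern, using a hypothetical ClosableChannel class in which an array of symbols stands in for frames sent to the broker:

```ruby
# ClosableChannel is a hypothetical stand-in, not part of the library.
class ClosableChannel
  attr_reader :sent

  def initialize
    @open    = false
    @closing = false
    @sent    = []
  end

  # Close now if the channel is open, otherwise remember the request.
  def close
    if @open
      @sent << :channel_close
    else
      @closing = true
    end
  end

  # Stands in for the point where the open handshake completes.
  def opened!
    @open = true
    @sent << :channel_close if @closing
  end
end

early = ClosableChannel.new
early.close      # requested before the channel is open: nothing sent yet
early.opened!    # the pending close is sent as soon as the channel opens
p early.sent
```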
#consumers ⇒ Object
Queue objects keyed on their consumer tags.
Not typically called by client code.
# File 'lib/mq.rb', line 823
def consumers
  @consumers ||= {}
end
#direct(name = 'amq.direct', opts = {}) ⇒ Object
Defines, initializes and returns an Exchange to act as an ingress point for all published messages.
Direct
A direct exchange is useful for 1:1 communication between a publisher and subscriber. Messages are routed to the queue with a binding that shares the same name as the exchange. Alternately, the messages are routed to the bound queue that shares the same name as the routing key used for defining the exchange. This exchange type does not honor the :key option when defining a new instance with a name. It will honor the :key option if the exchange name is the empty string. Allocating this exchange without a name or with the empty string will use the internal ‘amq.direct’ exchange.
Any published message, regardless of its persistence setting, is thrown away by the exchange when there are no queues bound to it.
# exchange is named 'foo'
exchange = MQ.direct('foo')
# or, the exchange can use the default name (amq.direct) and perform
# routing comparisons using the :key
exchange = MQ.direct("", :key => 'foo')
exchange.publish('some data') # will be delivered to queue bound to 'foo'
queue = MQ.queue('foo')
# can receive data since the queue name and the exchange key match exactly
queue.pop { |data| puts "received data [#{data}]" }
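The routing rule for a direct exchange is an exact comparison between the message's routing key and each binding key. A minimal sketch of that rule as a pure function; route_direct and the bindings hash are hypothetical and model the broker's decision only, not the library's API:

```ruby
# Hypothetical model: bindings maps a queue name to its binding key.
# Returns the names of the queues that would receive the message.
def route_direct(bindings, routing_key)
  bindings.select { |_queue, key| key == routing_key }.keys
end

bindings = { 'foo' => 'foo', 'bar' => 'bar' }

p route_direct(bindings, 'foo')   # only the queue bound with 'foo'
p route_direct(bindings, 'baz')   # no match: the message is thrown away
```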
Options
-
:passive => true | false (default false)
If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.
-
:durable => true | false (default false)
If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
A transient exchange (the default) is stored in memory-only. The exchange and all bindings will be lost on a server restart. It makes no sense to publish a persistent message to a transient exchange.
Durable exchanges and their bindings are recreated upon a server restart. Any published messages not routed to a bound queue are lost.
-
:auto_delete => true | false (default false)
If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.
If the exchange has been previously declared, this option is ignored on subsequent declarations.
-
:internal => true | false (default false)
If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
-
:nowait => true | false (default true)
If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
Exceptions
Any of the following is illegal and will raise MQ::Error.
-
redeclare an already-declared exchange to a different type
-
:passive => true and the exchange does not exist (NOT_FOUND)
# File 'lib/mq.rb', line 346
def direct name = 'amq.direct', opts = {}
  exchanges[name] ||= Exchange.new(self, :direct, name, opts)
end
#exchanges ⇒ Object
Returns a hash of all the exchange proxy objects.
Not typically called by client code.
# File 'lib/mq.rb', line 794
def exchanges
  @exchanges ||= {}
end
#fanout(name = 'amq.fanout', opts = {}) ⇒ Object
Defines, initializes and returns an Exchange to act as an ingress point for all published messages.
Fanout
A fanout exchange is useful for 1:N communication where one publisher feeds multiple subscribers. Like direct exchanges, messages published to a fanout exchange are delivered to queues whose name matches the exchange name (or are bound to that exchange name). Each queue gets its own copy of the message.
Any published message, regardless of its persistence setting, is thrown away by the exchange when there are no queues bound to it.
Like the direct exchange type, this exchange type does not honor the :key option when defining a new instance with a name. It will honor the :key option if the exchange name is the empty string. Allocating this exchange without a name or with the empty string will use the internal ‘amq.fanout’ exchange.
EM.run do
  clock = MQ.fanout('clock')
  EM.add_periodic_timer(1) do
    puts "\npublishing #{time = Time.now}"
    clock.publish(Marshal.dump(time))
  end

  amq = MQ.queue('every second')
  amq.bind(MQ.fanout('clock')).subscribe do |time|
    puts "every second received #{Marshal.load(time)}"
  end

  # note the string passed to #bind
  MQ.queue('every 5 seconds').bind('clock').subscribe do |time|
    time = Marshal.load(time)
    puts "every 5 seconds received #{time}" if time.strftime('%S').to_i%5 == 0
  end
end
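The fanout rule itself can be modelled without a broker: every bound queue receives its own copy, and with no bound queues the message goes nowhere. A minimal sketch, in which route_fanout and the list of queue names are hypothetical:

```ruby
# Hypothetical model: bound_queues is the list of queue names bound to the
# exchange. Returns a hash of queue name => that queue's copy of the payload.
def route_fanout(bound_queues, payload)
  bound_queues.each_with_object({}) do |queue, deliveries|
    deliveries[queue] = payload.dup   # each queue gets its own copy
  end
end

deliveries = route_fanout(['every second', 'every 5 seconds'], 'tick')
p deliveries.keys

# With nothing bound, the message is delivered to no one.
p route_fanout([], 'tick')
```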
Options
-
:passive => true | false (default false)
If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.
-
:durable => true | false (default false)
If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
A transient exchange (the default) is stored in memory-only. The exchange and all bindings will be lost on a server restart. It makes no sense to publish a persistent message to a transient exchange.
Durable exchanges and their bindings are recreated upon a server restart. Any published messages not routed to a bound queue are lost.
-
:auto_delete => true | false (default false)
If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.
If the exchange has been previously declared, this option is ignored on subsequent declarations.
-
:internal => true | false (default false)
If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
-
:nowait => true | false (default true)
If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
Exceptions
Any of the following is illegal and will raise MQ::Error.
-
redeclare an already-declared exchange to a different type
-
:passive => true and the exchange does not exist (NOT_FOUND)
# File 'lib/mq.rb', line 432
def fanout name = 'amq.fanout', opts = {}
  exchanges[name] ||= Exchange.new(self, :fanout, name, opts)
end
#get_queue ⇒ Object
# File 'lib/mq.rb', line 805
def get_queue
  if block_given?
    (@get_queue_mutex ||= Mutex.new).synchronize{
      yield( @get_queue ||= [] )
    }
  end
end
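The method above yields a lazily created array while holding a mutex, so callers can push and shift pending consumers safely from more than one thread. A minimal sketch of how such a guarded FIFO behaves, using a hypothetical PendingGets class that copies the method body; the symbols pushed here merely stand in for queue objects:

```ruby
# PendingGets is a hypothetical stand-in, not part of the library.
class PendingGets
  def get_queue
    if block_given?
      (@get_queue_mutex ||= Mutex.new).synchronize {
        yield(@get_queue ||= [])
      }
    end
  end
end

pending = PendingGets.new

# Two requests are recorded in order...
pending.get_queue { |q| q << :first_consumer }
pending.get_queue { |q| q << :second_consumer }

# ...and the oldest one is taken when a reply arrives.
oldest = pending.get_queue { |q| q.shift }
p oldest
```

The block's return value is passed through, which is what lets #process_frame write `get_queue{|q| q.shift }` and use the result directly.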
#headers(name = 'amq.match', opts = {}) ⇒ Object
Defines, initializes and returns an Exchange to act as an ingress point for all published messages.
Headers
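A headers exchange allows for messages to be published to an exchange and routed by matching the headers of each message against the arguments of the queue bindings, rather than by routing key.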
Any published message, regardless of its persistence setting, is thrown away by the exchange when there are no queues bound to it.
As part of the AMQP standard, each server should predeclare a headers exchange called ‘amq.match’ (this is not required by the standard). Allocating this exchange without a name or with the empty string will use the internal ‘amq.match’ exchange.
TODO: The classic example is …
When publishing data to the exchange, bound queues subscribing to the exchange indicate which data interests them by passing arguments for matching against the headers in published messages. The form of the matching can be controlled by the ‘x-match’ argument, which may be ‘any’ or ‘all’. If unspecified (in RabbitMQ at least), it defaults to “all”.
A value of ‘all’ for ‘x-match’ implies that all values must match (i.e. it does an AND of the headers), while a value of ‘any’ implies that at least one should match (i.e. it does an OR).
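The ‘any’ and ‘all’ modes can be sketched as a pure function. This is a model of the matching rule only, not the library's API: headers_match? is a hypothetical helper, treating a header that is absent from the message as a mismatch is an assumption, and ArgumentError stands in for the broker-side error raised for an invalid ‘x-match’ value.

```ruby
# Hypothetical helper, not part of the MQ API.
# binding_args: arguments supplied when the queue was bound, including 'x-match'.
# headers:      headers carried by the published message.
def headers_match?(binding_args, headers)
  mode = binding_args.fetch('x-match', 'all')   # defaults to 'all'
  unless %w[any all].include?(mode)
    raise ArgumentError, "x-match must be 'any' or 'all', got #{mode.inspect}"
  end

  # Every binding argument except the control key is a header to compare.
  wanted = binding_args.reject { |name, _value| name == 'x-match' }

  # Assumption: a header missing from the message counts as a mismatch.
  hits = wanted.map { |name, value| headers.key?(name) && headers[name] == value }

  mode == 'all' ? hits.all? : hits.any?
end

message = { 'format' => 'pdf', 'type' => 'log' }

p headers_match?({ 'x-match' => 'all', 'format' => 'pdf', 'type' => 'report' }, message)
p headers_match?({ 'x-match' => 'any', 'format' => 'pdf', 'type' => 'report' }, message)
```

Here the ‘all’ binding fails because the type header differs, while the ‘any’ binding succeeds on the format header alone.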
TODO: document behavior when either the binding or the message is missing
a header present in the other
TODO: insert example
Options
-
:passive => true | false (default false)
If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.
-
:durable => true | false (default false)
If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
A transient exchange (the default) is stored in memory-only. The exchange and all bindings will be lost on a server restart. It makes no sense to publish a persistent message to a transient exchange.
Durable exchanges and their bindings are recreated upon a server restart. Any published messages not routed to a bound queue are lost.
-
:auto_delete => true | false (default false)
If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.
If the exchange has been previously declared, this option is ignored on subsequent declarations.
-
:internal => true | false (default false)
If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
-
:nowait => true | false (default true)
If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
Exceptions
Any of the following is illegal and will raise MQ::Error.
-
redeclare an already-declared exchange to a different type
-
:passive => true and the exchange does not exist (NOT_FOUND)
-
using a value other than “any” or “all” for “x-match”
# File 'lib/mq.rb', line 624
def headers name = 'amq.match', opts = {}
  exchanges[name] ||= Exchange.new(self, :headers, name, opts)
end
#prefetch(size) ⇒ Object
# File 'lib/mq.rb', line 759
def prefetch(size)
  @prefetch_size = size
  send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => size, :global => false)
  self
end
#process_frame(frame) ⇒ Object
May raise a MQ::Error exception when the frame payload contains a Protocol::Channel::Close object.
This usually occurs when a client attempts to perform an illegal operation. A short, and incomplete, list of potential illegal operations follows:
-
publish a message to a deleted exchange (NOT_FOUND)
-
declare an exchange using the reserved ‘amq.’ naming structure (ACCESS_REFUSED)
# File 'lib/mq.rb', line 159
def process_frame frame
  log :received, frame

  case frame
  when Frame::Header
    @header = frame.payload
    @body = ''

  when Frame::Body
    @body << frame.payload
    if @body.length >= @header.size
      @header.properties.update(@method.arguments)
      @consumer.receive @header, @body if @consumer

      # call the return listener on basic.return
      # when doing this in the segment assembly below we lack header and body,
      # when doing this after the segment assembly Frame::Body will have cleared
      # the body
      if @method.is_a?(Protocol::Basic::Return)
        MQ.basic_return :channel => channel,
                        :reply_code => @method.reply_code,
                        :reply_text => @method.reply_text,
                        :exchange => @method.exchange,
                        :routing_key => @method.routing_key,
                        :properties => @header.properties,
                        :body => @body
      end

      @body = @header = @consumer = @method = nil
    end

  when Frame::Method
    case method = frame.payload
    when Protocol::Channel::OpenOk
      send Protocol::Access::Request.new(:realm => '/data',
                                         :read => true,
                                         :write => true,
                                         :active => true,
                                         :passive => true)

    when Protocol::Access::RequestOk
      @ticket = method.ticket
      callback{
        send Protocol::Channel::Close.new(:reply_code => 200,
                                          :reply_text => 'bye',
                                          :method_id => 0,
                                          :class_id => 0)
      } if @closing
      succeed

    when Protocol::Basic::CancelOk
      if @consumer = consumers[ method.consumer_tag ]
        @consumer.cancelled
      else
        MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}"
      end

    when Protocol::Queue::DeclareOk
      queues[ method.queue ].receive_status method

    when Protocol::Basic::Deliver, Protocol::Basic::GetOk
      @method = method
      @header = nil
      @body = ''

      if method.is_a? Protocol::Basic::GetOk
        @consumer = get_queue{|q| q.shift }
        MQ.error "No pending Basic.GetOk requests" unless @consumer
      else
        @consumer = consumers[ method.consumer_tag ]
        MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer
      end

    when Protocol::Basic::GetEmpty
      if @consumer = get_queue{|q| q.shift }
        @consumer.receive nil, nil
      else
        MQ.error "Basic.GetEmpty for invalid consumer"
      end

    when Protocol::Basic::Return
      @method = method

    when Protocol::Channel::Close
      raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"

    when Protocol::Channel::CloseOk
      @closing = false
      conn.callback{ |c|
        c.channels.delete @channel
        c.close if c.channels.empty?
      }

    when Protocol::Basic::ConsumeOk
      if @consumer = consumers[ method.consumer_tag ]
        @consumer.confirm_subscribe
      else
        MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}"
      end
    end
  end
end
#queue(name, opts = {}) ⇒ Object
Queues store and forward messages. Queues can be configured in the server or created at runtime. Queues must be attached to at least one exchange in order to receive messages from publishers.
As with exchanges, queue names starting with ‘amq.’ are reserved for internal use. Attempts to create queue names in violation of this reservation will raise MQ::Error (ACCESS_REFUSED).
Creating a queue without a name is not supported; some string (even the empty string) must be passed in the name parameter.
Options
-
:passive => true | false (default false)
If set, the server will not create the queue if it does not already exist. The client can use this to check whether a queue exists without modifying the server state.
-
:durable => true | false (default false)
If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue (though it is allowed).
Again, note the durability property on a queue has no influence on the persistence of published messages. A durable queue containing transient messages will flush those messages on a restart.
If the queue has already been declared, any redeclaration will ignore this setting. A queue may only be declared durable the first time when it is created.
-
:exclusive => true | false (default false)
Exclusive queues may only be consumed from by the current connection. Setting the ‘exclusive’ flag always implies ‘auto-delete’. Only a single consumer is allowed to remove messages from this queue.
The default is a shared queue. Multiple clients may consume messages from this queue.
Attempting to redeclare an already-declared queue as :exclusive => true will raise MQ::Error.
-
:auto_delete => true | false (default false)
If set, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was never a consumer on the queue, it won’t be deleted.
The server waits for a short period of time before determining the queue is unused to give time to the client code to bind an exchange to it.
If the queue has been previously declared, this option is ignored on subsequent declarations.
Any remaining messages in the queue will be purged when the queue is deleted regardless of the message’s persistence setting.
-
:nowait => true | false (default true)
If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
# File 'lib/mq.rb', line 694
def queue name, opts = {}
  queues[name] ||= Queue.new(self, name, opts)
end
#queues ⇒ Object
Returns a hash of all the queue proxy objects.
Not typically called by client code.
# File 'lib/mq.rb', line 801
def queues
  @queues ||= {}
end
#recover(requeue = false) ⇒ Object
Asks the broker to redeliver all unacknowledged messages on this channel.
-
requeue (default false)
If requeue is false, the message will be redelivered to the original recipient. If requeue is true, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.
# File 'lib/mq.rb', line 773
def recover requeue = false
  send Protocol::Basic::Recover.new(:requeue => requeue)
  self
end
#reset ⇒ Object
# File 'lib/mq.rb', line 827
def reset
  @deferred_status = nil
  @channel = nil
  initialize @connection

  @consumers = {}

  exs = @exchanges
  @exchanges = {}
  exs.each{ |_,e| e.reset } if exs

  qus = @queues
  @queues = {}
  qus.each{ |_,q| q.reset } if qus

  prefetch(@prefetch_size) if @prefetch_size
end
#rpc(name, obj = nil) ⇒ Object
Takes a channel, queue and optional object.
The optional object may be a class name, module name or object instance. When given a class or module name, the object is instantiated during this setup. The passed queue is automatically subscribed to so it passes all messages (and their arguments) to the object.
Marshalling and unmarshalling the objects is handled internally. This marshalling is subject to the same restrictions as defined in the Marshal standard library. See that documentation for further reference.
When the optional object is not passed, the returned rpc reference is used to send messages and arguments to the queue. See #method_missing which does all of the heavy lifting with the proxy. Some client elsewhere must call this method with the optional object so that there is a valid destination. Failure to do so will just enqueue marshalled messages that are never consumed.
EM.run do
  server = MQ.rpc('hash table node', Hash)

  client = MQ.rpc('hash table node')
  client[:now] = Time.now
  client[:one] = 1

  client.values do |res|
    p 'client', :values => res
  end

  client.keys do |res|
    p 'client', :keys => res
    EM.stop_event_loop
  end
end
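The proxy idea behind this example can be sketched without a broker: a call is marshalled into a payload on the client side, unmarshalled and dispatched on the server side, and the result is marshalled back. The helper names and the [method, args] payload layout below are assumptions for illustration, not the library's actual wire format.

```ruby
# Hypothetical helpers; the payload layout is an assumption.

# Client side: turn a method call into a byte string.
def encode_call(meth, *args)
  Marshal.dump([meth, args])
end

# Server side: decode the call, invoke it on the target, encode the result.
def dispatch(target, payload)
  meth, args = Marshal.load(payload)
  Marshal.dump(target.__send__(meth, *args))
end

server = Hash.new   # plays the role of the object built from the Hash class

dispatch(server, encode_call(:[]=, :one, 1))
reply = dispatch(server, encode_call(:keys))

p Marshal.load(reply)
```

Marshal.load can instantiate arbitrary objects, so a scheme like this is only appropriate when every publisher on the queue is trusted.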
# File 'lib/mq.rb', line 734
def rpc name, obj = nil
  rpcs[name] ||= RPC.new(self, name, obj)
end
#rpcs ⇒ Object
Returns a hash of all rpc proxy objects.
Not typically called by client code.
# File 'lib/mq.rb', line 816
def rpcs
  @rcps ||= {}
end
#send(*args) ⇒ Object
# File 'lib/mq.rb', line 261
def send *args
  conn.callback{ |c|
    (@_send_mutex ||= Mutex.new).synchronize do
      args.each do |data|
        data.ticket = @ticket if @ticket and data.respond_to? :ticket=
        log :sending, data
        c.send data, :channel => @channel
      end
    end
  }
end
#topic(name = 'amq.topic', opts = {}) ⇒ Object
Defines, initializes and returns an Exchange to act as an ingress point for all published messages.
Topic
A topic exchange allows for messages to be published to an exchange tagged with a specific routing key. The Exchange uses the routing key to determine which queues to deliver the message. Wildcard matching is allowed. The topic must be declared using dot notation to separate each subtopic.
This is the only exchange type to honor the :key hash key for all cases.
Any published message, regardless of its persistence setting, is thrown away by the exchange when there are no queues bound to it.
As part of the AMQP standard, each server should predeclare a topic exchange called ‘amq.topic’ (this is not required by the standard). Allocating this exchange without a name or with the empty string will use the internal ‘amq.topic’ exchange.
The classic example is delivering market data. When publishing market data for stocks, we may subdivide the stream based on 2 characteristics: nation code and trading symbol. The topic tree for Apple Computer would look like:
'stock.us.aapl'
For a foreign stock, it may look like:
'stock.de.dax'
When publishing data to the exchange, bound queues subscribing to the exchange indicate which data interests them by passing a routing key for matching against the published routing key.
EM.run do
  exch = MQ.topic("stocks")
  keys = ['stock.us.aapl', 'stock.de.dax']

  EM.add_periodic_timer(1) do # every second
    puts
    exch.publish(10+rand(10), :routing_key => keys[rand(2)])
  end

  # match against one dot-separated item
  MQ.queue('us stocks').bind(exch, :key => 'stock.us.*').subscribe do |price|
    puts "us stock price [#{price}]"
  end

  # match against multiple dot-separated items
  MQ.queue('all stocks').bind(exch, :key => 'stock.#').subscribe do |price|
    puts "all stocks: price [#{price}]"
  end

  # require exact match
  MQ.queue('only dax').bind(exch, :key => 'stock.de.dax').subscribe do |price|
    puts "dax price [#{price}]"
  end
end
For matching, the ‘*’ (asterisk) wildcard matches against one dot-separated item only. The ‘#’ wildcard (hash or pound symbol) matches against 0 or more dot-separated items. If none of these symbols are used, the exchange performs a comparison looking for an exact match.
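The wildcard rules can be expressed as a small recursive matcher. This is a sketch of the matching semantics described above, written as hypothetical helpers (topic_match? and match_words); the broker performs the real matching, and its implementation is not shown in this document.

```ruby
# Hypothetical helper, not part of the MQ API: true when a binding pattern
# such as 'stock.us.*' matches a routing key such as 'stock.us.aapl'.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

# Compares the dot-separated words of the pattern and the key, left to right.
def match_words(pattern, key)
  return key.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # '#' may absorb zero or more words, so try every possible split point.
    (0..key.length).any? { |n| match_words(rest, key.drop(n)) }
  when '*'
    # '*' must consume exactly one word.
    !key.empty? && match_words(rest, key.drop(1))
  else
    # A literal word must match exactly.
    key.first == head && match_words(rest, key.drop(1))
  end
end

p topic_match?('stock.us.*', 'stock.us.aapl')   # => true
p topic_match?('stock.us.*', 'stock.de.dax')    # => false
p topic_match?('stock.#', 'stock.de.dax')       # => true
p topic_match?('stock.de.dax', 'stock.de.dax')  # => true
```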
Options
-
:passive => true | false (default false)
If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.
-
:durable => true | false (default false)
If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
A transient exchange (the default) is stored in memory-only. The exchange and all bindings will be lost on a server restart. It makes no sense to publish a persistent message to a transient exchange.
Durable exchanges and their bindings are recreated upon a server restart. Any published messages not routed to a bound queue are lost.
-
:auto_delete => true | false (default false)
If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.
If the exchange has been previously declared, this option is ignored on subsequent declarations.
-
:internal => true | false (default false)
If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
-
:nowait => true | false (default true)
If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
Exceptions
Any of the following is illegal and will raise MQ::Error.
-
redeclare an already-declared exchange to a different type
-
:passive => true and the exchange does not exist (NOT_FOUND)
# File 'lib/mq.rb', line 544
def topic name = 'amq.topic', opts = {}
  exchanges[name] ||= Exchange.new(self, :topic, name, opts)
end