Method: AMQP::Channel#topic
Defined in: lib/amqp/channel.rb
#topic(name = 'amq.topic', opts = {}, &block) ⇒ Exchange
Defines, initializes and returns a topic Exchange instance.
Learn more about topic exchanges in the Exchange class documentation.
Examples:
Using topic exchange to deliver relevant news updates
AMQP.connect do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.topic("pub/sub")

  # Subscribers. In a binding key, "#" matches zero or more words.
  channel.queue("development").bind(exchange, :key => "technology.dev.#").subscribe do |payload|
    puts "A new dev post: '#{payload}'"
  end
  channel.queue("ruby").bind(exchange, :key => "technology.#.ruby").subscribe do |payload|
    puts "A new post about Ruby: '#{payload}'"
  end

  # Let's publish some data.
  # Matches both bindings.
  exchange.publish "Ruby post", :routing_key => "technology.dev.ruby"
  # Matches "technology.dev.#" only.
  exchange.publish "Erlang post", :routing_key => "technology.dev.erlang"
  # Matches "technology.#.ruby" only.
  exchange.publish "Sinatra post", :routing_key => "technology.web.ruby"
  # Matches neither binding.
  exchange.publish "Jewelry post", :routing_key => "jewelry.ruby"
end
Using topic exchange to deliver geographically-relevant data
AMQP.connect do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.topic("pub/sub")

  # Subscribers. "#" matches zero or more words, "*" matches exactly one word.
  channel.queue("americas.north").bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload|
    puts "An update for North America: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload|
    puts "An update for South America: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("us.california").bind(exchange, :routing_key => "americas.north.us.ca.*").subscribe do |headers, payload|
    puts "An update for US/California: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload|
    puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("it.rome").bind(exchange, :routing_key => "europe.italy.rome").subscribe do |headers, payload|
    puts "An update for Rome, Italy: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("asia.hk").bind(exchange, :routing_key => "asia.southeast.hk.#").subscribe do |headers, payload|
    puts "An update for Hong Kong: #{payload}, routing key is #{headers.routing_key}"
  end

  # Note: the Rome update is published with "europe.italy.roma", which does not
  # match the "europe.italy.rome" binding, so the it.rome queue does not receive it.
  # The Kyoto, Shanghai and Paris updates match no binding either.
  exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
    publish("Berkeley update", :routing_key => "americas.north.us.ca.berkeley").
    publish("San Francisco update", :routing_key => "americas.north.us.ca.sanfrancisco").
    publish("New York update", :routing_key => "americas.north.us.ny.newyork").
    publish("São Paulo update", :routing_key => "americas.south.brazil.saopaulo").
    publish("Hong Kong update", :routing_key => "asia.southeast.hk.hongkong").
    publish("Kyoto update", :routing_key => "asia.southeast.japan.kyoto").
    publish("Shanghai update", :routing_key => "asia.southeast.prc.shanghai").
    publish("Rome update", :routing_key => "europe.italy.roma").
    publish("Paris update", :routing_key => "europe.france.paris")
end
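The binding patterns in the examples above rely on two wildcards: "*" stands for exactly one dot-separated word and "#" stands for zero or more words. The following is a minimal pure-Ruby sketch of that matching rule, intended only to help reason about which queues receive which messages. The method names topic_match? and match_words are hypothetical; they are not part of the amqp gem, and a real broker performs this matching on the server side.

```ruby
# Illustrative model of topic matching; not code from the amqp gem or a broker.
# "*" matches exactly one dot-separated word, "#" matches zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern_words, key_words)
  # An exhausted pattern matches only an exhausted routing key.
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when "#"
    # Let "#" absorb 0, 1, ... up to all of the remaining words.
    (0..key_words.length).any? { |n| match_words(rest, key_words.drop(n)) }
  when "*"
    # "*" needs exactly one word to consume.
    !key_words.empty? && match_words(rest, key_words.drop(1))
  else
    # A literal word must be equal to the next word of the key.
    key_words.first == head && match_words(rest, key_words.drop(1))
  end
end

puts topic_match?("technology.#.ruby", "technology.web.ruby")  # prints true
puts topic_match?("europe.italy.rome", "europe.italy.roma")    # prints false
```

The recursive form keeps the rule for "#" explicit: it tries every possible number of words the wildcard could swallow, which is fine for short routing keys like the ones used here.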
Parameters:
- name (String) (defaults to: 'amq.topic'): Exchange name.
- opts (Hash) (defaults to: {}): a customizable set of options
Options Hash (opts):
- :passive (Boolean), default: false. If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.
- :durable (Boolean), default: false. If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).
- :auto_delete (Boolean), default: false. If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.
- :internal (Boolean), default: false. If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.
- :nowait (Boolean), default: true. If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
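One way to picture the defaults listed above is as a hash that the caller's options are merged over. The sketch below assumes plain Hash#merge semantics and copies the default values from the option list; TOPIC_DEFAULTS and with_defaults are hypothetical names used for illustration and are not part of the amqp gem, whose own handling of defaults may differ.

```ruby
# Hypothetical helper; default values are copied from the option list above.
TOPIC_DEFAULTS = {
  :passive     => false,
  :durable     => false,
  :auto_delete => false,
  :internal    => false,
  :nowait      => true
}.freeze

# Returns a new hash in which caller-supplied options override the defaults.
def with_defaults(opts = {})
  TOPIC_DEFAULTS.merge(opts)
end

effective = with_defaults(:durable => true)
puts effective[:durable]  # prints true
puts effective[:passive]  # prints false
```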
Returns:
- (Exchange)
# File 'lib/amqp/channel.rb', line 652
def topic(name = 'amq.topic', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:topic, name, opts, block)
    validate_parameters_match!(exchange, extended_opts, :exchange)
    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :topic, name, opts, &block))
  end
end
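The method above follows a find-or-create pattern: an exchange already known to the channel is validated and reused, otherwise a new one is created and registered. The following is a simplified, self-contained model of that control flow. ExchangeRegistry, FakeExchange and IncompatibleOptions are hypothetical names, and the sketch assumes that validate_parameters_match! raises when a re-declaration disagrees with the original, as its name suggests; it does not reproduce the gem's actual comparison rules.

```ruby
# Illustrative model only; these classes are not part of the amqp gem.
class IncompatibleOptions < StandardError; end

FakeExchange = Struct.new(:type, :name, :opts)

class ExchangeRegistry
  def initialize
    @exchanges = {}
  end

  # Returns the exchange registered under +name+, creating it on first use.
  def topic(name = "amq.topic", opts = {})
    if (exchange = @exchanges[name])
      # Assumption: a re-declaration must agree with the original one.
      unless exchange.type == :topic && exchange.opts == opts
        raise IncompatibleOptions, "exchange #{name.inspect} was declared with different options"
      end
      yield exchange if block_given?
      exchange
    else
      # The real method hands the block to Exchange.new; here it is called directly.
      exchange = @exchanges[name] = FakeExchange.new(:topic, name, opts)
      yield exchange if block_given?
      exchange
    end
  end
end

registry = ExchangeRegistry.new
first    = registry.topic("pub/sub", :durable => true)
second   = registry.topic("pub/sub", :durable => true)
puts first.equal?(second)  # prints true
```

Reusing the registered instance means repeated calls with the same name and options are cheap and return the same object, while a conflicting re-declaration fails loudly instead of silently returning an exchange with different settings.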