Class: ReliableMsg::Topic
Overview
Pub/Sub Topic API
Use the Topic object to publish a message on a topic, get messages from topics.
You can create a Topic object that connects to a single topic by passing the topic name to the initialized. You can also access other topics by specifying the destination topic when putting a message.
For example:
topic = Topic.new 'my-topic'
# Publish a message on the topic, expiring in 30 seconds.
msg = 'lorem ipsum'
mid = topic.put msg, :expires=>30
# Retrieve and process a message on the topic.
topic.get do |msg|
if msg.id == mid
print "Retrieved same message"
end
print "Message text: #{msg.object}"
end
See Topic.get and Topic.put for more examples.
Constant Summary collapse
- INIT_OPTIONS =
[:expires, :drb_uri, :tx_timeout, :connect_count]
Constants inherited from Client
Client::DEFAULT_CONNECT_RETRY, Client::DEFAULT_DRB_URI, Client::DEFAULT_TX_TIMEOUT, Client::DLQ, Client::DRB_PORT, Client::ERROR_INVALID_CONNECT_COUNT, Client::ERROR_INVALID_INIT_OPTION, Client::ERROR_INVALID_SELECTOR, Client::ERROR_INVALID_TX_TIMEOUT, Client::ERROR_SELECTOR_VALUE_OR_BLOCK, Client::THREAD_CURRENT_TX
Instance Method Summary collapse
-
#get(selector = nil, &block) ⇒ Object
Get a message on the topic.
-
#initialize(topic = nil, options = nil) ⇒ Topic
constructor
The optional argument
topic
specifies the topic name. -
#name ⇒ Object
Returns the topic name.
-
#put(message, headers = nil) ⇒ Object
Publish a message on the topic.
Methods inherited from Client
#connect_count, #connect_count=, selector, #selector, #tx_timeout, #tx_timeout=
Constructor Details
#initialize(topic = nil, options = nil) ⇒ Topic
The optional argument topic
specifies the topic name. The application can still publish messages on other topics by specifying the destination topics name in the header.
The following options can be passed to the initializer:
-
:expires
– Message expiration in seconds. Default for new messages. -
:drb_uri
– DRb URI for connecting to the queue manager. Only required when using a remote queue manager, or different port. -
:tx_timeout
– Transaction timeout. See tx_timeout. -
:connect_count
– Connection attempts. See connect_count.
:call-seq:
Topic.new([name [,options]]) -> topic
59 60 61 62 63 64 65 66 |
# File 'lib/reliable-msg/topic.rb', line 59 def initialize topic = nil, = nil .each do |name, value| raise RuntimeError, format(ERROR_INVALID_OPTION, name) unless INIT_OPTIONS.include?(name) instance_variable_set "@#{name.to_s}".to_sym, value end if @topic = topic @seen = nil end |
Instance Method Details
#get(selector = nil, &block) ⇒ Object
Get a message on the topic.
Call with no arguments to retrieve the last message published on the topic. Call with selectors to retrieve only matching messages. See also Queue.get.
The following headers have special meaning:
-
:id
– The message identifier. -
:created
– Indicates timestamp (in seconds) when the message was created. -
:expires_at
– Indicates timestamp (in seconds) when the message will expire,nil
if the message does not expire.
Call this method without a block to return the message. The returned object is of type Message, or nil
if no message is found.
Call this method in a block to retrieve and process the message. The block is called with the Message object, returning the result of the block. Returns nil
if no message is found.
All operations performed on the topic inside the block are part of the same transaction. See Queue.get for discussion about transactions. Note that retry counts and delivery modes do not apply to Topics. A message remains published on the topic until replaced by another message.
:call-seq:
topic.get([selector]) -> msg or nil
topic.get([selector]) {|msg| ... } -> obj
See: Message
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/reliable-msg/topic.rb', line 150 def get selector = nil, &block tx = old_tx = Thread.current[THREAD_CURRENT_TX] # If block, begin a new transaction. if block tx = {:qm=>qm} tx[:tid] = tx[:qm].begin tx_timeout Thread.current[THREAD_CURRENT_TX] = tx end result = begin # Validate the selector: nil or hash. selector = case selector when Hash, Selector, nil selector else raise ArgumentError, ERROR_INVALID_SELECTOR end # If inside a transaction, always retrieve from the same queue manager, # otherwise, allow repeated() to try and access multiple queue managers. = if tx tx[:qm].retrieve :seen=>@seen, :topic=>@topic, :selector=>(selector.is_a?(Selector) ? nil : selector), :tid=>tx[:tid] else repeated { |qm| qm.retrieve :seen=>@seen, :topic=>@topic, :selector=>(selector.is_a?(Selector) ? nil : selector) } end # Result is either message, or result from processing block. Note that # calling block may raise an exception. We deserialize the message here # for two reasons: # 1. It creates a distinct copy, so changing the message object and returning # it to the queue (abort) does not affect other consumers. # 2. The message may rely on classes known to the client but not available # to the queue manager. result = if # Do not process message unless selector matches. Do not mark # message as seen either, since we may retrieve it if the selector # changes. if selector.is_a?(Selector) return nil unless selector.match [:headers] end @seen = [:id] = Message.new([:id], [:headers], Marshal::load([:message])) block ? block.call() : end rescue Exception=>error # Abort the transaction if we started it. Propagate error. qm.abort(tx[:tid]) if block raise error ensure # Resume the old transaction. Thread.current[THREAD_CURRENT_TX] = old_tx if block end # Commit the transaction and return the result. We do this outside the main # block, since we don't abort in case of error (commit is one-phase) and we # don't retain the transaction association, it completes by definition. qm.commit(tx[:tid]) if block result end |
#name ⇒ Object
Returns the topic name.
208 209 210 |
# File 'lib/reliable-msg/topic.rb', line 208 def name @topic end |
#put(message, headers = nil) ⇒ Object
Publish a message on the topic.
The message
argument is required, but may be nil
Headers are optional. Headers are used to provide the application with additional information about the message, and can be used to retrieve messages (see Topic.get for discussion of selectors). Some headers are used to handle message processing internally (e.g. :expires
).
Each header uses a symbol for its name. The value may be string, numeric, true/false or nil. No other objects are allowed. To improve performance, keep headers as small as possible.
The following headers have special meaning:
-
:topic
– Publish the onn the named topic. Otherwise, uses the topic specified when creating this Topic object. -
:expires
– Message expiration in seconds. Messages do not expire unless specified. Zero ornil
means no expiration. -
:expires_at
– Specifies when the message expires (timestamp). Alternative to:expires
.
Headers can be set on a per-topic basis when the Topic is created. This only affects messages put through that Topic object.
For example:
topic.put updates
topic.put notice, :expires=>10
topic.put object, :topic=>'other-topic'
:call-seq:
topic.put(message[, headers])
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/reliable-msg/topic.rb', line 101 def put , headers = nil tx = Thread.current[THREAD_CURRENT_TX] # Use headers supplied by callers, or defaults for this topic. defaults = { :expires=> @expires } headers = headers ? defaults.merge(headers) : defaults # Serialize the message before sending to queue manager. We need the # message to be serialized for storage, this just saves duplicate # serialization when using DRb. = Marshal::dump # If inside a transaction, always send to the same queue manager, otherwise, # allow repeated() to try and access multiple queue managers. if tx tx[:qm].publish(:message=>, :headers=>headers, :topic=>(headers[:topic] || @topic), :tid=>tx[:tid]) else repeated { |qm| qm.publish :message=>, :headers=>headers, :topic=>(headers[:topic] || @topic) } end end |