Class: ManageIQ::Messaging::Client
Inherits: Object
- Object
- ManageIQ::Messaging::Client
Defined in: lib/manageiq/messaging/client.rb
Overview
The abstract client class. It defines the methods needed to publish messages and to subscribe to them. Creating an instance of a concrete subclass directly is not recommended. The proper way is to call the class method Client.open
with the desired protocol. For example:
client = ManageIQ::Messaging::Client.open(
:protocol => 'Stomp',
:host => 'localhost',
:port => 61616,
:password => 'smartvm',
:username => 'admin',
:client_ref => 'generic_1',
:encoding => 'json'
)
To close the connection, explicitly call client.close
. Alternatively, if a block is given to the open
method, the connection is closed automatically before exiting the block. For example:
ManageIQ::Messaging::Client.open(
:protocol => 'Stomp',
:host => 'localhost',
:port => 61616,
:password => 'smartvm',
:username => 'admin',
:client_ref => 'generic_1'
) do |client|
# do stuff with the client
end
Direct Known Subclasses
Class Method Summary
-
.open(options) ⇒ Object
Open or create a connection to the message broker.
Instance Method Summary
-
#publish_message(options, &block) ⇒ Object
Publish a message to a queue.
-
#publish_messages(messages) ⇒ Object
Publish multiple messages to a queue.
-
#publish_topic(messages) ⇒ Object
Publish a message as a topic.
-
#subscribe_background_job(options) ⇒ Object
Subscribe to receive from a queue and run each message as a background job.
-
#subscribe_messages(options, &block) ⇒ Object
Subscribe to receive messages from a queue.
-
#subscribe_topic(options, &block) ⇒ Object
Subscribe to receive topic type messages.
Class Method Details
.open(options) ⇒ Object
Open or create a connection to the message broker. Expected options
keys are:
-
:protocol (Implemented: ‘Stomp’, ‘Kafka’. Default ‘Stomp’)
-
:host (hostname or IP address of the messaging broker)
-
:port (host port number)
-
:username (optional)
-
:password (optional)
-
:ssl (true/false)
-
:ca_file (optional. path to a certificate authority cert)
-
:encoding (‘yaml’ or ‘json’. Default ‘yaml’)
Other connection options are specific to the underlying messaging system.
Returns a Client
instance if no block is given. With a block, yields the client, closes it when the block finishes, and returns nil.
# File 'lib/manageiq/messaging/client.rb', line 46
def self.open(options)
  protocol = options[:protocol] || :Stomp
  client = Object.const_get("ManageIQ::Messaging::#{protocol}::Client").new(options)
  return client unless block_given?

  begin
    yield client
  ensure
    client.close
  end
  nil
end
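The block form of open relies on Ruby's begin/ensure, so the client is closed even when the block raises. The following is a minimal sketch of that pattern only. DemoClient is a hypothetical stand-in that records whether close was called; it is not part of the library and does not connect to a broker.

```ruby
# DemoClient is a hypothetical stand-in for a concrete client; it only
# records whether close was called so the pattern can be observed.
class DemoClient
  attr_reader :closed

  def initialize(options)
    @options = options
    @closed = false
  end

  def close
    @closed = true
  end

  # Mirrors the shape of Client.open: return the client when no block is
  # given, otherwise yield it and always close it afterwards.
  def self.open(options)
    client = new(options)
    return client unless block_given?

    begin
      yield client
    ensure
      client.close
    end
    nil
  end
end

seen = nil
begin
  DemoClient.open(:host => 'localhost') do |client|
    seen = client
    raise 'simulated failure inside the block'
  end
rescue RuntimeError
  # The error still reaches the caller, but the client is already closed.
end

# Without a block the caller owns the client and must close it.
unmanaged = DemoClient.open(:host => 'localhost')
```

In the non-block form nothing closes the client automatically, which is why the block form is usually the safer choice for short-lived connections.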
Instance Method Details
#publish_message(options, &block) ⇒ Object
Publish a message to a queue. The message will be delivered to only one subscriber. Expected keys in options
are:
-
:service (service and affinity are used to determine the queue name)
-
:affinity (optional)
-
:class_name (optional)
-
:message (e.g. method name or message type)
-
:payload (message body, a string or a user object that can be serialized)
-
:sender (optional, identify the sender)
-
:headers (optional, additional headers to add to the message)
Other options are specific to the underlying messaging system.
Optionally, a callback block can be provided to wait for the consumer to send an acknowledgment. Not every underlying messaging system supports callbacks. Example:
client.publish_message(
:service => 'ems_operation',
:affinity => 'ems_amazon1',
:message => 'power_on',
:payload => {
:ems_ref => 'u987',
:id => '123'
}
) do |result|
ansible_install_pkg(vm1) if result == 'running'
end
# File 'lib/manageiq/messaging/client.rb', line 86
def publish_message(options, &block)
  assert_options(options, [:message, :service])

  publish_message_impl(options, &block)
end
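The source above checks that :message and :service are present before publishing. A check of that kind could look like the sketch below. The helper name require_keys!, the ArgumentError type, and the error text are all assumptions made for illustration, not the library's actual implementation.

```ruby
# Hypothetical helper: raises if any required key is absent from the hash.
def require_keys!(options, keys)
  missing = keys.reject { |key| options.key?(key) }
  raise ArgumentError, "missing required option(s): #{missing.join(', ')}" unless missing.empty?

  options
end

valid = {
  :service => 'ems_operation',
  :message => 'power_on',
  :payload => { :ems_ref => 'u987', :id => '123' }
}
checked = require_keys!(valid, [:message, :service]) # passes and returns the hash

# Omitting :message is rejected before anything would be sent.
error = begin
  require_keys!({ :service => 'ems_operation' }, [:message, :service])
  nil
rescue ArgumentError => e
  e.message
end
```

Validating up front keeps a malformed message from reaching the broker, where the failure would be harder to trace back to the caller.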
#publish_messages(messages) ⇒ Object
Publish multiple messages to a queue. A batch version of #publish_message
for better performance: all messages are sent in one batch. Every element of the messages
array is an options
hash.
# File 'lib/manageiq/messaging/client.rb', line 97
def publish_messages(messages)
  publish_messages_impl(messages)
end
#publish_topic(messages) ⇒ Object
Publish a message as a topic. All subscribers will receive a copy of the message. messages
can be either a hash or an array of hashes. Expected keys are:
-
:service (service is used to determine the topic address)
-
:event (event name)
-
:payload (message body, a string or a user object that can be serialized)
-
:sender (optional, identify the sender)
-
:headers (optional, additional headers to add to the message)
Other options are specific to the underlying messaging system.
# File 'lib/manageiq/messaging/client.rb', line 179
def publish_topic(messages)
  messages = Array.wrap(messages)
  messages.each { |msg| assert_options(msg, [:event, :service]) }

  publish_topic_impl(messages)
end
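Because publish_topic accepts either a hash or an array of hashes, the input is normalized to an array first. Array.wrap comes from ActiveSupport; the sketch below shows a plain-Ruby equivalent for these two input shapes and why Kernel#Array is not a substitute. The helper name wrap_messages and the sample event names are hypothetical.

```ruby
# Plain-Ruby equivalent of ActiveSupport's Array.wrap for the two input
# shapes accepted here (a single hash or an array of hashes).
def wrap_messages(messages)
  messages.is_a?(Array) ? messages : [messages]
end

single = { :service => 'provider_events', :event => 'powered_on', :payload => { :vm => 'vm1' } }
batch  = [single, single.merge(:event => 'powered_off')]

wrapped_single = wrap_messages(single) # one-element array holding the hash
wrapped_batch  = wrap_messages(batch)  # returned unchanged

# Kernel#Array would instead split the hash into [key, value] pairs,
# which is why it cannot be used for this normalization.
pairs = Array(single)
```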
#subscribe_background_job(options) ⇒ Object
Subscribe to receive from a queue and run each message as a background job. Expected keys in options
are:
-
:service (service and affinity are used to determine the queue)
-
:affinity (optional)
-
:auto_ack (default true, if it is false, client.ack method must be explicitly called)
Other options are specific to the underlying messaging system.
This subscriber consumes messages sent through publish_message
with the required options
keys, for example:
client.publish_message(
:service => 'generic',
:class_name => 'MiqTask',
:message => 'update_attributes', # method name; for an instance method, :instance_id is required
:payload => {
:instance_id => 2, # database id of class instance stored in rails DB
:args => [{:status => 'Timeout'}] # argument list expected by the method
}
)
A background job is assumed not to be retriable, so each message is auto-acked as soon as the request is received.
# File 'lib/manageiq/messaging/client.rb', line 163
def subscribe_background_job(options)
  assert_options(options, [:service])

  subscribe_background_job_impl(options)
end
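The example message above names a class, a method, an optional instance id, and an argument list. One way a consumer could turn such a message into a method call is sketched below. This is an assumption about the general approach, not the library's implementation: DemoTask is a hypothetical stand-in for a Rails model such as MiqTask, and run_job is a hypothetical dispatcher.

```ruby
# DemoTask is a hypothetical stand-in for a Rails model such as MiqTask.
class DemoTask
  STORE = {}

  attr_reader :id, :attributes

  def initialize(id)
    @id = id
    @attributes = {}
    STORE[id] = self
  end

  # Stand-in for a database lookup by id.
  def self.find(id)
    STORE.fetch(id)
  end

  def update_attributes(attrs)
    @attributes.merge!(attrs)
  end
end

# Hypothetical dispatcher: resolves the class, loads the instance when
# :instance_id is present, and invokes the method named by :message.
def run_job(job)
  klass    = Object.const_get(job[:class_name])
  payload  = job[:payload]
  receiver = payload[:instance_id] ? klass.find(payload[:instance_id]) : klass
  receiver.public_send(job[:message], *payload[:args])
end

task = DemoTask.new(2)
run_job(
  :class_name => 'DemoTask',
  :message    => 'update_attributes',
  :payload    => { :instance_id => 2, :args => [{ :status => 'Timeout' }] }
)
```

When :instance_id is absent, this sketch calls the method on the class itself, which matches the note that :instance_id is required only for instance methods.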
#subscribe_messages(options, &block) ⇒ Object
Subscribe to receive messages from a queue. Expected keys in options
are:
-
:service (service and affinity are used to determine the queue)
-
:affinity (optional)
-
:auto_ack (default true, if it is false, client.ack method must be explicitly called)
Other options are specific to the underlying messaging system.
A callback block is needed to consume the messages:
client.subscribe_messages(options) do |messages|
messages.each do |msg|
# msg is a type of ManageIQ::Messaging::ReceivedMessage
# attributes in msg
msg.sender
msg.message
msg.payload
msg.ack_ref
msg.ack # needed only when options[:auto_ack] is false
# process the message
end
end
With auto_ack left at its default of true, each message is acked automatically immediately after delivery. Some messaging systems allow the subscriber to ack each message in the callback block, so the block can decide when to ack according to whether the message can be retried. Ack at the beginning of processing if the message is not retriable; otherwise ack after the message has been processed. Any un-acked message is redelivered to the next subscriber after the current subscriber disconnects, whether normally or abnormally (e.g. a crash).
To ack a message, call msg.ack.
# File 'lib/manageiq/messaging/client.rb', line 134
def subscribe_messages(options, &block)
  raise "A block is required" unless block_given?
  assert_options(options, [:service])

  subscribe_messages_impl(options, &block)
end
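The ack-timing rule described for subscribe_messages (ack first for non-retriable work, ack last for retriable work) can be sketched without a broker. DemoMessage is a hypothetical stand-in for ReceivedMessage whose ack only records that it was called, and the retriable flag is an illustrative field that a real message does not carry.

```ruby
# DemoMessage is a hypothetical stand-in for ReceivedMessage; ack only
# records that it was called.
DemoMessage = Struct.new(:payload, :retriable, :acked) do
  def ack
    self.acked = true
  end
end

# Ack before processing when a retry would be unsafe, otherwise ack only
# after processing succeeds so a crash leaves the message un-acked.
def handle(msg)
  msg.ack unless msg.retriable
  yield msg.payload
  msg.ack if msg.retriable
end

one_shot = DemoMessage.new('charge card', false, false)
safe     = DemoMessage.new('refresh inventory', true, false)

begin
  handle(one_shot) { |_payload| raise 'worker crashed' }
rescue RuntimeError
  # Already acked, so it would not be redelivered.
end

begin
  handle(safe) { |_payload| raise 'worker crashed' }
rescue RuntimeError
  # Still un-acked, so a broker could redeliver it to the next subscriber.
end
```

This only applies when the subscription was made with :auto_ack => false; with auto-ack the messaging system has already acked the message by the time the block runs.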
#subscribe_topic(options, &block) ⇒ Object
Subscribe to receive topic type messages. Expected keys in options
are:
-
:service (service is used to determine the topic address)
Other options are specific to the underlying messaging system.
Some messaging systems let a subscriber that reconnects consume the events it missed while it was offline. Additional options are needed to turn on this feature.
A callback block is needed to consume the topic:
client.subscribe_topic(:service => 'provider_events', :auto_ack => false) do |msg|
# msg is a type of ManageIQ::Messaging::ReceivedMessage
# attributes in msg
msg.sender
msg.message
msg.payload
msg.ack_ref
msg.ack # needed only when options[:auto_ack] is false
# process the message
end
With auto_ack left at its default of true, each message is acked automatically immediately after delivery. Some messaging systems allow the subscriber to ack each message in the callback block, so the block can decide when to ack according to whether the message can be retried. Ack at the beginning of processing if the message is not retriable; otherwise ack after the message has been processed. Any un-acked message is redelivered to the next subscriber after the current subscriber disconnects, whether normally or abnormally (e.g. a crash).
To ack a message, call msg.ack.
# File 'lib/manageiq/messaging/client.rb', line 219
def subscribe_topic(options, &block)
  raise "A block is required" unless block_given?
  assert_options(options, [:service])

  subscribe_topic_impl(options, &block)
end
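Queue messages and topic messages differ in who receives them: a queue message goes to only one subscriber, while a topic message is copied to all subscribers. The sketch below models that difference in memory. DemoBroker is hypothetical, and the round-robin choice of queue subscriber is an assumption, since real brokers apply their own selection policy.

```ruby
# DemoBroker is a hypothetical in-memory model, not a real broker client.
class DemoBroker
  def initialize
    @subscribers = []
    @next = 0
  end

  def subscribe(&block)
    @subscribers << block
  end

  # Queue semantics: exactly one subscriber gets the message. Round-robin
  # selection is an assumption; real brokers choose their own policy.
  def deliver_to_queue(payload)
    return if @subscribers.empty?

    @subscribers[@next % @subscribers.size].call(payload)
    @next += 1
  end

  # Topic semantics: every subscriber gets its own copy.
  def deliver_to_topic(payload)
    @subscribers.each { |subscriber| subscriber.call(payload.dup) }
  end
end

received = { :a => [], :b => [] }
broker = DemoBroker.new
broker.subscribe { |payload| received[:a] << payload }
broker.subscribe { |payload| received[:b] << payload }

broker.deliver_to_queue('power_on')   # reaches one subscriber only
broker.deliver_to_topic('powered_on') # reaches both subscribers
```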