Class: ManageIQ::Messaging::Client

Inherits:
Object
Defined in:
lib/manageiq/messaging/client.rb

Overview

The abstract client class. It defines the methods needed to publish messages and to subscribe to them. Instantiating a concrete subclass directly is not recommended; instead, call the class method Client.open with the desired protocol. For example:

client = ManageIQ::Messaging::Client.open(
  :protocol   => 'Stomp',
  :host       => 'localhost',
  :port       => 61616,
  :password   => 'smartvm',
  :username   => 'admin',
  :client_ref => 'generic_1',
  :encoding   => 'json'
)

To close the connection, call client.close explicitly. Alternatively, if a block is given to the open method, the connection is closed automatically before exiting the block. For example:

ManageIQ::Messaging::Client.open(
  :protocol   => 'Stomp',
  :host       => 'localhost',
  :port       => 61616,
  :password   => 'smartvm',
  :username   => 'admin',
  :client_ref => 'generic_1'
) do |client|
  # do stuff with the client
end

Direct Known Subclasses

Kafka::Client, Stomp::Client

Class Method Details

.open(options) ⇒ Object

Open or create a connection to the message broker. Expected options keys are:

  • :protocol (Implemented: ‘Stomp’, ‘Kafka’. Default ‘Stomp’)

  • :host (hostname or IP address of the messaging broker)

  • :port (host port number)

  • :username (optional)

  • :password (optional)

  • :ssl (true/false)

  • :ca_file (optional. path to a certificate authority cert)

  • :encoding (‘yaml’ or ‘json’. Default ‘yaml’)

Other connection options are underlying messaging system specific.

Returns a Client instance if no block is given.



# File 'lib/manageiq/messaging/client.rb', line 46

def self.open(options)
  protocol = options[:protocol] || :Stomp
  client = Object.const_get("ManageIQ::Messaging::#{protocol}::Client").new(options)

  return client unless block_given?

  begin
    yield client
  ensure
    client.close
  end
  nil
end
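
The block form of .open relies on two things visible in the source above: a constant lookup built from the :protocol option, and an ensure clause that closes the client even if the block raises. The sketch below reproduces that shape with a hypothetical DemoMessaging namespace and a stand-in client class (neither is part of manageiq-messaging), so it can run without a broker.

```ruby
# Hypothetical stand-in namespace; not part of manageiq-messaging.
module DemoMessaging
  module Stomp
    class Client
      attr_reader :options, :closed

      def initialize(options)
        @options = options
        @closed  = false
      end

      def close
        @closed = true
      end
    end
  end

  # Mirrors the shape of Client.open shown above.
  def self.open(options)
    protocol = options[:protocol] || :Stomp
    client   = Object.const_get("DemoMessaging::#{protocol}::Client").new(options)

    return client unless block_given?

    begin
      yield client
    ensure
      client.close # runs even if the block raises
    end
    nil
  end
end

seen   = nil
result = DemoMessaging.open(:protocol => 'Stomp') { |client| seen = client }
```

After this runs, result is nil and seen.closed is true, which is why the block form should not be used when the caller needs to keep the client around.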

Instance Method Details

#publish_message(options, &block) ⇒ Object

Publish a message to a queue. The message will be delivered to only one subscriber. Expected keys in options are:

  • :service (service and affinity are used to determine the queue name)

  • :affinity (optional)

  • :class_name (optional)

  • :message (e.g. method name or message type)

  • :payload (message body, a string or a user object that can be serialized)

  • :sender (optional, identify the sender)

  • :headers (optional, additional headers to add to the message)

Other options are underlying messaging system specific.

Optionally, a callback block can be provided to wait for the consumer to send an acknowledgment. Not every underlying messaging system supports callbacks. Example:

client.publish_message(
  :service  => 'ems_operation',
  :affinity => 'ems_amazon1',
  :message  => 'power_on',
  :payload  => {
    :ems_ref => 'u987',
    :id      => '123'
  }
) do |result|
  ansible_install_pkg(vm1) if result == 'running'
end


# File 'lib/manageiq/messaging/client.rb', line 86

def publish_message(options, &block)
  assert_options(options, [:message, :service])

  publish_message_impl(options, &block)
end
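
The source above calls assert_options before delegating, but the helper itself is not shown on this page. A minimal sketch of what such a check might look like follows; the body and the error messages are assumptions for illustration, and the gem's real helper may behave differently.

```ruby
# Hypothetical re-implementation for illustration only; the gem's own
# assert_options is not shown on this page and may differ.
def assert_options(options, required_keys)
  raise ArgumentError, "options must be a Hash" unless options.kind_of?(Hash)

  missing = required_keys.reject { |key| options.key?(key) }
  return if missing.empty?

  raise ArgumentError, "options must contain keys #{missing.inspect}"
end

# :message is absent, so the check fails before anything is sent.
begin
  assert_options({:service => 'ems_operation'}, [:message, :service])
rescue ArgumentError => e
  error_text = e.message
end
```

Failing early like this keeps an incomplete options hash from reaching the protocol-specific publish_message_impl.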

#publish_messages(messages) ⇒ Object

Publish multiple messages to a queue. This is a batch version of #publish_message intended for better performance: all messages are sent in one batch. Each element of the messages array is an options hash.



# File 'lib/manageiq/messaging/client.rb', line 97

def publish_messages(messages)
  publish_messages_impl(messages)
end
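
As a sketch of how the messages array might be assembled, the snippet below builds one options hash per item. The VM references are hypothetical values. Because the source shown above passes the array straight to publish_messages_impl without calling assert_options, the sketch also checks the keys itself before sending.

```ruby
# Hypothetical VM references, for illustration only.
vm_refs = %w[u987 u988 u989]

# One options hash per message, using the same keys as #publish_message.
messages = vm_refs.map do |ems_ref|
  {
    :service  => 'ems_operation',
    :affinity => 'ems_amazon1',
    :message  => 'power_on',
    :payload  => {:ems_ref => ems_ref}
  }
end

# Collect any hash that lacks the keys #publish_message would require.
incomplete = messages.reject { |msg| msg.key?(:service) && msg.key?(:message) }

# With a connected client:
#   client.publish_messages(messages) if incomplete.empty?
```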

#publish_topic(messages) ⇒ Object

Publish a message as a topic. All subscribers will receive a copy of the message. messages can be either a hash or an array of hashes. Expected keys are:

  • :service (service is used to determine the topic address)

  • :event (event name)

  • :payload (message body, a string or a user object that can be serialized)

  • :sender (optional, identify the sender)

  • :headers (optional, additional headers to add to the message)

Other options are underlying messaging system specific.



# File 'lib/manageiq/messaging/client.rb', line 179

def publish_topic(messages)
  messages = Array.wrap(messages)
  messages.each { |msg| assert_options(msg, [:event, :service]) }

  publish_topic_impl(messages)
end
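
publish_topic accepts either a hash or an array of hashes because the source above normalizes its argument with Array.wrap, which comes from ActiveSupport. The sketch below uses a pure-Ruby stand-in (the helper name wrap_messages is hypothetical and covers only the hash-or-array case) to show the normalization, and why Kernel#Array would not be a suitable substitute for a hash.

```ruby
# Pure-Ruby stand-in for ActiveSupport's Array.wrap, limited to the
# hash-or-array case relevant here (hypothetical helper name).
def wrap_messages(messages)
  messages.kind_of?(Array) ? messages : [messages]
end

single = {:service => 'provider_events', :event => 'powered_on', :payload => {:id => '123'}}

wrapped   = wrap_messages(single)   # one-element array holding the hash
unchanged = wrap_messages([single]) # arrays pass through untouched
pairs     = Array(single)           # Kernel#Array splits the hash into [key, value] pairs
```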

#subscribe_background_job(options) ⇒ Object

Subscribe to receive from a queue and run each message as a background job. Expected keys in options are:

  • :service (service and affinity are used to determine the queue)

  • :affinity (optional)

  • :auto_ack (default true, if it is false, client.ack method must be explicitly called)

Other options are underlying messaging system specific.

This subscriber consumes messages sent through publish_message with required options keys, for example:

client.publish_message(
  :service    => 'generic',
  :class_name => 'MiqTask',
  :message    => 'update_attributes', # method name, for instance method :instance_id is required
  :payload    => {
    :instance_id => 2, # database id of class instance stored in rails DB
    :args        => [{:status => 'Timeout'}] # argument list expected by the method
  }
)

A background job is assumed not to be retriable, so each message is auto-acked as soon as the request is received.



# File 'lib/manageiq/messaging/client.rb', line 163

def subscribe_background_job(options)
  assert_options(options, [:service])

  subscribe_background_job_impl(options)
end
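
The example message above names a class, a method, an optional :instance_id, and an :args list. One plausible way a background job runner could turn those fields into a method call is sketched below. The dispatch logic is an assumption, not the gem's implementation, and DemoTask is a hypothetical stand-in for a Rails model.

```ruby
# Stand-in for a Rails model; hypothetical, for illustration only.
class DemoTask
  RECORDS = {}

  attr_reader :id, :attributes

  def self.find(id)
    RECORDS.fetch(id)
  end

  def initialize(id)
    @id         = id
    @attributes = {}
    RECORDS[id] = self
  end

  def update_attributes(attrs)
    @attributes.merge!(attrs)
  end
end

# Assumed dispatch logic; the gem's actual job runner is not shown here.
def run_job(class_name, method_name, payload)
  target = Object.const_get(class_name)
  # With :instance_id the method runs on that record, otherwise on the class.
  target = target.find(payload[:instance_id]) if payload.key?(:instance_id)
  target.public_send(method_name, *payload.fetch(:args, []))
end

task = DemoTask.new(2)
run_job('DemoTask', 'update_attributes', {:instance_id => 2, :args => [{:status => 'Timeout'}]})
```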

#subscribe_messages(options, &block) ⇒ Object

Subscribe to receive messages from a queue. Expected keys in options are:

  • :service (service and affinity are used to determine the queue)

  • :affinity (optional)

  • :auto_ack (default true, if it is false, client.ack method must be explicitly called)

Other options are underlying messaging system specific.

A callback block is needed to consume the messages:

client.subscribe_messages(options) do |messages|
  messages.each do |msg|
    # msg is a type of ManageIQ::Messaging::ReceivedMessage
    # attributes in msg
    msg.sender
    msg.message
    msg.payload
    msg.ack_ref

    msg.ack # needed only when options[:auto_ack] is false
    # process the message
  end
end

With auto_ack left at its default of true, each message is acked automatically immediately after delivery. Some messaging systems allow the subscriber to ack each message in the callback block, so the code in the block can decide when to ack based on whether a message can be retried. Ack the message at the beginning of processing if it is not retriable; otherwise ack it after the message has been processed. Any un-acked message will be redelivered to the next subscriber after the current subscriber disconnects, whether normally or abnormally (e.g. after a crash).

To ack a message, call msg.ack.



# File 'lib/manageiq/messaging/client.rb', line 134

def subscribe_messages(options, &block)
  raise "A block is required" unless block_given?
  assert_options(options, [:service])

  subscribe_messages_impl(options, &block)
end
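
The ack timing guidance for #subscribe_messages can be sketched as follows, assuming the subscription was made with :auto_ack => false. DemoMessage is a hypothetical stand-in for ManageIQ::Messaging::ReceivedMessage, and the handle helper is illustrative rather than part of the gem.

```ruby
# Hypothetical stand-in for ManageIQ::Messaging::ReceivedMessage.
DemoMessage = Struct.new(:message, :payload, :acked) do
  def ack
    self.acked = true
  end
end

# Assumes the subscription was made with :auto_ack => false.
def handle(msg, retriable:)
  msg.ack unless retriable # not retriable: ack before processing
  yield msg.payload
  msg.ack if retriable     # retriable: ack only after processing succeeds
end

retriable_msg = DemoMessage.new('refresh', {:id => '1'}, false)
one_shot_msg  = DemoMessage.new('power_on', {:id => '2'}, false)

begin
  handle(retriable_msg, :retriable => true) { |_payload| raise 'transient failure' }
rescue RuntimeError
  # retriable_msg stays un-acked, so the broker can redeliver it
end

begin
  handle(one_shot_msg, :retriable => false) { |_payload| raise 'failure' }
rescue RuntimeError
  # one_shot_msg was acked up front and will not be redelivered
end
```

In this sketch a failure while processing leaves the retriable message un-acked, while the non-retriable one has already been acked and is not delivered again.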

#subscribe_topic(options, &block) ⇒ Object

Subscribe to receive topic type messages. Expected keys in options are:

  • :service (service is used to determine the topic address)

Other options are underlying messaging system specific.

Some messaging systems allow a subscriber that reconnects to consume the events it missed while it was offline. Additional options are needed to turn on this feature.

A callback block is needed to consume the topic:

client.subscribe_topic(:service => 'provider_events', :auto_ack => false) do |msg|
  # msg is a type of ManageIQ::Messaging::ReceivedMessage
  # attributes in msg
  msg.sender
  msg.message
  msg.payload
  msg.ack_ref

  msg.ack # needed only when options[:auto_ack] is false
  # process the message
end

With auto_ack left at its default of true, each message is acked automatically immediately after delivery. Some messaging systems allow the subscriber to ack each message in the callback block, so the code in the block can decide when to ack based on whether a message can be retried. Ack the message at the beginning of processing if it is not retriable; otherwise ack it after the message has been processed. Any un-acked message will be redelivered to the next subscriber after the current subscriber disconnects, whether normally or abnormally (e.g. after a crash).

To ack a message, call msg.ack.



# File 'lib/manageiq/messaging/client.rb', line 219

def subscribe_topic(options, &block)
  raise "A block is required" unless block_given?
  assert_options(options, [:service])

  subscribe_topic_impl(options, &block)
end