Class: Ruote::Amqp::Participant
- Inherits: Object
- Includes: LocalParticipant
- Defined in: lib/ruote/amqp/participant.rb
Overview
This participant publishes messages on AMQP exchanges.
options
A few options are supported. They can be declared at 3 levels:
-
options (when the participant is registered)
dashboard.register(
'amqp_participant', Ruote::Amqp::Participant, :routing_key => 'nada.x')
-
params (from the process definition)
sequence do
  amqp_participant :routing_key => 'nada.x'
end
-
fields (from the passing workitem)
sequence do
  set 'f:routing_key' => 'nada.x'
  amqp_participant
end
The ‘conf’ option (only available at participant registration) decides which levels are enabled or not.
By default ‘conf’ is set to ‘params, fields, options’.
‘conf’
As said above, this option decides who can tweak this participant’s options. It accepts a comma-separated list of levels.
The levels are “params”, “fields”, “options”.
The order in which the levels are given is the order in which they are investigated for values.
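The lookup order can be pictured with a small standalone sketch. The `resolve_option` helper and the `sources` hash are hypothetical names used for illustration, not part of ruote-amqp, and the sketch assumes that the first enabled level defining the key wins.

```ruby
# Hypothetical helper, not ruote-amqp code: walks the levels named in 'conf'
# in the order given and returns the first value found.
def resolve_option(key, conf, sources)
  conf.split(/\s*,\s*/).each do |level|
    container = sources.fetch(level, {})
    return container[key] if container.key?(key)
  end
  nil # no enabled level defines the key
end

sources = {
  'options' => { 'routing_key' => 'from.options' },
  'params'  => { 'routing_key' => 'from.params' },
  'fields'  => {}
}

# With the default order, 'params' is consulted before 'options'.
default_order = resolve_option('routing_key', 'params, fields, options', sources)

# Restricting 'conf' to 'options' ignores params and fields entirely.
options_only = resolve_option('routing_key', 'options', sources)
```

Under these assumptions `default_order` comes from the params level, while `options_only` comes from the registration options.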
‘connection’
A hash of connection options. It is fed directly to the amqp gem, so that gem’s options apply (‘host’, ‘port’, ‘vhost’, ‘username’ and ‘password’).
If no ‘connection’ (or :connection) hash is passed, the participant will attempt to use the connection (AMQP::Session) found in Ruote::Amqp.session. If there is nothing in there, it will attempt to create a new connection with AMQP’s default settings.
‘exchange’
Accepts an Array of two or three elements.
The first element is a string or symbol detailing the exchange type, like :direct, :fanout, :topic, …
The second element is an exchange name.
The third, optional, element is a hash of exchange options.
There is more information at rubyamqp.info/
By default, ‘exchange’ is set to [ 'direct', '' ] (the default exchange).
Note: you cannot pass an instantiated Ruby-AMQP exchange here. Ruote cannot serialize it for remote workers, so the settings are passed in a flat form, easily JSONifiable.
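As a rough illustration of why the flat form is convenient, the sketch below pads a two-element setting to a triple and sends it through a JSON round trip. `normalize_exchange` is a hypothetical helper and 'ruote.events' is a made-up exchange name; neither comes from ruote-amqp.

```ruby
require 'json'

# Hypothetical helper: pads a two-element exchange setting to the
# [ type, name, options ] triple and turns a symbol type into a string.
def normalize_exchange(setting)
  type, name, options = setting
  [type.to_s, name.to_s, options || {}]
end

triple = normalize_exchange([:topic, 'ruote.events'])

# Plain strings and hashes survive a JSON round trip unchanged, which an
# instantiated exchange object would not.
round_tripped = JSON.parse(JSON.generate(triple))
```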
‘field_prefix’
Sometimes one wants to keep AMQP participant settings separate from other workitem fields.
dashboard.register(
'amqp_participant',
Ruote::Amqp::Participant,
:conf => 'fields', :field_prefix => 'amqp_')
registers a participant that draws its configuration from workitem fields prefixed with ‘amqp_’.
Note that setting this option doesn’t implicitly add ‘fields’ to the ‘conf’ option.
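The effect of a prefix can be sketched without the gem. `amqp_settings` is a hypothetical helper, and the field names are invented for the example; the actual lookup inside the participant may differ.

```ruby
# Hypothetical helper: picks the participant settings out of a workitem's
# fields, keeping only keys that start with the prefix and stripping it.
def amqp_settings(fields, prefix)
  fields.each_with_object({}) do |(key, value), settings|
    settings[key.delete_prefix(prefix)] = value if key.start_with?(prefix)
  end
end

fields = {
  'amqp_routing_key' => 'nada.x',
  'amqp_persistent'  => true,
  'customer'         => 'alice' # unprefixed, so not treated as a setting
}

settings = amqp_settings(fields, 'amqp_')
```

Here only ‘routing_key’ and ‘persistent’ end up in `settings`; the ‘customer’ field is left alone.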
‘forget’
When set to true, forces the participant to reply to the engine immediately after the message is published, in a “fire and forget” fashion.
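A minimal sketch of that flow, using stand-ins only: `RecordingExchange` replaces a real AMQP exchange and `dispatch` is a hypothetical function, so nothing here touches a broker or the ruote engine.

```ruby
# Stand-in for an AMQP exchange; it only records what gets published.
class RecordingExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, options = {})
    @published << [payload, options]
  end
end

# Hypothetical dispatch step: publish first, then report whether the
# participant would reply to the engine right away.
def dispatch(exchange, payload, forget)
  exchange.publish(payload, :routing_key => 'nada.x')
  forget ? :replied_immediately : :no_reply_yet
end

exchange = RecordingExchange.new
with_forget    = dispatch(exchange, '{"fields":{}}', true)
without_forget = dispatch(exchange, '{"fields":{}}', false)
```

In both cases the message is published; only the reply to the engine depends on ‘forget’.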
‘routing_key’
Depending on the exchange used, this option lets you influence how the exchange routes the message towards queues.
Consult your AMQP documentation for more information.
‘message’
By default, the workitem is turned into a JSON string and transmitted in the AMQP message payload. If this ‘message’ option is set, its value is used as the payload.
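The payload rule can be sketched as follows. `payload_for` is a hypothetical helper, and the workitem is represented by a plain hash rather than a real Ruote workitem.

```ruby
require 'json'

# Hypothetical helper mirroring the rule above: an explicit 'message'
# wins, otherwise the workitem (a plain hash here) is serialized as JSON.
def payload_for(workitem_hash, message_option = nil)
  message_option || JSON.generate(workitem_hash)
end

default_payload = payload_for({ 'fields' => { 'customer' => 'alice' } })
custom_payload  = payload_for({ 'fields' => { 'customer' => 'alice' } }, 'hello')
```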
‘persistent’
If this option is set to anything other than false or nil, messages published by this participant will be persistent (hopefully the queues they’ll end up in will be persistent as well).
#encode_workitem
The default way to encode a workitem before pushing it to the exchange is by turning it entirely into a JSON string.
To alter this, one can subclass this participant and provide its own #encode_workitem(wi) method:
require 'yaml'
class MyAmqpParticipant < Ruote::Amqp::Participant
  def encode_workitem(workitem)
    YAML.dump(workitem)
  end
end
or when one needs to filter some fields:
class MyAmqpParticipant < Ruote::Amqp::Participant
  def encode_workitem(workitem)
    workitem.fields.delete_if { |k, v| k.match(/^private_/) }
    super(workitem)
  end
end
Direct Known Subclasses
Instance Method Summary
-
#correlation_id ⇒ Object
Returns the correlation_id for the message publication.
-
#do_not_thread ⇒ Object
No need for a dedicated thread when dispatching messages.
-
#exchange ⇒ Object
Returns the exchange coordinates (a triple [ type, name, options ]).
-
#forget ⇒ Object
Returns something true-ish if the participant should reply to the engine as soon as the publish operation is done.
-
#initialize(options) ⇒ Participant
constructor
Initializing the participant, right before calling #on_workitem or another on_ method.
-
#message ⇒ Object
Returns the message to publish.
- #on_cancel ⇒ Object
-
#on_workitem ⇒ Object
Workitem consumption code.
-
#persistent ⇒ Object
Returns whether the publish should be persistent or not.
-
#routing_key ⇒ Object
Returns the routing key for the message to publish.
Constructor Details
#initialize(options) ⇒ Participant
Initializing the participant, right before calling #on_workitem or another on_ method.
# File 'lib/ruote/amqp/participant.rb', line 172
def initialize(options)
  @options = options
  @conf = (@options['conf'] || 'params, fields, options').split(/\s*,\s*/)
  @conf = %w[ params fields options ] if @conf.include?('all')
  @field_prefix = @options['field_prefix'] || ''
end
Instance Method Details
#correlation_id ⇒ Object
Returns the correlation_id for the message publication. Returns '' by default.
Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).
# File 'lib/ruote/amqp/participant.rb', line 251
def correlation_id
  opt('correlation_id') || ''
end
#do_not_thread ⇒ Object
No need for a dedicated thread when dispatching messages. Responds true.
# File 'lib/ruote/amqp/participant.rb', line 209
def do_not_thread; true; end
#exchange ⇒ Object
Returns the exchange coordinates (a triple [ type, name, options ]). Defaults to the direct exchange.
Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).
# File 'lib/ruote/amqp/participant.rb', line 217
def exchange
  opt('exchange') || [ 'direct', '', {} ]
    #
    # defaults to the "default exchange"...
end
#forget ⇒ Object
Returns something true-ish if the participant should reply to the engine as soon as the publish operation is done.
Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).
# File 'lib/ruote/amqp/participant.rb', line 262
def forget; opt('forget'); end
#message ⇒ Object
Returns the message to publish.
Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).
# File 'lib/ruote/amqp/participant.rb', line 229
def message; opt('message') || encode_workitem; end
#on_cancel ⇒ Object
# File 'lib/ruote/amqp/participant.rb', line 195
def on_cancel
  return if opt('discard_cancel')
  instantiate_exchange.publish(
    encode_cancelitem,
    :routing_key => routing_key,
    :persistent => persistent,
    :correlation_id => correlation_id)
end
#on_workitem ⇒ Object
Workitem consumption code.
# File 'lib/ruote/amqp/participant.rb', line 184
def on_workitem
  instantiate_exchange.publish(
    message,
    :routing_key => routing_key,
    :persistent => persistent,
    :correlation_id => correlation_id)
  reply if forget
end
#persistent ⇒ Object
Returns whether the publish should be persistent or not.
Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).
# File 'lib/ruote/amqp/participant.rb', line 243
def persistent; opt('persistent'); end
#routing_key ⇒ Object
Returns the routing key for the message to publish.
Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).
# File 'lib/ruote/amqp/participant.rb', line 236
def routing_key; opt('routing_key'); end
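Overriding one of these methods might look like the sketch below. To stay runnable without the gem it uses a stand-in base class: `StubParticipant`, `RegionalParticipant`, the `workitem_fields` accessor and the ‘region’ field are all hypothetical. With the real class, the subclass would inherit from Ruote::Amqp::Participant instead.

```ruby
# Stand-in base class (NOT Ruote::Amqp::Participant): it only mimics a
# routing_key method that reads a registration-time option.
class StubParticipant
  attr_accessor :workitem_fields

  def initialize(options)
    @options = options
    @workitem_fields = {}
  end

  def routing_key
    @options['routing_key']
  end
end

# Overrides routing_key so that it depends on the current workitem,
# falling back to the configured value when no region is set.
class RegionalParticipant < StubParticipant
  def routing_key
    region = workitem_fields['region']
    region ? "orders.#{region}" : super
  end
end

participant = RegionalParticipant.new('routing_key' => 'orders.default')
fallback_key = participant.routing_key

participant.workitem_fields = { 'region' => 'eu' }
regional_key = participant.routing_key
```

The same pattern would apply to the other overridable methods listed here, such as #exchange or #persistent.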