Class: Ruote::Amqp::Participant

Inherits:
Object
Includes:
LocalParticipant
Defined in:
lib/ruote/amqp/participant.rb

Overview

This participant publishes messages on AMQP exchanges.

options

A few options are supported. They can be declared at 3 levels:

  • options (when the participant is registered)

    dashboard.register(
      'amqp_participant',
      Ruote::Amqp::Participant,
      :routing_key => 'nada.x')
    
  • params (from the process definition)

    sequence do
      amqp_participant :routing_key => 'nada.x'
    end

  • fields (from the passing workitem)

    sequence do
      set 'f:routing_key' => 'nada.x'
      amqp_participant
    end

The ‘conf’ option (only available at participant registration) decides which levels are enabled.

By default ‘conf’ is set to ‘params, fields, options’.

‘conf’

As said above, this option decides who can tweak this participant’s options. It accepts a comma-separated list of levels.

The levels are “params”, “fields”, “options”.

The order in which the levels are given is the order in which they are investigated for values.
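
The lookup order can be pictured with a small plain-Ruby sketch. It does not use ruote: `lookup_option` and the `sources` hash are hypothetical stand-ins for the participant's internal lookup, written only to show that the first level listed in ‘conf’ that holds a value wins.

```ruby
# Hypothetical stand-in for the participant's option lookup (not ruote code).
# +conf+ is the comma-separated list of levels, +sources+ maps a level name
# to the hash holding the values declared at that level.
def lookup_option(key, conf, sources)
  conf.split(/\s*,\s*/).each do |level|
    value = (sources[level] || {})[key]
    return value unless value.nil?
  end
  nil
end

sources = {
  'options' => { 'routing_key' => 'from.options' },
  'params'  => { 'routing_key' => 'from.params' },
  'fields'  => {}
}

# 'params' is listed first and holds a value, so it wins.
key_a = lookup_option('routing_key', 'params, fields, options', sources)

# 'options' is listed first here, so its value is returned instead.
key_b = lookup_option('routing_key', 'options, params', sources)

# Only 'fields' is enabled and it holds nothing: no value is found.
key_c = lookup_option('routing_key', 'fields', sources)
```

A level that is not listed in ‘conf’ is never consulted, which is how the option restricts who can tweak the participant.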

‘connection’

A hash of connection options. It is fed directly to the amqp gem, so that gem's options apply (‘host’, ‘port’, ‘vhost’, ‘username’ and ‘password’).

If no ‘connection’ (or :connection) hash is passed, the participant will attempt to use the connection (AMQP::Session) found in Ruote::Amqp.session. If there is nothing in there, it will attempt to create a new connection with AMQP’s default settings.

‘exchange’

Accepts an Array of two or three elements.

The first element is a string or symbol detailing the exchange type, like :direct, :fanout, :topic, …

The second element is an exchange name.

The third, optional, element is a hash of exchange options.

There is more information at rubyamqp.info/

By default, ‘exchange’ is set to [ 'direct', '' ] (the default exchange).

Note: you cannot pass an instantiated Ruby-AMQP exchange here. Ruote cannot serialize it for remote workers, so the settings are passed in a flat form, easily JSONifiable.
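
As an illustration of the flat form, the sketch below pads a two-element setting to the [ type, name, options ] triple and checks that it survives a JSON round trip. `normalize_exchange` is a hypothetical helper written for this example, not part of ruote-amqp, and the exchange name 'events' is made up.

```ruby
require 'json'

# Hypothetical helper (not part of ruote-amqp): pads a two-element exchange
# setting to the [ type, name, options ] triple described above.
def normalize_exchange(setting)
  type, name, options = setting
  [ type.to_s, name.to_s, options || {} ]
end

exchange = normalize_exchange([ :topic, 'events', { 'durable' => true } ])

# The setting is made of plain strings and hashes only, so it comes back
# unchanged from a JSON round trip.
round_tripped = JSON.parse(JSON.generate(exchange))

# The documented default: the direct exchange with an empty name.
default_exchange = normalize_exchange([ 'direct', '' ])
```

An instantiated exchange object carries a live connection and could not be rebuilt from JSON this way, which is why only the coordinates are passed around.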

‘field_prefix’

Sometimes one wants to separate the AMQP participant settings from other workitem fields.

dashboard.register(
  'amqp_participant',
  Ruote::Amqp::Participant,
  :conf => 'fields', :field_prefix => 'amqp_')

registers a participant that draws its configuration from workitem fields prefixed with ‘amqp_’.

Note that setting this option doesn’t implicitly add ‘fields’ to the ‘conf’ option.
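
A minimal sketch of what a prefixed lookup could look like, assuming the prefix is simply prepended to the option name to form the field name. `field_option` and the sample fields are hypothetical and only illustrate the idea; they are not ruote code.

```ruby
# Hypothetical illustration of a prefixed field lookup (not ruote code).
# Assumption: the field name is the prefix followed by the option name.
def field_option(fields, prefix, key)
  fields["#{prefix}#{key}"]
end

fields = {
  'amqp_routing_key' => 'nada.x',   # read as the 'routing_key' setting
  'routing_key'      => 'ignored',  # no prefix, so not a setting here
  'customer'         => 'alice'     # ordinary workitem field
}

routing_key = field_option(fields, 'amqp_', 'routing_key')
persistent  = field_option(fields, 'amqp_', 'persistent')
```

With the prefix in place, an ordinary field that happens to be called 'routing_key' no longer influences the participant.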

‘forget’

When set to true, forces the participant to reply to the engine immediately after the message has been published, in a “fire and forget” fashion.

‘routing_key’

Depending on the exchange used, this option lets you influence how the exchange routes the message towards queues.

Consult your AMQP documentation for more information.

‘message’

By default, the workitem is turned into a JSON string and transmitted in the AMQP message payload. If this ‘message’ option is set, its value is used as the payload.

‘persistent’

If this option is set to anything other than false or nil, messages published by this participant will be persistent (hopefully the queues they’ll end up in will be persistent as well).

#encode_workitem

The default way to encode a workitem before pushing it to the exchange is by turning it entirely into a JSON string.

To alter this, one can subclass this participant and provide its own #encode_workitem(wi) method:

require 'yaml'

class MyAmqpParticipant < Ruote::Amqp::Participant

  def encode_workitem(workitem)
    YAML.dump(workitem)
  end
end

or when one needs to filter some fields:

class MyAmqpParticipant < Ruote::Amqp::Participant

  def encode_workitem(workitem)
    workitem.fields.delete_if { |k, v| k.match(/^private_/) }
    super(workitem)
  end
end

Direct Known Subclasses

AlertParticipant


Constructor Details

#initialize(options) ⇒ Participant

Initializes the participant. This happens right before #on_workitem or another on_ method is called.



# File 'lib/ruote/amqp/participant.rb', line 172

def initialize(options)

  @options = options

  @conf = (@options['conf'] || 'params, fields, options').split(/\s*,\s*/)
  @conf = %w[ params fields options ] if @conf.include?('all')

  @field_prefix = @options['field_prefix'] || ''
end
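
To see what the ‘conf’ parsing yields for a few inputs, the same two steps can be run on their own. `parse_conf` is a name made up for this sketch; the body repeats the lines from #initialize above.

```ruby
# Same parsing steps as in #initialize, extracted into a standalone method
# (parse_conf is a hypothetical name used only in this sketch).
def parse_conf(options)
  conf = (options['conf'] || 'params, fields, options').split(/\s*,\s*/)
  conf = %w[ params fields options ] if conf.include?('all')
  conf
end

default_conf = parse_conf({})                          # no 'conf' given
fields_only  = parse_conf('conf' => 'fields')          # a single level
reordered    = parse_conf('conf' => 'options ,params') # whitespace is ignored
everything   = parse_conf('conf' => 'all')             # 'all' enables all three
```

The resulting array keeps the order in which the levels were written, except for 'all', which always expands to params, fields, options.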

Instance Method Details

#correlation_id ⇒ Object

Returns the correlation_id for the message publication. Returns '' (an empty string) by default.

Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).



# File 'lib/ruote/amqp/participant.rb', line 251

def correlation_id

  opt('correlation_id') || ''
end

#do_not_thread ⇒ Object

No need for a dedicated thread when dispatching messages. Returns true.



# File 'lib/ruote/amqp/participant.rb', line 209

def do_not_thread; true; end

#exchange ⇒ Object

Returns the exchange coordinates (a triple [ type, name, options ]). Defaults to [ 'direct', '', {} ], the default exchange.

Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).



# File 'lib/ruote/amqp/participant.rb', line 217

def exchange

  opt('exchange') || [ 'direct', '', {} ]
    #
    # defaults to the "default exchange"...
end

#forget ⇒ Object

Returns something true-ish if the participant should reply to the engine as soon as the publish operation is done, without waiting for an answer (“fire and forget”).

Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).



# File 'lib/ruote/amqp/participant.rb', line 262

def forget; opt('forget'); end

#message ⇒ Object

Returns the message to publish: the value of the ‘message’ option if set, else the encoded workitem.

Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).



# File 'lib/ruote/amqp/participant.rb', line 229

def message; opt('message') || encode_workitem; end

#on_cancel ⇒ Object

Publishes the encoded cancel item on the exchange, unless the ‘discard_cancel’ option is set.



# File 'lib/ruote/amqp/participant.rb', line 195

def on_cancel

  return if opt('discard_cancel')

  instantiate_exchange.publish(
    encode_cancelitem,
    :routing_key => routing_key,
    :persistent => persistent,
    :correlation_id => correlation_id)
end

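#on_workitem ⇒ Object

Workitem consumption code. Publishes the message on the exchange, then replies to the engine right away if ‘forget’ is set.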



# File 'lib/ruote/amqp/participant.rb', line 184

def on_workitem

  instantiate_exchange.publish(
    message,
    :routing_key => routing_key,
    :persistent => persistent,
    :correlation_id => correlation_id)

  reply if forget
end
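
The publish-then-reply flow can be tried without a broker by swapping the exchange for a recording double. Everything in this sketch is hypothetical: `RecordingExchange` and `SketchParticipant` are stand-ins written for the example, and only the shape of #on_workitem is taken from the listing above.

```ruby
# Hypothetical test double standing in for an AMQP exchange: it only
# records what #publish receives.
class RecordingExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, opts = {})
    @published << [ payload, opts ]
  end
end

# Hypothetical, stripped-down participant mirroring the flow of #on_workitem.
class SketchParticipant
  attr_reader :replied

  def initialize(exchange, settings)
    @exchange = exchange
    @settings = settings
    @replied = false
  end

  def on_workitem
    @exchange.publish(
      @settings['message'],
      :routing_key => @settings['routing_key'],
      :persistent => @settings['persistent'],
      :correlation_id => @settings['correlation_id'] || '')

    # with 'forget' set, the reply happens right after the publish
    reply if @settings['forget']
  end

  # Stand-in for the reply to the engine: only flags that it happened.
  def reply
    @replied = true
  end
end

exchange = RecordingExchange.new
participant = SketchParticipant.new(
  exchange,
  'message' => '{"fields":{}}', 'routing_key' => 'nada.x', 'forget' => true)
participant.on_workitem
```

Without 'forget', the sketch publishes and leaves `replied` at false, mirroring a participant that does not reply on its own after publishing.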

#persistent ⇒ Object

Returns whether the publish should be persistent or not.

Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).



# File 'lib/ruote/amqp/participant.rb', line 243

def persistent; opt('persistent'); end

#routing_key ⇒ Object

Returns the routing key for the message to publish.

Available as a method so it can be overridden (the return value could depend on the @workitem or other factors).



# File 'lib/ruote/amqp/participant.rb', line 236

def routing_key; opt('routing_key'); end