Class: Ruote::Amqp::Receiver
- Inherits:
-
Receiver
- Object
- Receiver
- Ruote::Amqp::Receiver
- Defined in:
- lib/ruote/amqp/receiver.rb
Overview
A receiver is plugged between a ruote engine/storage and an AMQP queue. It will listen on the queue for messages, try to turn them into workitems and feed those workitem back to the engine/storage (in the usual use case, those workitems were initially emitted by the engine).
#decode_message(headers, payload)
By default, the receiver expects the incoming workitem to be serialized entirely in the payload of the AMQP message. One can change this behaviour by overriding the #decode_workitem method (usually when subclassing)
class MyYamlReceiver < Ruote::Amqp::Receiver
def (headers, payload)
YAML.load(payload)
end
end
#decode_message is supposed to return a Ruby hash (describing either a workitem or a launchitem), the difference is explained below.
workitems and launchitems
The standard use case is to accept workitems coming back (they probably left the engine via Ruote::Amqp::Participant). But it’s also OK to accept “launchitems”, hashes with at least one ‘process_definition’ (or ‘definition’) entry.
Upon receiving a launchitem, the receiver will launch a new process instances.
Launchitems may have two more optional entries, ‘workitems_fields’ (or ‘fields’) and ‘process_variables’ (or ‘variables’).
‘workitem_fields’ must contain a hash of initial workitem fields (they will populate the initial workitem.
‘process_variables’ are a very advanced option. It’s possible to set the initial variables in a workflow. Read the general ruote documentation to learn about the difference between fields and variables.
The #decode_message is supposed to return a hash representing either a workitem, either a launchitem.
#handle_error(err)
Out of the box, the receiver will print out to $stderr the details of errors it encounters when receiving, decoding and handing back the workitems (messages?) to the engine. This can be changed by overriding the #handle_error method:
class MyReceiver < Ruote::Amqp::Receiver
def handle_error(err)
ThatLoggerService.log(err)
end
end
initialization options
One can set the “launch_only” option to true when initializing the receiver to prevent it from handling anything but launchitems.
receiver = Ruote::Amqp::Receiver.new(
@dashboard, @amqp_queue, :launch_only => true)
‘launch_only’ (string) is valid too.
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#initialize(engine_or_storage, queue, options = {}) ⇒ Receiver
constructor
A new instance of Receiver.
- #shutdown ⇒ Object
Constructor Details
#initialize(engine_or_storage, queue, options = {}) ⇒ Receiver
Returns a new instance of Receiver.
99 100 101 102 103 104 105 |
# File 'lib/ruote/amqp/receiver.rb', line 99 def initialize(engine_or_storage, queue, ={}) super(engine_or_storage, Ruote.keys_to_s()) @queue = queue @queue.subscribe(&method(:handle)) end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
97 98 99 |
# File 'lib/ruote/amqp/receiver.rb', line 97 def queue @queue end |
Instance Method Details
#shutdown ⇒ Object
107 108 109 110 |
# File 'lib/ruote/amqp/receiver.rb', line 107 def shutdown @queue.unsubscribe end |