Class: Ruote::Amqp::AlertParticipant
- Inherits: Participant
  - Object
  - Participant
  - Ruote::Amqp::AlertParticipant
- Defined in: lib/ruote/amqp/alert_participant.rb
Overview
The alert participant, when invoked from a process instance, will lie in wait for the next message on a given queue. As soon as the message comes in, it packs the message into the workitem fields and lets the process definition resume.
  @dashboard.register(
    :wait_for_info,
    Ruote::Amqp::AlertParticipant,
    :queue => 'info')

  pdef = Ruote.define do
    wait_for_info
    # ... the rest of the flow
  end
configuration
This class is mostly a subclass of Ruote::Amqp::Participant and accepts the same configuration options (though it has no need for ‘exchange’). It accepts a ‘queue’ option in the form [ ‘queue_name’, { queue options } ].
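The ‘queue’ option appears in two shapes on this page: a bare name (as in the overview) and a [ name, options ] pair. A minimal sketch of how the two relate follows. normalize_queue_option is a hypothetical helper written for this illustration, not part of ruote-amqp, and :durable is only a sample queue option.

```ruby
# Hypothetical helper (NOT part of ruote-amqp): turns either shape of the
# 'queue' option into a [ name, options ] pair.
def normalize_queue_option(opt)
  name, options = Array(opt)
  [ name, options || {} ]
end

# Bare queue name, as in the overview example.
plain = normalize_queue_option('info')

# [ 'queue_name', { queue options } ] form; :durable is a sample option.
detailed = normalize_queue_option([ 'info', { :durable => true } ])

p plain
p detailed
```

Both calls yield a queue name plus an options hash; the bare form simply ends up with an empty hash.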
overriding #handle(header, payload)
The default implementation for this method is:
  def handle(header, payload)
    workitem.fields['amqp_message'] = [ header.to_hash, payload ]
  end
One is free to override it:
  class MyAlertParticipant < Ruote::Amqp::AlertParticipant
    def handle(header, payload)
      fields = Rufus::Json.decode(payload)
      workitem.fields.merge!(fields)
    end
  end
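To compare what the two #handle variants leave in the workitem, here is a small sketch that runs without ruote or an AMQP broker. It rests on assumptions: Workitem is a stand-in Struct, the header is a plain Hash, and the standard library's JSON.parse takes the place of Rufus::Json.decode.

```ruby
require 'json'

# Stand-in for a ruote workitem: only the 'fields' hash matters here.
Workitem = Struct.new(:fields)

# Mirrors the default #handle: header and payload are stored as a pair.
def default_handle(workitem, header, payload)
  workitem.fields['amqp_message'] = [ header.to_hash, payload ]
end

# Mirrors the overriding #handle: the payload is decoded and merged.
# JSON.parse stands in for Rufus::Json.decode (assumption of this sketch).
def merging_handle(workitem, header, payload)
  workitem.fields.merge!(JSON.parse(payload))
end

header = { 'content_type' => 'application/json' } # plain Hash as fake header
payload = '{"customer":"alice","amount":12}'

w0 = Workitem.new({})
default_handle(w0, header, payload)

w1 = Workitem.new({ 'amount' => 0 })
merging_handle(w1, header, payload)

p w0.fields
p w1.fields
```

Note that the merging variant overwrites any workitem field whose key also appears in the payload ('amount' here), while the default variant keeps the raw message under a single 'amqp_message' key.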
overriding #on_workitem
Out of the box, the alert participant listens for 1 message on 1 queue. It’s not too difficult to change that.
Resuming after 3 messages:
  class MyAlertParticipant < Ruote::Amqp::AlertParticipant
    def on_workitem
      messages = []
      queue.subscribe { |header, payload|
        messages << payload
        if messages.size > 2
          queue.unsubscribe
          workitem.fields['messages'] = messages
          reply # let the flow resume
        end
      }
    end
  end
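The "resume after 3 messages" logic can be tried out without a broker. The sketch below assumes a FakeQueue class, invented for this illustration, that only mimics subscribe and unsubscribe; a boolean flag marks the point where the participant would call reply.

```ruby
# Minimal in-memory stand-in for an AMQP queue (an assumption of this
# sketch, not a ruote-amqp or amqp gem class).
class FakeQueue
  def initialize
    @handler = nil
  end

  def subscribe(&block)
    @handler = block
  end

  def unsubscribe
    @handler = nil
  end

  # Hands a message to the current subscriber, if there is one.
  def publish(payload)
    @handler.call({}, payload) if @handler
  end
end

queue = FakeQueue.new
fields = {}      # plays the role of workitem.fields
replied = false  # set where the participant would call reply
messages = []

queue.subscribe { |header, payload|
  messages << payload
  if messages.size > 2
    queue.unsubscribe
    fields['messages'] = messages
    replied = true
  end
}

# Four messages are published; the fourth arrives after the unsubscribe.
%w[ a b c d ].each { |m| queue.publish(m) }

p fields
p replied
```

Unsubscribing before replying matters: a fourth message arriving later is no longer delivered to the block, so the flow resumes exactly once.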
Observing 2 queues:
  class MyAlertParticipant < Ruote::Amqp::AlertParticipant
    def on_workitem
      messages = []
      q0 = channel.queue('zero')
      q1 = channel.queue('one')
      # subscribe to each queue in turn (Array has no #subscribe)
      [ q0, q1 ].each { |q|
        q.subscribe { |header, payload|
          messages << payload
        }
      }
      sleep 1.0 while messages.size < 2
      reply # let the flow resume
    end
  end
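Observing several queues boils down to subscribing to each one and sharing a single collection of payloads. The sketch below illustrates that with a hypothetical FakeQueue class. It swaps the sleep-based polling for a check inside the callback, so it is a callback-driven variant and not the code shown above.

```ruby
# Tiny stand-in for an AMQP queue (an assumption of this sketch).
class FakeQueue
  def initialize(name)
    @name = name
    @handlers = []
  end

  def subscribe(&block)
    @handlers << block
  end

  # Delivers the payload to every subscriber, with a minimal fake header.
  def publish(payload)
    @handlers.each { |h| h.call({ 'queue' => @name }, payload) }
  end
end

q0 = FakeQueue.new('zero')
q1 = FakeQueue.new('one')

messages = []
replied = false # set where the participant would call reply

# Subscribe to each queue individually; both blocks share 'messages'.
[ q0, q1 ].each { |q|
  q.subscribe { |header, payload|
    messages << payload
    replied = true if messages.size == 2
  }
}

q1.publish('from one')
q0.publish('from zero')

p messages
p replied
```

The condition counts messages, not queues: two messages arriving on the same queue would also let the flow resume. Tracking the origin of each message would be needed to require one message per queue.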
Instance Method Summary
Methods inherited from Participant
#correlation_id, #do_not_thread, #exchange, #forget, #initialize, #message, #on_cancel, #persistent, #routing_key
Constructor Details
This class inherits a constructor from Ruote::Amqp::Participant
Instance Method Details
#on_workitem ⇒ Object
  # File 'lib/ruote/amqp/alert_participant.rb', line 116
  def on_workitem
    queue.subscribe { |header, payload|
      queue.unsubscribe
      handle(header, payload)
      reply
    }
  end