Class: RuoteStomp::ParticipantProxy
- Inherits:
- Object
- Includes:
- Ruote::LocalParticipant
- Defined in:
- lib/ruote-stomp/participant.rb
Overview
Stomp Participants
The RuoteStomp::ParticipantProxy allows you to send workitems (serialized as JSON) or messages to any Stomp queue right from the process definition. When combined with the RuoteStomp::Receiver, you can easily build powerful local/remote participant combinations.
For local/remote participants
The local part of the RuoteStomp::ParticipantProxy relies on the presence of a RuoteStomp::Receiver. Workitems are sent to the remote participant and the local part does not normally reply to the engine. Instead, the engine continues when a reply is received on the ‘ruote_workitems’ queue (see RuoteStomp::Receiver).
Of course, the standard :forget => true format can be used even with remote participants, and :forget can even be set as a default in the options.
- NOTE: Working this port next!!!
-
A simple way to create a remote participant to act upon workitems is to use the daemon-kit ruote responder.
Simple Stomp messages are treated as ‘fire and forget’: the flow continues as soon as the local participant has queued the message for sending, as there is no meaningful way to receive a workitem in reply.
Configuration
Stomp configuration is handled by directly manipulating the values of the Stomp.settings hash, as provided by the Stomp gem. No Stomp defaults are set by the participant.
Usage
Define the queue used by a Stomp participant:
engine.register_participant( :delete_user, RuoteStomp::ParticipantProxy, 'queue' => 'user_manager')
Sending a workitem to the remote participant defined above:
Ruote.process_definition do
  sequence do
    delete_user
  end
end
Let the local participant reply to the engine without involving the receiver:
Ruote.process_definition do
  sequence do
    delete_user :forget => true
  end
end
Setting up the participant in a slightly more ‘raw’ way:
engine.register_participant( :stomp, RuoteStomp::ParticipantProxy )
Sending a workitem to a specific queue:
Ruote.process_definition do
  sequence do
    stomp :queue => 'test', 'command' => '/run/regression_test'
  end
end
Set up a ‘fire and forget’ participant that always replies to the engine:
engine.register_participant( :jfdi, RuoteStomp::ParticipantProxy, 'forget' => true )
Sending a message to a specific queue (both steps are equivalent):
Ruote.process_definition do
  sequence do
    stomp :queue => 'test', :message => 'foo'
    stomp :queue => 'test', :message => 'foo', :forget => true
  end
end
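The usage examples above mix registration-time defaults ('queue' => 'user_manager', 'forget' => true) with per-step parameters (:queue => 'test', :forget => true). The following is a minimal sketch of how such a lookup could work, assuming that a per-step parameter wins over a registration default. The helper names resolve_queue and resolve_forget are hypothetical and are not part of RuoteStomp; the real participant uses determine_queue and determine_forget, whose bodies are not shown on this page.

```ruby
# Hypothetical helper: a per-step 'queue' parameter takes precedence over
# the default supplied at registration time (an assumption, see above).
def resolve_queue(params, options)
  params['queue'] || options['queue']
end

# Hypothetical helper: 'forget' may legitimately be false, so test for the
# presence of the key instead of relying on truthiness.
def resolve_forget(params, options)
  return params['forget'] if params.key?('forget')

  options.fetch('forget', false)
end

registration_defaults = { 'queue' => 'user_manager', 'forget' => false }

# A step that passes no parameters falls back to the registered queue.
p resolve_queue({}, registration_defaults)

# A step that names a queue overrides the registered one.
p resolve_queue({ 'queue' => 'test' }, registration_defaults)

# A step that sets 'forget' overrides the registered default.
p resolve_forget({ 'forget' => true }, registration_defaults)
```

Checking for the key rather than using || matters for 'forget', because an explicit false at the step level should still be able to override a true default.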
Stomp notes
The direct exchanges are always marked as durable by the participant, and messages are marked as persistent by default (see #RuoteStomp)
Direct Known Subclasses
Instance Method Summary
- #cancel(fei, flavour) ⇒ Object
-
#consume(workitem) ⇒ Object
Process the workitem at hand.
-
#do_not_thread ⇒ Object
[NOT sure about this behavior with Stomp yet. Need to dive.].
-
#initialize(options) ⇒ ParticipantProxy
constructor
The following parameters are used in the process definition.
-
#stop ⇒ Object
(Stops the underlying queue subscription).
Constructor Details
#initialize(options) ⇒ ParticipantProxy
The following parameters are used in the process definition.
An options hash with the same keys to provide defaults is accepted at registration time (see above).
-
:queue => (string) The Stomp queue used by the remote participant. nil by default.
-
:forget => (bool) Whether the flow should continue without waiting for the remote participant to reply. false by default, meaning the flow blocks until a reply arrives.
# File 'lib/ruote-stomp/participant.rb', line 115
def initialize(options)
  @options = {
    'queue' => nil,
    'forget' => false,
  }.merge(options.inject({}) { |h, (k, v)| h[k.to_s] = v; h })
    #
    # the inject is here to make sure that all options have String keys
end
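To illustrate why the constructor runs the options through inject, here is a small standalone sketch of the same key normalization. The method name normalize_options is hypothetical; only the defaults and the inject expression come from the constructor above.

```ruby
# Hypothetical standalone version of the constructor's option handling.
def normalize_options(options)
  defaults = { 'queue' => nil, 'forget' => false }

  # Convert every key to a String so that :queue and 'queue' are treated
  # as the same option, then layer the result over the defaults.
  stringified = options.inject({}) { |h, (k, v)| h[k.to_s] = v; h }

  defaults.merge(stringified)
end

# Symbol keys and String keys end up in the same slots.
from_symbols = normalize_options(:queue => 'user_manager')
from_strings = normalize_options('queue' => 'user_manager')

p from_symbols == from_strings
p from_symbols['forget']
```

Without this step, registering the participant with :queue => 'user_manager' would leave the 'queue' default untouched and add a separate :queue entry next to it.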
Instance Method Details
#cancel(fei, flavour) ⇒ Object
# File 'lib/ruote-stomp/participant.rb', line 161
def cancel(fei, flavour)
  #
  # TODO : sending a cancel item is not a bad idea, especially if the
  # job done over the stomp fence lasts...
  #
end
#consume(workitem) ⇒ Object
Process the workitem at hand. By default the workitem will be sent to the direct exchange specified in the queue workitem parameter. You can specify a message workitem parameter to have that sent instead of the workitem.
# File 'lib/ruote-stomp/participant.rb', line 131
def consume(workitem)
  RuoteStomp.start!

  target_queue = determine_queue(workitem)

  raise 'no queue specified (outbound delivery)' unless target_queue

  forget = determine_forget(workitem)

  opts = {
    :persistent => RuoteStomp.,
    :content_type => 'application/json' }

  if = workitem.fields['message'] || workitem.params['message']
    forget = true # sending a message implies 'forget' => true
    $stomp.send target_queue, , opts
  else
    $stomp.send target_queue, encode_workitem(workitem), opts
  end

  reply_to_engine(workitem) if forget
end
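The branching in #consume can be tried out in isolation with the sketch below. Everything in it is a stand-in: RecordingClient replaces the global Stomp client and only records what it is given, Workitem is a bare Struct with fields and params, JSON.generate on the fields substitutes for encode_workitem, and the method names deliver and dispatch are hypothetical. It is meant to show the message-versus-workitem decision, not the actual delivery.

```ruby
require 'json'

# Hypothetical stand-in for the Stomp client: records deliveries only.
class RecordingClient
  attr_reader :sent

  def initialize
    @sent = []
  end

  def deliver(queue, body, opts = {})
    @sent << [queue, body, opts]
  end
end

# Simplified workitem (an assumption): two plain hashes.
Workitem = Struct.new(:fields, :params)

# Mirrors the decision made in #consume. Returns true when the local
# participant should reply to the engine immediately.
def dispatch(client, workitem, queue, forget)
  raise 'no queue specified (outbound delivery)' unless queue

  opts = { :content_type => 'application/json' }

  if (message = workitem.fields['message'] || workitem.params['message'])
    forget = true # sending a message implies 'forget' => true
    client.deliver(queue, message, opts)
  else
    # Stand-in for encode_workitem: serialize the fields as JSON.
    client.deliver(queue, JSON.generate(workitem.fields), opts)
  end

  forget
end

client = RecordingClient.new

# A plain workitem is serialized and the flow waits for a remote reply.
waits = dispatch(client, Workitem.new({ 'user' => 'alice' }, {}), 'test', false)

# A workitem carrying a message is sent as-is and never waits.
replies = dispatch(client, Workitem.new({}, { 'message' => 'foo' }), 'test', false)

p waits
p replies
p client.sent.map { |queue, body, _opts| [queue, body] }
```

Returning the final value of forget keeps the sketch free of engine calls; in the participant itself that value decides whether reply_to_engine is invoked.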
#do_not_thread ⇒ Object
- NOT sure about this behavior with Stomp yet. Need to dive.
# File 'lib/ruote-stomp/participant.rb', line 170
def do_not_thread
  true
end
#stop ⇒ Object
(Stops the underlying queue subscription)
# File 'lib/ruote-stomp/participant.rb', line 157
def stop
  RuoteStomp.stop!
end