Class: Ruote::Beanstalk::ParticipantProxy
- Inherits:
-
Object
- Object
- Ruote::Beanstalk::ParticipantProxy
- Includes:
- LocalParticipant
- Defined in:
- lib/ruote/beanstalk/participant_proxy.rb
Overview
This participant emits workitems towards a beanstalk queue.
engine.register_participant(
:heavy_labour,
:reply_by_default => true, :beanstalk => '127.0.0.1:11300')
workitem format
Workitems are encoded in the format
[ 'workitem', workitem.to_h ]
and then serialized as JSON strings.
cancel items
Like workitems, but the format is
[ 'cancelitem', fei.to_h, flavour.to_s ]
where fei is the FlowExpressionId of the expression getting cancelled (and whose workitems are to be retired) and flavour is either ‘cancel’ or ‘kill’.
extending this participant
Extend and overwrite encode_workitem and encode_cancelitem or simply re-open the class and change those methods.
:beanstalk
Indicates which beanstalk to talk to
engine.register_participant(
'alice'
Ruote::Beanstalk::ParticipantProxy,
'beanstalk' => '127.0.0.1:11300')
:tube
Most of the time, you want the workitems (or the cancelitems) to be emitted over/in a specific tube
engine.register_participant(
'alice'
Ruote::Beanstalk::ParticipantProxy,
'beanstalk' => '127.0.0.1:11300',
'tube' => 'ruote-workitems')
:reply_by_default
If the participant is configured with ‘reply_by_default’ => true, the participant will dispatch the workitem over to Beanstalk and then immediately reply to its ruote engine (letting the flow resume).
engine.register_participant(
'alice'
Ruote::Beanstalk::ParticipantProxy,
'beanstalk' => '127.0.0.1:11300',
'reply_by_default' => true)
Instance Method Summary collapse
- #cancel(fei, flavour) ⇒ Object
- #consume(workitem) ⇒ Object
- #encode_cancelitem(fei, flavour) ⇒ Object
- #encode_workitem(workitem) ⇒ Object
-
#initialize(opts) ⇒ ParticipantProxy
constructor
A new instance of ParticipantProxy.
Constructor Details
#initialize(opts) ⇒ ParticipantProxy
Returns a new instance of ParticipantProxy.
105 106 107 108 |
# File 'lib/ruote/beanstalk/participant_proxy.rb', line 105 def initialize(opts) @opts = opts end |
Instance Method Details
#cancel(fei, flavour) ⇒ Object
122 123 124 125 126 127 128 129 130 |
# File 'lib/ruote/beanstalk/participant_proxy.rb', line 122 def cancel(fei, flavour) con = new_connection con.put(encode_cancelitem(fei, flavour)) ensure con.close rescue nil end |
#consume(workitem) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/ruote/beanstalk/participant_proxy.rb', line 110 def consume(workitem) con = new_connection con.put(encode_workitem(workitem)) reply(workitem) if @opts['reply_by_default'] ensure con.close rescue nil end |
#encode_cancelitem(fei, flavour) ⇒ Object
137 138 139 140 |
# File 'lib/ruote/beanstalk/participant_proxy.rb', line 137 def encode_cancelitem(fei, flavour) Rufus::Json.encode([ 'cancelitem', fei.to_h, flavour.to_s ]) end |
#encode_workitem(workitem) ⇒ Object
132 133 134 135 |
# File 'lib/ruote/beanstalk/participant_proxy.rb', line 132 def encode_workitem(workitem) Rufus::Json.encode([ 'workitem', workitem.to_h ]) end |