Class: Ruote::Beanstalk::ParticipantProxy

Inherits:
Object
  • Object
show all
Includes:
LocalParticipant
Defined in:
lib/ruote/beanstalk/participant_proxy.rb

Overview

This participant emits workitems towards a beanstalk queue.

engine.register_participant(
  :heavy_labour,
  :reply_by_default => true, :beanstalk => '127.0.0.1:11300')

workitem format

Workitems are encoded in the format

[ 'workitem', workitem.to_h ]

and then serialized as JSON strings.

cancel items

Like workitems, but the format is

[ 'cancelitem', fei.to_h, flavour.to_s ]

where fei is the FlowExpressionId of the expression getting cancelled (and whose workitems are to be retired) and flavour is either ‘cancel’ or ‘kill’.

extending this participant

Extend and overwrite encode_workitem and encode_cancelitem or simply re-open the class and change those methods.

:beanstalk

Indicates which beanstalk to talk to

engine.register_participant(
  'alice'
  Ruote::Beanstalk::ParticipantProxy,
  'beanstalk' => '127.0.0.1:11300')

:tube

Most of the time, you want the workitems (or the cancelitems) to be emitted over/in a specific tube

engine.register_participant(
  'alice'
  Ruote::Beanstalk::ParticipantProxy,
  'beanstalk' => '127.0.0.1:11300',
  'tube' => 'ruote-workitems')

:reply_by_default

If the participant is configured with ‘reply_by_default’ => true, the participant will dispatch the workitem over to Beanstalk and then immediately reply to its ruote engine (letting the flow resume).

engine.register_participant(
  'alice'
  Ruote::Beanstalk::ParticipantProxy,
  'beanstalk' => '127.0.0.1:11300',
  'reply_by_default' => true)

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ ParticipantProxy

Returns a new instance of ParticipantProxy.



105
106
107
108
# File 'lib/ruote/beanstalk/participant_proxy.rb', line 105

def initialize(opts)

  @opts = opts
end

Instance Method Details

#cancel(fei, flavour) ⇒ Object



122
123
124
125
126
127
128
129
130
# File 'lib/ruote/beanstalk/participant_proxy.rb', line 122

def cancel(fei, flavour)

  con = new_connection

  con.put(encode_cancelitem(fei, flavour))

ensure
  con.close rescue nil
end

#consume(workitem) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
# File 'lib/ruote/beanstalk/participant_proxy.rb', line 110

def consume(workitem)

  con = new_connection

  con.put(encode_workitem(workitem))

  reply(workitem) if @opts['reply_by_default']

ensure
  con.close rescue nil
end

#encode_cancelitem(fei, flavour) ⇒ Object



137
138
139
140
# File 'lib/ruote/beanstalk/participant_proxy.rb', line 137

def encode_cancelitem(fei, flavour)

  Rufus::Json.encode([ 'cancelitem', fei.to_h, flavour.to_s ])
end

#encode_workitem(workitem) ⇒ Object



132
133
134
135
# File 'lib/ruote/beanstalk/participant_proxy.rb', line 132

def encode_workitem(workitem)

  Rufus::Json.encode([ 'workitem', workitem.to_h ])
end