Class: Ruote::Amqp::AlertParticipant

Inherits:
Participant
Defined in:
lib/ruote/amqp/alert_participant.rb

Overview

The alert participant, when invoked from a process instance, will lie in wait for the next message on a given queue. As soon as the message comes in, it packs it into the workitem fields and lets the process definition resume.

@dashboard.register(
  :wait_for_info,
  Ruote::Amqp::AlertParticipant,
  :queue => 'info')

pdef = Ruote.define do
  wait_for_info
  # ... the rest of the flow
end

configuration

This class is essentially a subclass of Ruote::Amqp::Participant and accepts the same configuration options (although it has no need for ‘exchange’). It also accepts a ‘queue’ option in the form [ ‘queue_name’, { queue options } ].

overriding #handle(header, payload)

The default implementation for this method is:

def handle(header, payload)
  workitem.fields['amqp_message'] = [ header.to_hash, payload ]
end

One is free to override it:

class MyAlertParticipant < Ruote::Amqp::AlertParticipant
  def handle(header, payload)
    fields = Rufus::Json.decode(payload)
    workitem.fields.merge!(fields)
  end
end
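
The override above assumes that every payload decodes to a JSON object. As a rough sketch of a more defensive merge, the helper below falls back to storing the payload under a separate key when decoding fails or does not yield a Hash. Note that merge_payload and the 'amqp_payload' key are hypothetical names chosen for illustration, not part of ruote-amqp, and that the standard json library is used here in place of Rufus::Json:

```ruby
require 'json'

# Hypothetical helper (not part of ruote-amqp): returns a new Hash made of
# `fields` plus the decoded payload. When the payload is not valid JSON, or
# decodes to something other than a Hash, it is kept under the assumed
# 'amqp_payload' key instead of being merged.
def merge_payload(fields, payload)
  decoded = JSON.parse(payload)
  if decoded.is_a?(Hash)
    fields.merge(decoded)
  else
    fields.merge('amqp_payload' => decoded)
  end
rescue JSON::ParserError
  # Not JSON at all: keep the raw string.
  fields.merge('amqp_payload' => payload)
end
```

Inside a #handle override, the result could then be merged back into the workitem with workitem.fields.merge!(merge_payload(workitem.fields, payload)).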

overriding #on_workitem

Out of the box, the alert participant listens for 1 message on 1 queue. It’s not too difficult to change that.

Resuming after 3 messages:

class MyAlertParticipant < Ruote::Amqp::AlertParticipant

  def on_workitem

    messages = []

    queue.subscribe { |header, payload |

      messages << payload

      if messages.size > 2
        queue.unsubscribe
        workitem.fields['messages'] = messages
        reply # let the flow resume
      end
    }
  end
end

Observing 2 queues:

class MyAlertParticipant < Ruote::Amqp::AlertParticipant

  def on_workitem

    messages = []

    q0 = channel.queue('zero')
    q1 = channel.queue('one')

    # Array has no #subscribe, so subscribe to each queue in turn
    [ q0, q1 ].each { |q|
      q.subscribe { |header, payload|
        messages << payload
      }
    }

    sleep 1.0 while messages.size < 2

    reply # let the flow resume
  end
end
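
The two-queue example polls with sleep until enough messages have arrived. A callback-driven alternative can be sketched without a broker, as below. FakeQueue and collect_messages are hypothetical names introduced for illustration only: FakeQueue is an in-memory stand-in that mimics just the subscribe/unsubscribe calls used in this page, not the real AMQP queue class.

```ruby
# Hypothetical in-memory stand-in for an AMQP queue (illustration only).
class FakeQueue
  def initialize
    @subscriber = nil
  end

  def subscribe(&block)
    @subscriber = block
  end

  def unsubscribe
    @subscriber = nil
  end

  # Delivers a message to the current subscriber, if there is one.
  def publish(header, payload)
    @subscriber.call(header, payload) if @subscriber
  end
end

# Hypothetical helper: collects payloads from several queues and calls
# on_done once `count` messages have arrived. Every queue is unsubscribed
# before on_done runs, so later messages are ignored.
def collect_messages(queues, count, &on_done)
  messages = []
  queues.each do |q|
    q.subscribe do |_header, payload|
      messages << payload
      if messages.size == count
        queues.each(&:unsubscribe)
        on_done.call(messages)
      end
    end
  end
end
```

In an #on_workitem override, the on_done block would presumably store the messages in workitem.fields and call reply, as the three-message example does.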

Instance Method Summary

Methods inherited from Participant

#correlation_id, #do_not_thread, #exchange, #forget, #initialize, #message, #on_cancel, #persistent, #routing_key

Constructor Details

This class inherits a constructor from Ruote::Amqp::Participant

Instance Method Details

#on_workitem ⇒ Object



# File 'lib/ruote/amqp/alert_participant.rb', line 116

def on_workitem

  queue.subscribe { |header, payload|

    queue.unsubscribe
    handle(header, payload)
    reply
  }
end