Class: RuoteStomp::Receiver
- Inherits: Ruote::Receiver
  - Object
  - Ruote::Receiver
  - RuoteStomp::Receiver
- Defined in: lib/ruote-stomp/receiver.rb
Overview
Stomp Receiver
Used in conjunction with the RuoteStomp::Participant, the WorkitemListener subscribes to a specific direct exchange and monitors for incoming workitems. It expects workitems to arrive serialized as JSON.
Configuration
Stomp configuration is handled by directly manipulating the values of the Stomp.settings hash, as provided by the Stomp gem. No defaults are set by the listener. The only option parsed by the initializer of the workitem listener is the queue key (Hash expected). If no queue key is set, the listener subscribes to the ruote_workitems direct exchange for workitems; otherwise it subscribes to the direct exchange provided.
Usage
Register the engine or storage with the listener:
RuoteStomp::Receiver.new(engine_or_storage)
The workitem listener leverages the asynchronous nature of the stomp gem, so no timers are set up when it is initialized.
Options
:queue and :launchitems
See the RuoteStomp::Participant docs for information on sending workitems out to remote participants and on having them send replies to the correct direct exchange, as specified in the workitem attributes.
Direct Known Subclasses
Instance Attribute Summary
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary
-
#decode_workitem(msg) ⇒ Object
Decodes the JSON message, returning nil if decoding fails (feel free to override).
-
#initialize(engine_or_storage, opts = {}) ⇒ Receiver
constructor
Starts a new Receiver.
- #stop ⇒ Object
Constructor Details
#initialize(engine_or_storage, opts = {}) ⇒ Receiver
Starts a new Receiver.
This method takes two arguments.
The first one should be a Ruote::Engine, a Ruote::Storage or a Ruote::Worker instance.
The second one is a hash of options. The known options are:
:queue sets the queue on which to listen (defaults to '/queue/ruote_workitems', or to '/queue/ruote_launchitems' when :launchitems is :only).
:ignore_disconnect_on_process => true|false (defaults to false) processes the message even if the client has disconnected (use in testing only).
:unsubscribe, when set, unsubscribes from the queue before subscribing to it.
The :launchitems option:
:launchitems => true
# the receiver accepts workitems and launchitems
:launchitems => false
# the receiver only accepts workitems
:launchitems => :only
# the receiver only accepts launchitems
# File 'lib/ruote-stomp/receiver.rb', line 69

def initialize(engine_or_storage, opts={})

  super(engine_or_storage)

  @launchitems = opts[:launchitems]
  ignore_disconnect = opts[:ignore_disconnect_on_process]

  @queue = opts[:queue] ||
    (@launchitems == :only ? '/queue/ruote_launchitems' : '/queue/ruote_workitems')

  RuoteStomp.start!

  if opts[:unsubscribe]
    begin
      $stomp.unsubscribe(@queue)
    rescue OnStomp::UnsupportedCommandError => e
      $stderr.puts("Connection does support unsubscribe")
    end
  end

  $stomp.subscribe(@queue) do ||
    # Process your message here
    # Your submitted data is in msg.body
    if $stomp.connected? && !ignore_disconnect
      # do nothing, we're going down
    else
      handle()
    end
  end
end
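The queue selection in the constructor can be looked at in isolation. The following is a minimal sketch that mirrors that single expression; `default_queue` is a hypothetical helper name introduced here for illustration and is not part of RuoteStomp.

```ruby
# Hypothetical helper (not part of RuoteStomp): mirrors the expression the
# constructor uses to decide which queue to subscribe to.
def default_queue(opts = {})
  opts[:queue] ||
    (opts[:launchitems] == :only ? '/queue/ruote_launchitems' : '/queue/ruote_workitems')
end

puts default_queue                                 # no options given
puts default_queue(:launchitems => :only)          # launchitems-only receiver
puts default_queue(:launchitems => true)           # workitems and launchitems
puts default_queue(:queue => '/queue/my_replies')  # an explicit :queue always wins
```

Note that only `:launchitems => :only` changes the default; `true` and `false` both leave the receiver on the workitems queue.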
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
# File 'lib/ruote-stomp/receiver.rb', line 43

def queue
  @queue
end
Instance Method Details
#decode_workitem(msg) ⇒ Object
Decodes the JSON message, returning nil if decoding fails (feel free to override).

# File 'lib/ruote-stomp/receiver.rb', line 107

def decode_workitem(msg)
  (Rufus::Json.decode(msg) rescue nil)
end
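The decode-or-nil behaviour can be tried without the gem. The sketch below is a stand-in, not the library's method: it assumes Ruby's standard `json` library in place of Rufus::Json, and `decode_workitem_sketch` is a hypothetical name.

```ruby
require 'json'

# Stand-in for decode_workitem (hypothetical name). Uses the standard
# library's JSON instead of Rufus::Json to stay self-contained.
# Any decoding failure yields nil instead of raising.
def decode_workitem_sketch(msg)
  JSON.parse(msg)
rescue StandardError
  nil
end

p decode_workitem_sketch('{"fields":{"customer":"alice"}}')  # a decoded Hash
p decode_workitem_sketch('this is not JSON')                 # nil
```

A subclass that overrides decode_workitem to accept another serialization would presumably keep the same contract: return the decoded item, or nil when the message cannot be decoded.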
#stop ⇒ Object
# File 'lib/ruote-stomp/receiver.rb', line 101

def stop
  RuoteStomp.stop!
end