Class: RJR::Nodes::AMQP
Overview
AMQP node definition, implements the RJR::Node interface to listen for and invoke json-rpc requests over the Advanced Message Queuing Protocol.
Clients should specify the amqp broker to connect to when initializing a node and specify the remote queue when invoking requests.
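The difference between a json-rpc request and a notification can be sketched with the standard library alone. This page does not show the wire format that RJR's Messages classes produce, so the snippet below follows the JSON-RPC 2.0 specification instead. The helper names build_request and build_notification are hypothetical and are not part of RJR.

```ruby
require 'json'
require 'securerandom'

# Hypothetical helper (not RJR API): a JSON-RPC 2.0 request carries an id
# so the caller can match the eventual response to it.
def build_request(rpc_method, *args)
  { 'jsonrpc' => '2.0',
    'method'  => rpc_method,
    'params'  => args,
    'id'      => SecureRandom.uuid }
end

# Hypothetical helper (not RJR API): a notification has no id, so the
# receiver sends no response.
def build_notification(rpc_method, *args)
  { 'jsonrpc' => '2.0',
    'method'  => rpc_method,
    'params'  => args }
end

puts JSON.generate(build_request('add', 1, 2))
puts JSON.generate(build_notification('log', 'started'))
```

Over AMQP, the serialized payload would be published with a routing key naming the remote queue, which is the role of the routing_key argument taken by #invoke and #notify.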
Constant Summary
- RJR_NODE_TYPE = :amqp
- PERSISTENT_NODE = true
- INDIRECT_NODE = true
Instance Attribute Summary
Attributes inherited from RJR::Node
#connection_event_handlers, #dispatcher, #message_headers, #node_id
Instance Method Summary
- #initialize(args = {}) ⇒ AMQP (constructor)
  AMQPNode initializer.
- #invoke(routing_key, rpc_method, *args) ⇒ Object
  Instructs node to send rpc request, and wait for and return response.
- #listen ⇒ Object
  Instruct Node to start listening for and dispatching rpc requests.
- #notify(routing_key, rpc_method, *args) ⇒ Object
  Instructs node to send rpc notification (immediately returns / no response is generated).
- #send_msg(msg, metadata, &on_publish) ⇒ Object
  Publish a message using the amqp exchange.
- #to_s ⇒ Object
Methods inherited from RJR::Node
#clear_event_handlers, em, #em, #halt, #indirect?, indirect?, #join, #node_type, #on, persistent?, #persistent?, tp, #tp
Constructor Details
#initialize(args = {}) ⇒ AMQP
AMQPNode initializer
    # File 'lib/rjr/nodes/amqp.rb', line 118
    def initialize(args = {})
      super(args)
      @broker = args[:broker]
      @amqp_lock = Mutex.new
    end
Instance Method Details
#invoke(routing_key, rpc_method, *args) ⇒ Object
Instructs node to send rpc request, and wait for and return response.
Implementation of RJR::Node#invoke
Do not call this method directly from the em event loop or from one of its callbacks: the call blocks, which stalls the message subscription used to receive responses.
    # File 'lib/rjr/nodes/amqp.rb', line 169
    def invoke(routing_key, rpc_method, *args)
      message = Messages::Request.new :method => rpc_method,
                                      :args => args,
                                      :headers => @message_headers
      @@em.schedule do
        init_node {
          subscribe # begin listening for result
          send_msg(message.to_s, :routing_key => routing_key, :reply_to => @queue_name)
        }
      end

      # TODO optional timeout for response
      result = wait_for_result(message)

      if result.size > 2
        raise Exception, result[2]
      end
      return result[1]
    end
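The listing above leaves a response timeout as a TODO and does not show how wait_for_result is implemented. As a rough illustration only, the following standard-library sketch shows one way a caller could block until a matching response arrives or a deadline passes. ResponseWaiter, ResponseTimeout and unwrap are hypothetical names invented for this sketch and are not part of RJR. The result layout ([id, value] on success, [id, value, error] on failure) mirrors the result.size > 2 check in #invoke.

```ruby
# Hypothetical error class for this sketch (not defined by RJR).
class ResponseTimeout < StandardError; end

# Hypothetical helper: stores responses by message id and lets a caller
# block until its own response is available.
class ResponseWaiter
  def initialize
    @lock      = Mutex.new
    @cond      = ConditionVariable.new
    @responses = {}
  end

  # Called from the subscriber thread when a response is received.
  def deliver(msg_id, value, error = nil)
    entry = error.nil? ? [msg_id, value] : [msg_id, value, error]
    @lock.synchronize do
      @responses[msg_id] = entry
      @cond.broadcast # wake every waiter; each re-checks its own id
    end
  end

  # Blocks until a response for msg_id arrives. With a timeout (seconds),
  # raises ResponseTimeout once the deadline has passed.
  def wait_for(msg_id, timeout = nil)
    deadline = timeout && now + timeout
    @lock.synchronize do
      until @responses.key?(msg_id)
        remaining = deadline && deadline - now
        raise ResponseTimeout, "no response for #{msg_id}" if remaining && remaining <= 0
        @cond.wait(@lock, remaining)
      end
      @responses.delete(msg_id)
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

# Mirrors the tail of #invoke: a third element signals an error.
def unwrap(result)
  raise result[2].to_s if result.size > 2
  result[1]
end

waiter = ResponseWaiter.new
Thread.new { sleep 0.05; waiter.deliver('req-1', 42) }
puts unwrap(waiter.wait_for('req-1', 1.0))
```

Because wait_for blocks the calling thread, the same caveat applies as for #invoke: calling it from the thread that delivers responses would deadlock.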
#listen ⇒ Object
Instruct Node to start listening for and dispatching rpc requests.
Implementation of RJR::Node#listen
    # File 'lib/rjr/nodes/amqp.rb', line 148
    def listen
      @@em.schedule do
        init_node {
          subscribe # start receiving messages
        }
      end
      self
    end
#notify(routing_key, rpc_method, *args) ⇒ Object
Instructs node to send rpc notification (immediately returns / no response is generated)
Implementation of RJR::Node#notify
    # File 'lib/rjr/nodes/amqp.rb', line 200
    def notify(routing_key, rpc_method, *args)
      # will block until message is published
      published_l = Mutex.new
      published_c = ConditionVariable.new

      invoked = false
      message = Messages::Notification.new :method => rpc_method,
                                           :args => args,
                                           :headers => @message_headers
      @@em.schedule do
        init_node {
          send_msg(message.to_s, :routing_key => routing_key, :reply_to => @queue_name){
            published_l.synchronize { invoked = true ; published_c.signal }
          }
        }
      end
      published_l.synchronize { published_c.wait published_l unless invoked }
      nil
    end
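The blocking behaviour of #notify rests on a Mutex, a ConditionVariable and a flag. The pattern can be tried in isolation with the sketch below, which replaces the AMQP publish with a background thread. async_publish and publish_and_wait are hypothetical stand-ins written for this illustration and are not RJR methods.

```ruby
# Hypothetical stand-in for an asynchronous publish: runs the callback
# from another thread after a short delay.
def async_publish(_msg, &on_publish)
  Thread.new do
    sleep 0.05
    on_publish.call
  end
end

# Blocks the caller until the publish callback has fired.
def publish_and_wait(msg)
  lock      = Mutex.new
  cond      = ConditionVariable.new
  published = false

  async_publish(msg) do
    # Set the flag and signal under the lock so the waiter cannot miss it.
    lock.synchronize { published = true; cond.signal }
  end

  # The flag covers the case where the callback fired before we got here;
  # the loop re-checks it after every wakeup.
  lock.synchronize { cond.wait(lock) until published }
  published
end

puts publish_and_wait('hello')
```

The listing above checks the flag once (unless invoked), whereas this sketch loops (until published). The loop is a common defensive choice against spurious wakeups and is a deliberate difference from the original.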
#send_msg(msg, metadata, &on_publish) ⇒ Object
Publish a message using the amqp exchange
Implementation of RJR::Node#send_msg
    # File 'lib/rjr/nodes/amqp.rb', line 131
    def send_msg(msg, metadata, &on_publish)
      @amqp_lock.synchronize {
        #raise RJR::Errors::ConnectionError.new("client unreachable") if @disconnected
        routing_key = metadata[:routing_key]
        reply_to = metadata[:reply_to]
        @exchange.publish msg,
                          :routing_key => routing_key,
                          :reply_to => reply_to do |*cargs|
          on_publish.call unless on_publish.nil?
        end
      }
      nil
    end
#to_s ⇒ Object
    # File 'lib/rjr/nodes/amqp.rb', line 124
    def to_s
      "RJR::Nodes::AMQP<#{@node_id},#{@broker},#{@queue_name}>"
    end