Class: Simrpc::Node
- Inherits:
- Object
- Simrpc::Node
- Defined in:
- lib/simrpc/node.rb
Overview
Simrpc Node represents the main API through which to communicate and to send and listen for data.
Instance Method Summary
-
#handle_method(method, &handler) ⇒ Object
add a handler to invoke when a schema method is invoked.
- #id ⇒ Object
-
#initialize(args = {}) ⇒ Node
constructor
Instantiate it w/ a specified id or one will be autogenerated.
-
#join ⇒ Object
wait until the node is no longer accepting messages.
-
#message_received(mid, results) ⇒ Object
implements the message_received callback, which is notified when qpid receives a message.
-
#method_missing(method_id, *args) ⇒ Object
schema methods can be invoked directly on Node instances; this catches them and sends them on to the destination.
-
#send_method(method_name, destination, *args) ⇒ Object
send method request to remote destination w/ the specified args.
-
#terminate ⇒ Object
Terminate node operations.
Constructor Details
#initialize(args = {}) ⇒ Node
Instantiate it w/ a specified id or one will be autogenerated. Specify the schema (or the location of a schema file) containing the data and methods to invoke and/or handle. Optionally specify a remote destination to which new messages are sent. The node automatically listens for incoming messages.
# File 'lib/simrpc/node.rb', line 112

def initialize(args = {})
  @id = args[:id] if args.has_key? :id
  @schema = args[:schema]
  @schema_file = args[:schema_file]
  @destination = args[:destination]

  if !@schema.nil?
    @schema_def = Schema::Parser.parse(:schema => @schema)
  elsif !@schema_file.nil?
    @schema_def = Schema::Parser.parse(:file => @schema_file)
  end
  raise ArgumentError, "schema_def cannot be nil" if @schema_def.nil?
  @mmc = MethodMessageController.new(@schema_def)

  # hash of message id's -> response locks
  @message_locks = {}

  # FIXME currently not allowing for any other params to be passed into
  # QpidAdapter::Node such as broker ip or port, NEED TO FIX THIS
  # NOTE see QpidAdapter::Node for limitation on using 'inspect'
  @qpid_node = QpidAdapter::Node.new(:id => @id)
  @qpid_node.async_accept { |node, msg, reply_to|
    mid, results = @mmc.(node, msg, reply_to)
    message_received(mid, results)
  }
end
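The constructor prefers an inline :schema over a :schema_file and raises if no schema definition results. The sketch below isolates that selection logic so it can be run without qpid. The helper name schema_source and the placeholder strings are hypothetical; they are not part of Simrpc, and the real constructor hands the chosen source to Schema::Parser.parse rather than returning it.

```ruby
# Hypothetical stand-in for the constructor's schema selection; not Simrpc code.
def schema_source(args = {})
  # An inline :schema takes precedence over :schema_file, as in the constructor.
  return { :schema => args[:schema] } unless args[:schema].nil?
  return { :file => args[:schema_file] } unless args[:schema_file].nil?

  # The constructor raises once parsing has produced no definition; this
  # sketch raises as soon as neither source is supplied.
  raise ArgumentError, "schema_def cannot be nil"
end

# Placeholder values only; the actual schema format is not shown here.
inline    = schema_source(:schema => "SCHEMA TEXT", :schema_file => "ignored")
from_file = schema_source(:schema_file => "SCHEMA PATH")
```

When both keys are given, the file is never consulted, so callers who want the file must leave :schema unset.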
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method_id, *args) ⇒ Object
schema methods can be invoked directly on Node instances; this catches them and sends them on to the destination
# File 'lib/simrpc/node.rb', line 194

def method_missing(method_id, *args)
  send_method(method_id.to_s, @destination, *args)
end
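To illustrate the forwarding, here is a minimal sketch with a stand-in class. ForwardingNode, its sent log, and the add_numbers call are hypothetical; send_method here only records the request instead of sending it over qpid.

```ruby
# ForwardingNode is a hypothetical stand-in, not Simrpc::Node.
class ForwardingNode
  attr_reader :sent

  def initialize(destination)
    @destination = destination
    @sent = []
  end

  # Records the request rather than transmitting it.
  def send_method(method_name, destination, *args)
    @sent << [method_name, destination, args]
    nil
  end

  # Any call to an undefined method is turned into a send_method call
  # aimed at the default destination, as in the listing above.
  def method_missing(method_id, *args)
    send_method(method_id.to_s, @destination, *args)
  end
end

node = ForwardingNode.new("server")
node.add_numbers(1, 2) # not defined on the class, so it is forwarded
```

Because every undefined method is forwarded, a misspelled method name is also passed to send_method instead of raising NoMethodError at the call site.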
Instance Method Details
#handle_method(method, &handler) ⇒ Object
add a handler to invoke when a schema method is invoked
# File 'lib/simrpc/node.rb', line 161

def handle_method(method, &handler)
  @schema_def.methods.each { |smethod|
    if smethod.name == method.to_s
      smethod.handler = handler
      break
    end
  }
end
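A small sketch of the same lookup-and-assign pattern, using stand-ins for the schema objects. SchemaMethod and HandlerRegistry are hypothetical names; the real schema method class is defined elsewhere in Simrpc and is not shown on this page.

```ruby
# SchemaMethod and HandlerRegistry are hypothetical stand-ins.
SchemaMethod = Struct.new(:name, :handler)

class HandlerRegistry
  def initialize(methods)
    @methods = methods
  end

  # Attach the block to the first schema method whose name matches.
  def handle_method(method, &handler)
    smethod = @methods.find { |m| m.name == method.to_s }
    smethod.handler = handler unless smethod.nil?
  end
end

methods  = [SchemaMethod.new("add"), SchemaMethod.new("echo")]
registry = HandlerRegistry.new(methods)

registry.handle_method(:add) { |a, b| a + b }
registry.handle_method(:unknown) { |x| x } # no match: nothing is registered
```

As in the listing, a name that matches no schema method is ignored without an error, so a typo in the method name leaves the handler unregistered.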
#id ⇒ Object
# File 'lib/simrpc/node.rb', line 139

def id
  return @id unless @id.nil?
  return @qpid_node.node_id
end
#join ⇒ Object
wait until the node is no longer accepting messages
# File 'lib/simrpc/node.rb', line 156

def join
  @qpid_node.join
end
#message_received(mid, results) ⇒ Object
implements the message_received callback, which is notified when qpid receives a message
# File 'lib/simrpc/node.rb', line 145

def message_received(mid, results)
  @message_results = results unless results.nil?
  @message_locks[mid].signal unless mid.nil? || !@message_locks.has_key?(mid)
end
#send_method(method_name, destination, *args) ⇒ Object
send method request to remote destination w/ the specified args
# File 'lib/simrpc/node.rb', line 171

def send_method(method_name, destination, *args)
  # generate and send new method message
  msg = @mmc.generate(method_name, args)
  @qpid_node.(destination + "-queue", msg)

  # FIXME race condition if response is received b4 wait is invoked

  # block if we are expecting return values
  if @schema_def.methods.find{|m| m.name == method_name}.return_values.size != 0
    @message_locks[msg.header.id] = Semaphore.new(1)
    @message_locks[msg.header.id].wait
    @message_locks[msg.header.id].wait # block until response received
    @message_locks.delete(msg.header.id)

    # return return values
    #@message_received.body.fields.collect { |f| f.value }
    return *@message_results
  end
  return nil
end
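The FIXME in the listing notes a race: the lock is created only after the message is sent, so a fast response can arrive before anyone waits for it. One possible way to avoid this, sketched below, is to register a per-message queue before sending and let the receiver push results into it. This is an alternative technique using Ruby's standard Queue, not what Simrpc does; ResponseWaiter and its method names are hypothetical.

```ruby
require 'thread'

# ResponseWaiter is a hypothetical helper, not part of Simrpc.
class ResponseWaiter
  def initialize
    @pending = {}
    @lock = Mutex.new
  end

  # Register interest in a message id BEFORE the request is sent.
  def expect(mid)
    @lock.synchronize { @pending[mid] = Queue.new }
  end

  # Called from the receiving thread; results for unknown ids are dropped.
  def deliver(mid, results)
    queue = @lock.synchronize { @pending[mid] }
    queue << results unless queue.nil?
  end

  # Block until results for mid arrive, then forget the id.
  def wait_for(mid)
    queue = @lock.synchronize { @pending[mid] }
    results = queue.pop
    @lock.synchronize { @pending.delete(mid) }
    results
  end
end

waiter = ResponseWaiter.new

# Response arrives before the wait starts: the queue keeps it.
waiter.expect("msg-1")
waiter.deliver("msg-1", [42])
early = waiter.wait_for("msg-1")

# Response arrives after the wait starts: pop blocks until delivery.
waiter.expect("msg-2")
responder = Thread.new { sleep 0.05; waiter.deliver("msg-2", ["ok"]) }
late = waiter.wait_for("msg-2")
responder.join
```

Keeping results in a queue per message id also means concurrent requests do not share a single result variable, unlike the @message_results instance variable in the listing.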
#terminate ⇒ Object
Terminate node operations
# File 'lib/simrpc/node.rb', line 151

def terminate
  @qpid_node.terminate
end