Class: Simrpc::Node

Inherits:
Object
Defined in:
lib/simrpc/node.rb

Overview

Simrpc::Node is the main API used to communicate and to send and listen for data.


Constructor Details

#initialize(args = {}) ⇒ Node

Instantiates a node with the given :id, or with an autogenerated id if none is specified. Specify a :schema (or a :schema_file location) describing the data and methods to invoke and/or handle; an ArgumentError is raised if no schema definition results. Optionally specify a remote :destination to which new messages are sent. The node automatically starts listening for incoming messages.

Raises:

  • (ArgumentError)


# File 'lib/simrpc/node.rb', line 112

def initialize(args = {})
   @id = args[:id] if args.has_key? :id
   @schema = args[:schema]
   @schema_file = args[:schema_file]
   @destination = args[:destination]

   if !@schema.nil?
     @schema_def = Schema::Parser.parse(:schema => @schema)
   elsif !@schema_file.nil?
     @schema_def = Schema::Parser.parse(:file => @schema_file)
   end
   raise ArgumentError, "schema_def cannot be nil" if @schema_def.nil?
   @mmc = MethodMessageController.new(@schema_def)

   # hash of message id's -> response locks
   @message_locks = {}

   # FIXME currently not allowing for any other params to be passed into
   # QpidAdapter::Node such as broker ip or port, NEED TO FIX THIS
   # NOTE see QpidAdapter::Node for limitation on using 'inspect'
   @qpid_node = QpidAdapter::Node.new(:id => @id)
   @qpid_node.async_accept { |node, msg, reply_to|
       mid, results = @mmc.message_received(node, msg, reply_to)
       message_received(mid, results)
   }
end
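The schema selection at the top of the constructor can be illustrated in isolation. The following is a minimal sketch, not the library's code: `pick_schema_source` is a hypothetical helper that only mirrors the precedence shown above (`:schema` is tried before `:schema_file`) and raises ArgumentError when neither is given. The real constructor additionally raises if parsing yields no schema definition.

```ruby
# Hypothetical helper mirroring the constructor's precedence rules;
# it is not part of Simrpc.
def pick_schema_source(args = {})
  if !args[:schema].nil?
    { :schema => args[:schema] }       # an inline schema wins
  elsif !args[:schema_file].nil?
    { :file => args[:schema_file] }    # otherwise fall back to a file location
  else
    raise ArgumentError, "schema or schema_file must be given"
  end
end

both = pick_schema_source({ :schema => "<schema/>", :schema_file => "schema.xml" })
file_only = pick_schema_source({ :schema_file => "schema.xml" })
```

Here `both` selects the inline schema and `file_only` selects the file, which matches the order of the `if`/`elsif` branches in the listing.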

Dynamic Method Handling

This class handles dynamic methods through the method_missing method.

#method_missing(method_id, *args) ⇒ Object

Schema methods can be invoked directly on Node instances; method_missing catches these calls and sends them on to the destination.



# File 'lib/simrpc/node.rb', line 194

def method_missing(method_id, *args)
  send_method(method_id.to_s, @destination, *args)
end
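The forwarding pattern can be tried without a broker. This is a sketch under stated assumptions: `ForwardingNode` is a stand-in class, not Simrpc::Node, and its `send_method` only records the call. Unlike the listing above, which forwards every unknown method name, the sketch forwards only names the schema declares and defines `respond_to_missing?` to match; that guard is an illustrative design choice, not the library's behaviour.

```ruby
# Stand-in class, not Simrpc::Node: records what would be sent instead of
# talking to a broker.
class ForwardingNode
  attr_reader :sent

  def initialize(destination, schema_methods)
    @destination = destination
    @schema_methods = schema_methods   # method names the schema declares
    @sent = []
  end

  def send_method(method_name, destination, *args)
    @sent << [method_name, destination, args]
    nil
  end

  # Forward only names the schema knows; anything else falls through to
  # Ruby's normal NoMethodError.
  def method_missing(method_id, *args)
    return super unless @schema_methods.include?(method_id.to_s)
    send_method(method_id.to_s, @destination, *args)
  end

  def respond_to_missing?(method_id, include_private = false)
    @schema_methods.include?(method_id.to_s) || super
  end
end

node = ForwardingNode.new("server", ["add"])
node.add(1, 2)   # recorded as a call to "add" bound for "server"
```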

Instance Method Details

#handle_method(method, &handler) ⇒ Object

Adds a handler to invoke when a schema method is invoked.



# File 'lib/simrpc/node.rb', line 161

def handle_method(method, &handler)
   @schema_def.methods.each { |smethod|
      if smethod.name == method.to_s
          smethod.handler = handler
          break
      end
   }
end
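Handler registration can be exercised on its own. In this sketch `SchemaMethod` and `HandlerRegistry` are hypothetical stand-ins (the real schema method class may carry more fields), and the lookup is written with `find` rather than `each`/`break`. Returning true or false is an addition made for illustration; the listing above does not report whether a match was found.

```ruby
# Hypothetical stand-in for a parsed schema method.
SchemaMethod = Struct.new(:name, :handler)

class HandlerRegistry
  def initialize(methods)
    @methods = methods
  end

  # Same lookup as handle_method above, written with find. Returns true
  # when a matching schema method was found, false otherwise.
  def handle_method(method, &handler)
    smethod = @methods.find { |m| m.name == method.to_s }
    return false if smethod.nil?
    smethod.handler = handler
    true
  end
end

add = SchemaMethod.new("add", nil)
registry = HandlerRegistry.new([add])
registered = registry.handle_method(:add) { |a, b| a + b }
missing = registry.handle_method(:subtract) { |a, b| a - b }
```

As in the original, the method name may be given as a symbol or a string because it is compared via `to_s`.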

#id ⇒ Object



# File 'lib/simrpc/node.rb', line 139

def id
  return @id unless @id.nil?
  return @qpid_node.node_id
end

#join ⇒ Object

Waits until the node is no longer accepting messages.



# File 'lib/simrpc/node.rb', line 156

def join
    @qpid_node.join
end

#message_received(mid, results) ⇒ Object

Implements the message_received callback, which is invoked when Qpid receives a message.



# File 'lib/simrpc/node.rb', line 145

def message_received(mid, results)
    @message_results = results unless results.nil?
    @message_locks[mid].signal unless mid.nil? || !@message_locks.has_key?(mid)
end

#send_method(method_name, destination, *args) ⇒ Object

Sends a method request to the remote destination with the specified args.



# File 'lib/simrpc/node.rb', line 171

def send_method(method_name, destination, *args)
   # generate and send new method message
   msg = @mmc.generate(method_name, args)
   @qpid_node.send_message(destination + "-queue", msg)

   # FIXME race condition if response is received b4 wait is invoked

   # block if we are expecting return values
   if @schema_def.methods.find{|m| m.name == method_name}.return_values.size != 0
     @message_locks[msg.header.id] =  Semaphore.new(1)
     @message_locks[msg.header.id].wait
     @message_locks[msg.header.id].wait # block until response received
     @message_locks.delete(msg.header.id)

     # return return values
     #@message_received.body.fields.collect { |f| f.value }
     return *@message_results
   end
   return nil
end
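The FIXME in the listing above notes a race when the response arrives before the wait begins. One possible way to sidestep it, shown here only as a sketch and not as the library's approach, is to register a per-message `Queue` from Ruby's core library before sending, so that an early reply is buffered until the caller pops it. `PendingReplies` and its method names are hypothetical. The sketch also keeps results per message id, whereas the listing stores them in a single `@message_results` variable.

```ruby
# Hypothetical helper, not part of Simrpc: one queue per outstanding
# message id.
class PendingReplies
  def initialize
    @queues = {}
    @lock = Mutex.new
  end

  # Call before sending so that an early reply has somewhere to land.
  def expect(mid)
    @lock.synchronize { @queues[mid] = Queue.new }
  end

  # Called from the receiving thread; replies for unknown ids are dropped.
  def deliver(mid, results)
    queue = @lock.synchronize { @queues[mid] }
    queue << results unless queue.nil?
  end

  # Blocks until the reply for mid has been delivered, then forgets the id.
  # Assumes expect(mid) was called earlier.
  def wait_for(mid)
    queue = @lock.synchronize { @queues[mid] }
    results = queue.pop
    @lock.synchronize { @queues.delete(mid) }
    results
  end
end

pending = PendingReplies.new
pending.expect("msg-1")
responder = Thread.new { pending.deliver("msg-1", [42]) }
reply = pending.wait_for("msg-1")   # blocks until the responder delivers
responder.join
```

Because the queue exists before the message is sent, it does not matter whether `deliver` runs before or after `wait_for`.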

#terminate ⇒ Object

Terminates node operations.



# File 'lib/simrpc/node.rb', line 151

def terminate
  @qpid_node.terminate
end