Module: MessagePack::Rpc::Client Abstract

Defined in:
lib/msgpack/rpc/client.rb

Overview

This module is abstract.

Include this module in the class that implements the RPC client. When the client implementation class receives data from the communication line, it must pass that data to the MessagePack::Rpc::Client module through one of the module's receive methods (receive_stream() or receive_dgram(), documented below). The client implementation class should also define a send_data() method that actually sends the data. The module calls this method whenever it has a message to transmit, so implement send_data() to accept a String object as its argument. To handle protocol-level errors, override the on_error() method.

Module that implements the client protocol of MessagePack-RPC.

Class Method Details

.included(klass) ⇒ Object



# File 'lib/msgpack/rpc/client.rb', line 31

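def included(klass)
  # Class-level options handed to MessagePack when unpacking; empty by default.
  klass.instance_variable_set(:@msgpack_options, {})

  m = Module.new {
    # Called with nil or a Hash, replaces the options. Called with no
    # argument (the :none sentinel), leaves them untouched.
    def msgpack_options(opts = :none)
      if opts.nil? || opts.kind_of?(Hash)
        @msgpack_options = opts
      end

      # Always return the current options, never the :none sentinel.
      return (@msgpack_options || {})
    end

    def new_unpacker
      return MessagePack::Unpacker.new(@msgpack_options || {})
    end
  }

  klass.extend(m)
end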
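
The hook above gives every including class its own class-level options. The following is a minimal sketch of that pattern in isolation, so it can be run without the library. `OptionsHook`, `SketchClient` and `PlainClient` are hypothetical names used only for illustration, and the option key is just an example Hash entry.

```ruby
# Stand-in module (hypothetical name) that reproduces the `included` hook
# pattern: the including class gains a class-level `msgpack_options` method.
module OptionsHook
  def self.included(klass)
    class_methods = Module.new do
      # With no argument the current options are returned unchanged;
      # passing nil or a Hash replaces them first.
      def msgpack_options(opts = :none)
        @msgpack_options = opts if opts.nil? || opts.kind_of?(Hash)
        @msgpack_options || {}
      end
    end

    klass.extend(class_methods)
  end
end

class SketchClient
  include OptionsHook
  msgpack_options({ symbolize_keys: true })  # example option, set once per class
end

class PlainClient
  include OptionsHook
end

p SketchClient.msgpack_options  # the Hash set in the class body
p PlainClient.msgpack_options   # an empty Hash, nothing was configured
```

Because the options live in an instance variable of each class object, configuring one client class does not affect another.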

Instance Method Details

#call(meth, *args) {|res, err| ... } ⇒ Integer

call a procedure on the peer rpc server

Parameters:

  • meth (Symbol)

    target procedure name.

  • args (Array)

    arguments for procedure.

Yields:

  • (res, err)

    callback invoked when the procedure call completes.

Yield Parameters:

  • res (Object)

    response of the procedure when the call succeeded.

  • err (Object)

    error data of the procedure when the call failed.

Returns:

  • (Integer)

    assigned message id

Raises:

  • (ArgumentError)


# File 'lib/msgpack/rpc/client.rb', line 105

def call(meth, *args, &blk)
  raise ArgumentError.new("handler is not specified") if not blk

  id = new_id

  session_map[id] = blk
  send_data([0, id, meth, args].to_msgpack)

  return id
end

#cancel(id) ⇒ Object

Note:

When this method is called, the procedure call corresponding to the ID specified in the argument is cancelled.

cancel the call message

Parameters:

  • id (Integer)

    message id of the call to cancel (the return value of MessagePack::Rpc::Client#call())



# File 'lib/msgpack/rpc/client.rb', line 127

def cancel(id)
  session_map.delete(id)
end
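
To make the relationship between #call and #cancel concrete, here is a small stand-in that can be run without the library. It is a sketch under assumptions: `SketchSession` and its `receive` method are hypothetical, messages are kept as plain Arrays instead of being serialized, and the way a response is dispatched is assumed from the MessagePack-RPC message layout (request `[0, id, method, params]`, response `[1, id, error, result]`), not taken from this module's source.

```ruby
# Stand-in client (hypothetical): no serialization, messages are plain Arrays.
class SketchSession
  attr_reader :sent

  def initialize
    @sent = []         # messages handed to the "transport"
    @session_map = {}  # message id => callback
    @next_id = 0
  end

  # Mirrors #call: register the callback, then send [0, id, method, params].
  def call(meth, *args, &blk)
    raise ArgumentError, "handler is not specified" unless blk

    id = (@next_id += 1)
    @session_map[id] = blk
    @sent << [0, id, meth, args]
    id
  end

  # Mirrors #cancel: forget the callback so a late response is ignored.
  def cancel(id)
    @session_map.delete(id)
  end

  # Assumed dispatch: a response is [1, id, error, result] and the
  # callback is invoked once with (result, error).
  def receive(msg)
    type, id, err, res = msg
    return unless type == 1

    blk = @session_map.delete(id)
    blk.call(res, err) if blk
  end
end

client = SketchSession.new
results = []

id1 = client.call(:add, 1, 2) { |res, err| results << [res, err] }
id2 = client.call(:slow)      { |res, err| results << [res, err] }

client.cancel(id2)
client.receive([1, id1, nil, 3])      # callback for id1 runs
client.receive([1, id2, nil, :late])  # ignored, the call was cancelled
```

In this sketch cancelling is purely local: the request has already been sent, and only the callback registration is dropped.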

#notify(meth, *args) ⇒ Object

send a notification to the peer rpc server

Parameters:

  • meth (Symbol)

    notification name

  • args (Array)

    arguments for the notification



# File 'lib/msgpack/rpc/client.rb', line 140

def notify(meth, *args)
  send_data([2, meth, args].to_msgpack)
end

#on(name) {|*args| ... } ⇒ Object

define the handler for a notification received from the peer rpc server.

Parameters:

  • name (Symbol)

    notification name

Yields:

  • (*args)

    callback invoked when the notification is received

Raises:

  • (ArgumentError)


# File 'lib/msgpack/rpc/client.rb', line 230

def on(name, &blk)
  raise ArgumentError.new("handler is not specified") if not blk
  notify_handler[name] = blk
end
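
The handler registry can be illustrated with a stand-in that runs without the library. This is only a sketch: `SketchNotifier` and its `receive` method are hypothetical, messages are plain Arrays in the MessagePack-RPC notification layout `[2, method, params]`, and skipping notifications that have no handler is an assumption made here, not documented behavior of the module.

```ruby
# Stand-in (hypothetical) for the notification side of the client.
class SketchNotifier
  def initialize
    @notify_handler = {}  # notification name => callback
  end

  # Mirrors #on: a block is mandatory and replaces any earlier handler.
  def on(name, &blk)
    raise ArgumentError, "handler is not specified" unless blk

    @notify_handler[name] = blk
  end

  # Assumed dispatch: a notification is [2, method, params] and the
  # params are splatted into the handler.
  def receive(msg)
    type, name, args = msg
    return unless type == 2

    handler = @notify_handler[name]
    handler.call(*args) if handler
  end
end

client = SketchNotifier.new
seen = []

client.on(:progress) { |percent, label| seen << [percent, label] }

client.receive([2, :progress, [50, "half"]])  # handler runs with (50, "half")
client.receive([2, :unknown, []])             # no handler registered, skipped
```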

#receive_dgram(data) ⇒ Object

Note:

Use this method for datagram communication, that is, when data is guaranteed to be exchanged in whole packets (it works a bit faster).

unpack the received datagram and evaluate the message it contains

Parameters:

  • data (Blob)

    data received from the rpc server.



# File 'lib/msgpack/rpc/client.rb', line 197

def receive_dgram(data)
  eval_response(MessagePack.unpack(data, self.class.msgpack_options))
end

#receive_stream(data) ⇒ Object

enqueue the received data into the communication buffer; each complete message is evaluated as soon as it can be unpacked

Parameters:

  • data (Blob)

    data received from the rpc server.



# File 'lib/msgpack/rpc/client.rb', line 207

def receive_stream(data)
  begin
    unpacker.feed_each(data) {|resp| eval_response(resp)}

  rescue MessagePack::UnpackError => e
    unpacker.reset
    error_occured(e)

  rescue => e
    error_occured(e)
  end
end
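
The difference between #receive_dgram and #receive_stream is that a stream may deliver a message in arbitrary pieces, so the data has to be buffered until a whole message is available. The sketch below shows that idea with a deliberately swapped-in format: newline-delimited JSON from the Ruby standard library instead of MessagePack, so it runs without the msgpack gem. `LineFeeder` is a hypothetical stand-in whose `feed_each` and `reset` only imitate the roles those methods play in the code above.

```ruby
require "json"

# Stand-in for a streaming unpacker (hypothetical). It uses
# newline-delimited JSON, not MessagePack, purely for illustration.
class LineFeeder
  def initialize
    @buffer = +""  # unfrozen String that accumulates partial input
  end

  # Append a chunk and yield every message that is now complete.
  def feed_each(data)
    @buffer << data
    while (idx = @buffer.index("\n"))
      line = @buffer.slice!(0..idx)  # remove one full line from the buffer
      yield JSON.parse(line)
    end
  end

  # Drop buffered input, as one would after a decoding error.
  def reset
    @buffer.clear
  end
end

feeder = LineFeeder.new
responses = []

# Two responses arrive split across three chunks at arbitrary boundaries.
["[1,1,nu", "ll,3]\n[1,2,null,", "4]\n"].each do |chunk|
  feeder.feed_each(chunk) { |resp| responses << resp }
end
```

After the first chunk nothing is yielded, the second chunk completes the first message, and the third completes the second. With a datagram transport each packet already holds exactly one message, which is why the one-shot unpack in #receive_dgram is sufficient there.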