Class: Toq::Server

Inherits:
Object show all
Defined in:
lib/toq/server.rb,
lib/toq/server/handler.rb

Overview

RPC server.

Author:

Defined Under Namespace

Classes: Handler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ Server

Configures the RPC server. It does not start listening until #start or #run is called.

Examples:

Example options:


{
    :host  => 'localhost',
    :port  => 7331,

    # optional authentication token, if it doesn't match the one
    # set on the server-side you'll be getting exceptions.
    :token => 'superdupersecret',

    # optional serializer (defaults to YAML)
    :serializer => Marshal,

    # In order to enable peer verification one must first provide
    # the following:
    #
    # SSL CA certificate
    :ssl_ca     => cwd + '/../spec/pems/cacert.pem',
    # SSL private key
    :ssl_pkey   => cwd + '/../spec/pems/client/key.pem',
    # SSL certificate
    :ssl_cert   => cwd + '/../spec/pems/client/cert.pem'
}

Parameters:

  • opts (Hash)

Options Hash (opts):

  • :host (String)

    Hostname/IP address.

  • :port (Integer)

    Port number.

  • :socket (String)

    Path to UNIX domain socket.

  • :token (String)

    Optional authentication token.

  • :serializer (.dump, .load) — default: YAML

    Serializer to use for message transmission.

  • :fallback_serializer (.dump, .load)

    Optional fallback serializer to be used when the primary one fails.

  • :max_retries (Integer)

    How many times to retry failed requests.

  • :ssl_ca (String)

    SSL CA certificate.

  • :ssl_pkey (String)

    SSL private key.

  • :ssl_cert (String)

    SSL certificate.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/toq/server.rb', line 72

# Configures the RPC server. Nothing is listened on here; that happens in
# {#start} (or {#run}, which calls it).
#
# @param opts [Hash] configuration options, stored as-is and exposed via {#opts}
# @option opts [String]  :host     Hostname/IP address (TCP mode).
# @option opts [Integer] :port     Port number (TCP mode); coerced with `to_i`.
# @option opts [String]  :socket   Path to UNIX domain socket.
# @option opts [String]  :token    Optional authentication token.
# @option opts [String]  :ssl_pkey Path to the SSL private key.
# @option opts [String]  :ssl_cert Path to the SSL certificate.
#
# @raise [RuntimeError] if both `:ssl_pkey` and `:ssl_cert` are given and
#   either file does not exist
# @raise [ArgumentError] if there is no `:socket` and `:host`/`:port` are not
#   both provided
def initialize( opts )
    @opts = opts

    # The key/certificate paths are only verified when both are supplied.
    if @opts[:ssl_pkey] && @opts[:ssl_cert]
        if !File.exist?( @opts[:ssl_pkey] )
            raise "Could not find private key at: #{@opts[:ssl_pkey]}"
        end

        if !File.exist?( @opts[:ssl_cert] )
            raise "Could not find certificate at: #{@opts[:ssl_cert]}"
        end
    end

    @token = @opts[:token]

    @host, @port = @opts[:host], @opts[:port]
    @socket = @opts[:socket]

    # Without a socket, BOTH host and port are required. Checking with `||`
    # would let a lone :host through and turn the missing port into 0 below.
    # Validate before creating the logger so bad options fail fast.
    if !@socket && !(@host && @port)
        fail ArgumentError, 'Needs either a :socket or :host and :port options.'
    end

    # Accepts numeric strings; also yields 0 when running on a socket only.
    @port = @port.to_i

    @logger = ::Logger.new( STDOUT )
    @logger.level = Logger::INFO

    @reactor = Raktr.global

    clear_handlers
end

Instance Attribute Details

#logger ⇒ Logger (readonly)

Returns:

  • (Logger)


30
31
32
# File 'lib/toq/server.rb', line 30

# Read-only accessor for the server's logger.
#
# @return [Logger] the current value of `@logger`
def logger
  @logger
end

#opts ⇒ Hash (readonly)

Returns Configuration options.

Returns:

  • (Hash)

    Configuration options.



27
28
29
# File 'lib/toq/server.rb', line 27

# Read-only accessor for the configuration options.
#
# @return [Hash] the current value of `@opts`
def opts
  @opts
end

#token ⇒ String (readonly)

Returns Authentication token.

Returns:

  • (String)

    Authentication token.



23
24
25
# File 'lib/toq/server.rb', line 23

# Read-only accessor for the authentication token.
#
# @return [String] the current value of `@token`
def token
  @token
end

Instance Method Details

#add_async_check(&block) ⇒ Object

Examples:


server.add_async_check do |method|
    #
    # Must return 'true' for async and 'false' for sync.
    #
    # Very simple check here...
    #
    'async' ==  method.name.to_s.split( '_' )[0]
end

Parameters:

  • block (Block)

    Block to identify methods that pass their result to a block instead of simply returning them (which is the most usual operation of async methods).



118
119
120
# File 'lib/toq/server.rb', line 118

# Appends a check block to the list of async checks kept in `@async_checks`.
#
# @param block [Block] check to register; stored as given (`nil` when the
#   method is called without a block)
# @return [Array] the updated list of registered checks
def add_async_check( &block )
    @async_checks.push( block )
end

#add_handler(name, obj) ⇒ Object

Examples:


server.add_handler( 'myclass', MyClass.new )

Parameters:

  • name (String)

    Name by which to make the object available over RPC.

  • obj (Object)

    Instantiated server object to expose.



130
131
132
133
134
135
136
137
138
139
# File 'lib/toq/server.rb', line 130

# Registers `obj` under `name` and records which of its methods are exposed.
#
# Only public instance methods defined directly on `obj.class` are recorded
# (inherited ones are excluded by the `false` argument). Each method name is
# stored as a String; names whose Method object passes `async_check` are
# additionally recorded as async.
#
# @param name [String] name by which to make the object available over RPC
# @param obj [Object] instantiated server object to expose
# @return [Array<Symbol>] the method names that were iterated
def add_handler( name, obj )
    @objects[name] = obj

    # Register the (initially empty) sets first so the handler is visible
    # even while its methods are still being collected.
    exposed       = @methods[name]       = Set.new
    exposed_async = @async_methods[name] = Set.new

    obj.class.public_instance_methods( false ).each do |meth|
        label = meth.to_s

        exposed.add( label )
        exposed_async.add( label ) if async_check( obj.method( meth ) )
    end
end

#alive? ⇒ TrueClass

Returns:

  • (TrueClass)


218
219
220
# File 'lib/toq/server.rb', line 218

# Liveness check: unconditionally reports the server as alive.
#
# @return [TrueClass] always `true`
def alive?
    true
end

#call(connection) ⇒ Response

Note:

If the called method is asynchronous, its response is sent by this method directly (through the connection); otherwise the returned response is left for the Handler to transmit.

Parameters:

  • connection (Handler)

    Connection with request information.

Returns:



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/toq/server.rb', line 178

# Dispatches the request carried by `connection` to a registered handler
# object.
#
# The request's message is split by `parse_expr` into a method name and an
# object name. The call is logged, then rejected if the object is not
# registered or the method is not public on it.
#
# @param connection [Handler] connection with request information; must
#   provide `request`, `peer_address` and `send_response`
# @return [Response] for synchronous methods `obj` holds the return value;
#   for asynchronous methods the response is flagged async and is sent through
#   `connection` once the method yields its result
# @raise [Exceptions::InvalidObject] if no object is registered under the
#   requested name
# @raise [Exceptions::InvalidMethod] if the requested method is not public
def call( connection )
    request = connection.request
    peer    = connection.peer_address

    expr = request.message
    args = request.args
    meth_name, obj_name = parse_expr( expr )

    log_call( peer, expr, *args )

    unless object_exist?( obj_name )
        msg = "Trying to access non-existent object '#{obj_name}'."
        @logger.error( 'Call' ) { "#{msg} [on behalf of #{peer}]" }
        fail Exceptions::InvalidObject, msg
    end

    unless public_method?( obj_name, meth_name )
        msg = "Trying to access non-public method '#{meth_name}'."
        @logger.error( 'Call' ) { "#{msg} [on behalf of #{peer}]" }
        fail Exceptions::InvalidMethod, msg
    end

    # Flag the response as async up-front: the handler uses the flag to know
    # that the response has already been sent here and must not be
    # transmitted again.
    response = Response.new
    response.async! if async?( obj_name, meth_name )

    # Synchronous call: the return value is the result.
    if !response.async?
        response.obj = @objects[obj_name].send( meth_name.to_sym, *args )
        return response
    end

    # Asynchronous call: the result arrives through the block, at which point
    # the response is completed and sent.
    @objects[obj_name].send( meth_name.to_sym, *args ) do |result|
        response.obj = result
        connection.send_response( response )
    end

    response
end

#clear_handlers ⇒ Object

Clears all handlers and their associated information like methods and async check blocks.



146
147
148
149
150
151
152
# File 'lib/toq/server.rb', line 146

# Resets all handler bookkeeping to empty containers: registered objects,
# their method lists, the async check blocks and the async method lists.
#
# @return [Hash] the new, empty async-methods hash (last assignment)
def clear_handlers
    @objects, @methods = {}, {}
    @async_checks      = []
    @async_methods     = {}
end

#run ⇒ Object

Runs the server and blocks while `Raktr` is running.



155
156
157
# File 'lib/toq/server.rb', line 155

# Runs the reactor, handing it a block that calls {#start}.
#
# NOTE(review): the accompanying description says this blocks while the
# reactor is running; that depends on `@reactor.run`, which is not shown here.
#
# @return [Object] whatever `@reactor.run` returns
def run
    @reactor.run { start }
end

#shutdown ⇒ Object

Shuts down the server after 2 seconds.



223
224
225
226
227
228
229
230
231
232
233
# File 'lib/toq/server.rb', line 223

# Schedules the reactor to stop after a fixed 2-second delay and returns
# immediately.
#
# @return [TrueClass] always `true`
def shutdown
    grace_period = 2

    @logger.info( 'System' ) do
        "Shutting down in #{grace_period} seconds..."
    end

    # Stop later rather than now, so this call can return first.
    @reactor.delay( grace_period ) { @reactor.stop }

    true
end

#start ⇒ Object

Starts the server but does not block.



160
161
162
163
164
165
166
167
168
169
# File 'lib/toq/server.rb', line 160

# Logs the listening interface and asks the reactor to listen on it, using
# `Handler` as the connection handler and passing this server along.
#
# The UNIX socket path is used when `@socket` is set; otherwise the
# host/port pair is used.
#
# @return [Object] whatever `@reactor.listen` returns
def start
    @logger.info( 'System' ) { 'RPC Server started.' }
    @logger.info( 'System' ) do
        target = @socket || "#{@host}:#{@port}"
        "Listening on #{target}"
    end

    # One leading argument for a socket path, two for host and port.
    endpoint = @socket ? [@socket] : [@host, @port]
    @reactor.listen( *endpoint, Handler, self )
end