Class: Toq::Server

Inherits:
Object show all
Defined in:
lib/toq/server.rb,
lib/toq/server/handler.rb

Overview

RPC server.

Author:

Defined Under Namespace

Classes: Handler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ Server

Sets up the RPC server; call #run or #start to begin listening.

Examples:

Example options:


{
    :host  => 'localhost',
    :port  => 7331,

    # optional authentication token, if it doesn't match the one
    # set on the server-side you'll be getting exceptions.
    :token => 'superdupersecret',

    # optional serializer (defaults to YAML)
    :serializer => Marshal,

    # In order to enable peer verification one must first provide
    # the following:
    #
    # SSL CA certificate
    :ssl_ca     => cwd + '/../spec/pems/cacert.pem',
    # SSL private key
    :ssl_pkey   => cwd + '/../spec/pems/client/key.pem',
    # SSL certificate
    :ssl_cert   => cwd + '/../spec/pems/client/cert.pem'
}

Parameters:

  • opts (Hash)

Options Hash (opts):

  • :host (String)

    Hostname/IP address.

  • :port (Integer)

    Port number.

  • :socket (String)

    Path to UNIX domain socket.

  • :token (String)

    Optional authentication token.

  • :serializer (.dump, .load) — default: YAML

    Serializer to use for message transmission.

  • :fallback_serializer (.dump, .load)

    Optional fallback serializer to be used when the primary one fails.

  • :max_retries (Integer)

    How many times to retry failed requests.

  • :ssl_ca (String)

    SSL CA certificate.

  • :ssl_pkey (String)

    SSL private key.

  • :ssl_cert (String)

    SSL certificate.



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/toq/server.rb', line 74

# Configures the RPC server from the given options. Nothing is listened on
# here; that happens in #start / #run.
#
# @param [Hash] opts
# @option opts [String]  :host     Hostname/IP address.
# @option opts [Integer] :port     Port number.
# @option opts [String]  :socket   Path to UNIX domain socket.
# @option opts [String]  :token    Optional authentication token.
# @option opts [String]  :ssl_pkey SSL private key.
# @option opts [String]  :ssl_cert SSL certificate.
#
# @raise [RuntimeError]
#   When both :ssl_pkey and :ssl_cert are given and either file is missing.
# @raise [ArgumentError]
#   When none of :socket, :host and :port are given.
def initialize( opts )
    @opts = opts

    # The key/certificate files are only checked when both options are set.
    if @opts[:ssl_pkey] && @opts[:ssl_cert]
        unless File.exist?( @opts[:ssl_pkey] )
            raise "Could not find private key at: #{@opts[:ssl_pkey]}"
        end

        unless File.exist?( @opts[:ssl_cert] )
            raise "Could not find certificate at: #{@opts[:ssl_cert]}"
        end
    end

    @token = @opts[:token]

    @logger = ::Logger.new( STDOUT )
    @logger.level = Logger::INFO

    @host   = @opts[:host]
    @port   = @opts[:port]
    @socket = @opts[:socket]

    # NOTE(review): this only rejects the case where :socket, :host and :port
    # are all missing; a lone :host passes and @port becomes 0 below. Confirm
    # whether both :host and :port should be required, as the message implies.
    unless @socket || @host || @port
        raise ArgumentError, 'Needs either a :socket or :host and :port options.'
    end

    @port = @port.to_i

    @reactor = Raktr.global

    clear_handlers
end

Instance Attribute Details

#logger ⇒ Logger (readonly)

Returns:

  • (Logger)


30
31
32
# File 'lib/toq/server.rb', line 30

# Logger the server writes its 'System' and 'Call' messages to.
#
# @return [Logger]
def logger
  @logger
end

#opts ⇒ Hash (readonly)

Returns configuration options.

Returns:

  • (Hash)

    Configuration options.



27
28
29
# File 'lib/toq/server.rb', line 27

# Configuration options, exactly as passed to the constructor.
#
# @return [Hash]
def opts
  @opts
end

#reactor ⇒ Object (readonly)

Returns the value of attribute reactor.



32
33
34
# File 'lib/toq/server.rb', line 32

# Reactor the server listens and schedules on; the constructor sets it to
# `Raktr.global`.
#
# @return [Object]
def reactor
  @reactor
end

#token ⇒ String (readonly)

Returns authentication token.

Returns:

  • (String)

    Authentication token.



23
24
25
# File 'lib/toq/server.rb', line 23

# Authentication token taken from the `:token` option.
#
# @return [String, nil] `nil` when no `:token` option was given.
def token
  @token
end

Instance Method Details

#add_async_check(&block) ⇒ Object

Examples:


server.add_async_check do |method|
    #
    # Must return 'true' for async and 'false' for sync.
    #
    # Very simple check here...
    #
    'async' ==  method.name.to_s.split( '_' )[0]
end

Parameters:

  • block (Block)

    Block to identify methods that pass their result to a block instead of simply returning it (which is the most usual operation of async methods).



120
121
122
# File 'lib/toq/server.rb', line 120

# Registers a block used to decide whether a method is asynchronous.
#
# @param [Block] block
#   Receives a method and must return `true` for async, `false` for sync.
#
# @return [Array] The list of registered async checks.
def add_async_check( &block )
    @async_checks.push( block )
end

#add_handler(name, obj) ⇒ Object

Examples:


server.add_handler( 'myclass', MyClass.new )

Parameters:

  • name (String)

    Name by which to make the object available over RPC.

  • obj (Object)

    Instantiated server object to expose.



132
133
134
# File 'lib/toq/server.rb', line 132

# Exposes an object over RPC under the given name.
#
# @param [String] name
#   Name by which to make the object available over RPC.
# @param [Object] obj
#   Instantiated server object to expose.
#
# @return [Object] The registered object.
def add_handler( name, obj )
    @objects.store( name, obj )
end

#alive? ⇒ TrueClass

Returns:

  • (TrueClass)


216
217
218
# File 'lib/toq/server.rb', line 216

# Unconditionally reports the server as alive.
# NOTE(review): presumably meant to be called remotely as a ping -- confirm.
#
# @return [TrueClass] Always `true`.
def alive?
    true
end

#call(connection) ⇒ Response

Note:

If the called method is asynchronous, the response will be sent by this method directly; otherwise it will be handled by the Handler.

Parameters:

  • connection (Handler)

    Connection with request information.

Returns:



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/toq/server.rb', line 176

# Dispatches the request carried by `connection` to the matching handler
# object and wraps the result in a Response.
#
# For asynchronous methods the response is sent from here, via
# `connection.send_response`, once the method yields its result; for
# synchronous ones the populated response is simply returned.
#
# @param [Handler] connection
#   Connection with request information.
#
# @return [Response]
#
# @raise [Exceptions::InvalidObject] If the requested object isn't registered.
# @raise [Exceptions::UnsafeMethod]  If the requested method fails the safety check.
def call( connection )
    request      = connection.request
    peer_ip_addr = connection.peer_address

    expr = request.message
    args = request.args
    meth_name, obj_name = parse_expr( expr )

    log_call( peer_ip_addr, expr, *args )

    unless object_exist?( obj_name )
        msg = "Trying to access non-existent object '#{obj_name}'."
        @logger.error( 'Call' ){ msg + " [on behalf of #{peer_ip_addr}]" }
        raise Exceptions::InvalidObject.new( msg )
    end

    unless method_safe?( obj_name, meth_name )
        msg = "Trying to access unsafe method '#{meth_name}'."
        @logger.error( 'Call' ){ msg + " [on behalf of #{peer_ip_addr}]" }
        raise Exceptions::UnsafeMethod.new( msg )
    end

    # The handler needs to know whether this is an async call because, if it
    # is, we'll have already sent the response and it doesn't need to
    # transmit anything.
    response = Response.new
    response.async! if async?( obj_name, meth_name )

    # NOTE(review): `send` can also reach private methods; this relies on the
    # method_safe? check above to filter those out -- confirm.
    handler = @objects[obj_name]
    meth    = meth_name.to_sym

    if response.async?
        handler.send( meth, *args ) do |result|
            response.obj = result
            connection.send_response( response )
        end
    else
        response.obj = handler.send( meth, *args )
    end

    response
end

#clear_handlers ⇒ Object

Clears all handlers and their associated information like methods and async check blocks.



141
142
143
144
# File 'lib/toq/server.rb', line 141

# Clears all handlers and their associated information, i.e. the registered
# objects and the async check blocks.
#
# @return [Array] The new, empty async-check list (value of the last assignment).
def clear_handlers
    # Handler objects, keyed by the name given to #add_handler.
    @objects = {}
    # Blocks registered through #add_async_check.
    @async_checks = []
end

#run ⇒ Object

Runs the server and blocks while `Raktr` is running.



147
148
149
150
151
152
153
154
155
# File 'lib/toq/server.rb', line 147

# Runs the server inside the reactor's run block.
#
# Installs an error handler that logs reactor errors under 'System', then
# calls #start.
#
# @return [Object] Whatever `@reactor.run` returns.
def run
    @reactor.run do
        @reactor.on_error { |error| @logger.error( 'System' ) { error.to_s } }

        start
    end
end

#shutdown ⇒ Object

Shuts down the server after 2 seconds.



221
222
223
224
225
226
227
228
229
230
231
# File 'lib/toq/server.rb', line 221

# Schedules the reactor to stop after a delay and returns immediately.
#
# @param [Numeric] wait_for
#   Seconds to wait before stopping the reactor. Defaults to 2, which was
#   previously hard-coded.
#
# @return [TrueClass] Always `true`.
def shutdown( wait_for = 2 )
    @logger.info( 'System' ){ "Shutting down in #{wait_for} seconds..." }

    # Don't die before returning: stop the reactor from a delayed callback
    # instead of stopping it right away.
    @reactor.delay( wait_for ) { @reactor.stop }

    true
end

#start ⇒ Object

Starts the server but does not block.



158
159
160
161
162
163
164
165
166
167
# File 'lib/toq/server.rb', line 158

# Starts listening on the configured endpoint.
#
# Listens on the UNIX socket when one was configured, otherwise on
# host/port, passing `Handler` and this server along to the reactor.
#
# @return [Object] Whatever `@reactor.listen` returns.
def start
    @logger.info( 'System' ){ "[PID #{Process.pid}] RPC Server started." }
    @logger.info( 'System' ) do
        interface = @socket || "#{@host}:#{@port}"
        "Listening on #{interface}"
    end

    endpoint = @socket ? [@socket] : [@host, @port]
    @reactor.listen( *endpoint, Handler, self )
end