Class: Toq::Client
Defined in: lib/toq/client.rb, lib/toq/client/handler.rb
Overview
Simple RPC client capable of:
- TLS encryption.
- Asynchronous and synchronous requests.
- Handling remote asynchronous calls that defer their result.
Defined Under Namespace
Classes: Handler
Constant Summary
- DEFAULT_CONNECTION_POOL_SIZE = 1
  Default number of connections to maintain in the re-use pool.
Instance Attribute Summary
- #connection_count ⇒ Integer (readonly)
  Number of connections in the pool.
- #opts ⇒ Hash (readonly)
  Options hash.
- #reactor ⇒ Object (readonly)
  Returns the value of attribute reactor.
Instance Method Summary
- #call(msg, *args, &block) ⇒ Object
  Calls a remote method and grabs the result.
- #close ⇒ Object
  Closes all connections.
- #connect(&block) ⇒ Boolean
  Connection factory. Re-uses pooled connections or creates new ones as needed to accommodate the workload.
- #connection_failed(connection) ⇒ Object
  Handles failed connections.
- #increment_connection_counter ⇒ Object
- #initialize(opts) ⇒ Client (constructor)
  A new instance of Client.
- #push_connection(connection) ⇒ Object
Constructor Details
#initialize(opts) ⇒ Client
Returns a new instance of Client.
    # File 'lib/toq/client.rb', line 73

    def initialize( opts )
        @opts = opts.merge( role: :client )
        @token = @opts[:token]

        @host, @port = @opts[:host], @opts[:port].to_i
        @socket = @opts[:socket]

        if !@socket && !(@host || @port)
            fail ArgumentError, 'Needs either a :socket or :host and :port options.'
        end

        @port = @port.to_i

        if @host && @port <= 0
            fail ArgumentError, "Invalid port: #{@port}"
        end

        @pool_size = @opts[:connection_pool_size] || DEFAULT_CONNECTION_POOL_SIZE

        @reactor = Raktr.new

        @connections = @reactor.create_queue
        @connection_count = 0
    end
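The endpoint checks in the constructor can be tried in isolation with a sketch like the following. `validate_endpoint` is a hypothetical helper, not part of Toq, and it assumes the intent is "a :socket, or a :host with a positive :port"; it is an illustration of that rule rather than a transcription of the constructor.

```ruby
# Hypothetical helper, not part of Toq. It illustrates the endpoint rule
# described by the constructor's error messages, under the assumption that
# a missing :host means no TCP endpoint was given.
def validate_endpoint(opts)
  socket = opts[:socket]
  host   = opts[:host]
  port   = opts[:port].to_i # nil.to_i is 0, "7331".to_i is 7331

  if socket.nil? && host.nil?
    raise ArgumentError, 'Needs either a :socket or :host and :port options.'
  end

  # A port is only required (and checked) when connecting over TCP.
  raise ArgumentError, "Invalid port: #{port}" if host && port <= 0

  socket ? { socket: socket } : { host: host, port: port }
end

tcp_endpoint  = validate_endpoint(host: 'localhost', port: '7331')
unix_endpoint = validate_endpoint(socket: '/tmp/toq.sock')
```

Note that `to_i` turns a missing or non-numeric port into 0, which is why the port check compares against zero instead of testing for nil.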
Instance Attribute Details
#connection_count ⇒ Integer (readonly)
Returns the number of connections in the pool.

    # File 'lib/toq/client.rb', line 33

    def connection_count
        @connection_count
    end
#opts ⇒ Hash (readonly)
Returns the options hash.

    # File 'lib/toq/client.rb', line 29

    def opts
        @opts
    end
#reactor ⇒ Object (readonly)
Returns the value of attribute reactor.
    # File 'lib/toq/client.rb', line 25

    def reactor
        @reactor
    end
Instance Method Details
#call(msg, *args, &block) ⇒ Object
Calls a remote method and grabs the result.
There are two ways to perform a call: asynchronous (non-blocking), when a block is given, and synchronous (blocking), when it is not.

    # File 'lib/toq/client.rb', line 187

    def call( msg, *args, &block )
        ensure_reactor_running

        req = Request.new(
            message: msg,
            args: args,
            callback: block,
            token: @token
        )

        block_given? ? call_async( req ) : call_sync( req )
    end
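The block-or-no-block dispatch in #call can be illustrated without a server. The sketch below uses a hypothetical `SketchClient` whose "remote" methods are local lambdas; it does no networking, and building the blocking path on top of the non-blocking one with a `Queue` is an assumption about one common approach, not a description of how Toq implements `call_sync`.

```ruby
require 'thread' # Queue; already loaded by default on current Rubies

# Hypothetical stand-in for illustration only. It is not Toq::Client:
# "remote" methods are plain lambdas looked up by name.
class SketchClient
  def initialize(methods)
    @methods = methods
  end

  # With a block: return right away and pass the result to the block later.
  # Without a block: wait for the result and return it.
  def call(msg, *args, &block)
    block_given? ? call_async(msg, args, &block) : call_sync(msg, args)
  end

  private

  def call_async(msg, args, &block)
    Thread.new { block.call(@methods.fetch(msg).call(*args)) }
  end

  def call_sync(msg, args)
    result = Queue.new
    call_async(msg, args) { |value| result << value }
    result.pop # blocks the caller until the callback has fired
  end
end

client = SketchClient.new('math.add' => ->(a, b) { a + b })

# Synchronous: the return value is the result.
sync_result = client.call('math.add', 1, 2)

# Asynchronous: the result arrives in the block.
async_result = Queue.new
client.call('math.add', 3, 4) { |value| async_result << value }
```

The same method name serves both styles, so callers choose blocking or non-blocking behaviour purely by whether they pass a block.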
#close ⇒ Object
Closes all connections.

    # File 'lib/toq/client.rb', line 136

    def close
        ensure_reactor_running

        @reactor.on_tick do |task|
            @connections.pop(&:close_without_retry)
            task.done if @connections.empty?
        end
    end
#connect(&block) ⇒ Boolean
Connection factory. Re-uses pooled connections or creates new ones as needed to accommodate the workload. Returns true if a new connection was created for the block, false if the block was queued to receive a pooled connection.

    # File 'lib/toq/client.rb', line 107

    def connect( &block )
        ensure_reactor_running

        if @connections.empty? && @connection_count < @pool_size
            opts = @socket ? @socket : [@host, @port]
            block.call @reactor.connect( *[opts, Handler, @opts.merge( client: self )].flatten )
            increment_connection_counter
            return true
        end

        pop_block = proc do |conn|
            # Some connections may have died while they were waiting in the
            # queue, get rid of them and start all over in case the queue has
            # been emptied.
            if !conn.done?
                connection_failed conn
                connect( &block )
                next
            end

            block.call conn
        end

        @connections.pop( &pop_block )

        false
    end
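The create-or-reuse decision in #connect follows a familiar bounded-pool pattern, sketched below. `SketchPool`, `checkout` and `checkin` are hypothetical names, the pool holds plain strings instead of sockets, and the sketch leaves out both the reactor and the dead-connection handling; it only shows when a new connection is created and when a caller has to wait for one to be returned.

```ruby
# Hypothetical bounded pool for illustration; names are not taken from Toq.
class SketchPool
  attr_reader :connection_count

  def initialize(max_size, &factory)
    @max_size = max_size
    @factory  = factory
    @idle     = [] # connections waiting to be re-used
    @waiting  = [] # callbacks waiting for a connection
    @connection_count = 0
  end

  # Passes a connection to the block. Returns true when a new connection
  # was created, false when an existing one is (or will be) re-used.
  def checkout(&block)
    if @idle.empty? && @connection_count < @max_size
      @connection_count += 1
      block.call(@factory.call)
      return true
    end

    if @idle.empty?
      @waiting << block # pool is full: wait until a connection comes back
    else
      block.call(@idle.shift)
    end

    false
  end

  # Returns a connection to the pool, serving the oldest waiter first.
  def checkin(conn)
    waiter = @waiting.shift
    if waiter
      waiter.call(conn)
    else
      @idle << conn
    end
  end
end

created = 0
pool = SketchPool.new(1) { created += 1; "conn-#{created}" }

log = []
first  = pool.checkout { |conn| log << conn }           # creates conn-1
second = pool.checkout { |conn| log << "late #{conn}" } # pool full, waits
pool.checkin('conn-1')                                   # wakes the waiter
```

With a pool size of 1, the second caller never triggers a second connection; it simply receives the first one once it has been returned.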
#connection_failed(connection) ⇒ Object
Handles failed connections.
    # File 'lib/toq/client.rb', line 161

    def connection_failed( connection )
        ensure_reactor_running

        @connection_count -= 1
        connection.close_without_retry
    end
#increment_connection_counter ⇒ Object
    # File 'lib/toq/client.rb', line 145

    def increment_connection_counter
        @connection_count += 1
    end