Method: #run_test_loop
- Defined in:
- src/ruby/pb/test/xds_client.rb
#run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs) ⇒ Object
Sends one round of the configured RPCs every 1/qps seconds (`target_seconds_between_rpcs`) until shutdown is requested.
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 |
# File 'src/ruby/pb/test/xds_client.rb', line 219

# Drives the client's send loop until the global $shutdown flag is set.
#
# Each round:
# 1. paces itself so rounds start +target_seconds_between_rpcs+ apart
#    (i.e. 1/qps),
# 2. starts every RPC listed in $rpc_config.rpcs_to_send via
#    execute_rpc_in_thread,
# 3. drains whatever results have been posted to $thread_results, and
# 4. folds those results into every watcher in $watchers, then wakes
#    anything waiting on $watchers_cv.
#
# @param stub service stub; must respond to #unary_call and #empty_call
#   taking +metadata:+, +deadline:+ and +return_op:+ keywords
# @param target_seconds_between_rpcs [Numeric] interval between rounds
# @param fail_on_failed_rpcs not read by this method
# @return [Array] the objects returned by execute_rpc_in_thread, after each
#   has been joined
# @raise [RuntimeError] when rpcs_to_send contains anything other than
#   :UNARY_CALL or :EMPTY_CALL
def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs)
  include Grpc::Testing
  simple_req = SimpleRequest.new
  empty_req = Empty.new
  target_next_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # Some RPCs are meant to be "kept open". Since Ruby does not have an
  # async API, we are executing those RPCs in a thread so that they don't
  # block.
  # NOTE: entries are only joined after shutdown, so this list keeps growing
  # for as long as the loop runs.
  keep_open_threads = []
  until $shutdown
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    sleep_seconds = target_next_start - now
    if sleep_seconds < 0
      # Behind schedule: re-anchor on the current time rather than bursting
      # to catch up.
      target_next_start = now + target_seconds_between_rpcs
    else
      target_next_start += target_seconds_between_rpcs
      sleep(sleep_seconds)
    end
    # A non-positive configured timeout falls back to 30 seconds.
    deadline_sec = $rpc_config.timeout_sec > 0 ? $rpc_config.timeout_sec : 30
    deadline = GRPC::Core::TimeConsts.from_relative_time(deadline_sec)
    results = {}
    $rpc_config.rpcs_to_send.each do |rpc|
      # rpc is in the form of :UNARY_CALL or :EMPTY_CALL here
      # NOTE(review): the accessor name was missing from the extracted
      # listing; `metadata_to_send` is assumed -- confirm against RpcConfig.
      metadata = if $rpc_config.metadata_to_send.key?(rpc)
                   $rpc_config.metadata_to_send[rpc]
                 else
                   {}
                 end
      $accumulated_stats_mu.synchronize do
        $num_rpcs_started_by_method[rpc.to_s] += 1
        $accumulated_method_stats[rpc.to_s].increment_rpcs_started
      end
      op = case rpc
           when :UNARY_CALL
             stub.unary_call(simple_req,
                             metadata: metadata,
                             deadline: deadline,
                             return_op: true)
           when :EMPTY_CALL
             stub.empty_call(empty_req,
                             metadata: metadata,
                             deadline: deadline,
                             return_op: true)
           else
             raise "Unsupported rpc #{rpc}"
           end
      keep_open_threads << execute_rpc_in_thread(op, rpc)
    end
    # collect results from threads
    $thread_results_mu.synchronize do
      $thread_results.each do |r|
        rpc_name, remote_peer = r
        results[rpc_name] = remote_peer
      end
      $thread_results = []
    end
    $watchers_mutex.synchronize do
      # These stats expect rpc_name to be in the form of
      # UnaryCall or EmptyCall, not the underscore-case all-caps form.
      # The inverted map is the same for every watcher, so build it once.
      stats_names = $RPC_MAP.invert
      $watchers.each do |watcher|
        # this is counted once when each group of all rpcs_to_send were done
        watcher['rpcs_needed'] -= 1
        results.each do |rpc_name, remote_peer|
          stats_name = stats_names[rpc_name]
          if remote_peer.strip.empty?
            # error is counted per individual RPC
            watcher['no_remote_peer'] += 1
          else
            watcher['rpcs_by_method'][stats_name] ||= Hash.new(0)
            # increment the remote hostname distribution histogram
            # both by overall, and broken down per RPC
            watcher['rpcs_by_method'][stats_name][remote_peer] += 1
            watcher['rpcs_by_peer'][remote_peer] += 1
          end
        end
      end
      $watchers_cv.broadcast
    end
  end
  keep_open_threads.each(&:join)
end