Method: #run_test_loop

Defined in:
src/ruby/pb/test/xds_client.rb

#run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs) ⇒ Object

send 1 rpc every 1/qps second



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'src/ruby/pb/test/xds_client.rb', line 219

def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs)
  include Grpc::Testing
  simple_req = SimpleRequest.new()
  empty_req = Empty.new()
  target_next_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # Some RPCs are meant to be "kept open". Since Ruby does not have an
  # async API, we are executing those RPCs in a thread so that they don't
  # block.
  keep_open_threads = Array.new
  while !$shutdown
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    sleep_seconds = target_next_start - now
    if sleep_seconds < 0
      target_next_start = now + target_seconds_between_rpcs
    else
      target_next_start += target_seconds_between_rpcs
      sleep(sleep_seconds)
    end
    deadline_sec = $rpc_config.timeout_sec > 0 ? $rpc_config.timeout_sec : 30
    deadline = GRPC::Core::TimeConsts::from_relative_time(deadline_sec)
    results = {}
    $rpc_config.rpcs_to_send.each do |rpc|
      # rpc is in the form of :UNARY_CALL or :EMPTY_CALL here
       = $rpc_config..key?(rpc) ?
                   $rpc_config.[rpc] : {}
      $accumulated_stats_mu.synchronize do
        $num_rpcs_started_by_method[rpc.to_s] += 1
        $accumulated_method_stats[rpc.to_s].increment_rpcs_started()
      end
      if rpc == :UNARY_CALL
        op = stub.unary_call(simple_req,
                             metadata: ,
                             deadline: deadline,
                             return_op: true)
      elsif rpc == :EMPTY_CALL
        op = stub.empty_call(empty_req,
                             metadata: ,
                             deadline: deadline,
                             return_op: true)
      else
        raise "Unsupported rpc #{rpc}"
      end
      keep_open_threads << execute_rpc_in_thread(op, rpc)
    end
    # collect results from threads
    $thread_results_mu.synchronize do
      $thread_results.each do |r|
        rpc_name, remote_peer = r
        results[rpc_name] = remote_peer
      end
      $thread_results = Array.new
    end
    $watchers_mutex.synchronize do
      $watchers.each do |watcher|
        # this is counted once when each group of all rpcs_to_send were done
        watcher['rpcs_needed'] -= 1
        results.each do |rpc_name, remote_peer|
          # These stats expect rpc_name to be in the form of
          # UnaryCall or EmptyCall, not the underscore-case all-caps form
          rpc_name = $RPC_MAP.invert()[rpc_name]
          if remote_peer.strip.empty?
            # error is counted per individual RPC
            watcher['no_remote_peer'] += 1
          else
            if not watcher['rpcs_by_method'].key?(rpc_name)
              watcher['rpcs_by_method'][rpc_name] = Hash.new(0)
            end
            # increment the remote hostname distribution histogram
            # both by overall, and broken down per RPC
            watcher['rpcs_by_method'][rpc_name][remote_peer] +=  1
            watcher['rpcs_by_peer'][remote_peer] += 1
          end
        end
      end
      $watchers_cv.broadcast
    end
  end
  keep_open_threads.each { |thd| thd.join }
end