# Benchmark: stream a table in libpq single-row mode using plain threads
# (PG::Connection) versus fibers (PG::EM::ConnectionPool under EM.synchrony).
#
# Usage: ruby <this file> [repeat]   (repeat defaults to 40)
$:.unshift('./lib')
require 'pg/em/connection_pool'
require 'em-synchrony'
require 'em-synchrony/fiber_iterator'
require 'pp'
require 'benchmark'

# Table that every benchmark query selects from.
TABLE_NAME = 'resources'
# Number of streamed rows after which a query is abandoned via a connection reset.
LIMIT_ROWS = 5000
# Concurrency levels exercised for both the thread and the fiber variant.
CONCURRENCY_LEVELS = [1, 5, 10, 20, 40].freeze

# Brings FiberIterator into top-level scope.
include EM::Synchrony

unless PG::EM::Client.single_row_mode?
  raise 'compile pg against pqlib >= 9.2 to support single row mode'
end

# Runs the thread-based and then the fiber-based variant at every level in
# CONCURRENCY_LEVELS and prints one Benchmark report line per combination.
#
# @param repeat [Integer] total number of queries per report; each worker runs
#   repeat / concurrency of them (integer division)
def benchmark(repeat = 40)
  Benchmark.bm(20) do |b|
    %i[threads fibers].each do |variant|
      puts
      CONCURRENCY_LEVELS.each do |concurrency|
        # `send` is required: top-level methods are private on Object.
        b.report("#{variant} #{repeat / concurrency}x#{concurrency}:") do
          send(variant, repeat, concurrency)
        end
      end
    end
  end
end

# Streams the table from `concurrency` threads, each running
# repeat / concurrency queries on a connection of its own.
#
# Every worker owns its connection and closes it in `ensure`, so a failing
# worker (whose exception Thread#join re-raises here) cannot leave connections
# open, and no shared Hash is mutated from several threads.
#
# @param repeat [Integer] total number of queries to run
# @param concurrency [Integer] number of worker threads
def threads(repeat, concurrency)
  workers = Array.new(concurrency) do
    Thread.new do
      pg = PG::Connection.new
      begin
        (repeat / concurrency).times { stream_results(pg) }
      ensure
        pg.finish
      end
    end
  end
  workers.each(&:join)
end

# Streams the table from `concurrency` fibers inside an EventMachine reactor,
# each holding one pooled connection for repeat / concurrency queries.
#
# @param repeat [Integer] total number of queries to run
# @param concurrency [Integer] pool size and number of concurrent fibers
def fibers(repeat, concurrency)
  EM.synchrony do
    db = PG::EM::ConnectionPool.new size: concurrency, lazy: true
    FiberIterator.new((0...concurrency), concurrency).each do
      db.hold do |pg|
        (repeat / concurrency).times { stream_results(pg) }
      end
    end
    db.finish
    EM.stop
  end
end

# Sends `select * from TABLE_NAME` in single-row mode and consumes the results,
# resetting the connection once LIMIT_ROWS rows have been seen.
#
# @param pg [#send_query, #set_single_row_mode, #get_result, #reset, #get_last_result]
#   the connection to query
# @raise [PG::Error] re-raised after the remaining results have been fetched
def stream_results(pg)
  pg.send_query("select * from #{TABLE_NAME}")
  pg.set_single_row_mode
  rows = 0
  while result = pg.get_result
    begin
      result.check
      result.each do |_tuple|
        rows += 1
        if rows >= LIMIT_ROWS
          # Abandon the rest of the stream; `break` only leaves `result.each`,
          # the outer loop then ends when get_result stops returning a result.
          pg.reset
          break
        end
      end
    rescue PG::Error
      # Fetch whatever results are still pending before propagating the error.
      pg.get_last_result
      raise
    ensure
      result.clear
    end
  end
end

if $0 == __FILE__
  benchmark ARGV[0].to_i.nonzero? || 40
end