Class: PgConduit::ParallelStreamReader

Inherits:
Object
Defined in:
lib/pg_conduit/parallel_stream_reader.rb

Overview

A multi-threaded stream reader.

Instance Method Summary

Constructor Details

#initialize(query_stream, threads: 5, queue_max_size: 1000) ⇒ ParallelStreamReader

Returns a new instance of ParallelStreamReader.

Parameters:

  • query_stream (PgConduit::QueryStream)
  • threads (Integer) (defaults to: 5)

    The number of threads to use for workers

  • queue_max_size (Integer) (defaults to: 1000)

    The maximum number of rows held in memory in the work queue at any one time.



# File 'lib/pg_conduit/parallel_stream_reader.rb', line 8

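def initialize(query_stream, threads: 5, queue_max_size: 1000)
  @queue = SizedQueue.new(queue_max_size) # bounded work queue; caps rows held in memory
  @workers = threads                      # number of worker threads to start
  @stream = query_stream                  # the PgConduit::QueryStream to read from
end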
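
The constructor above backs the work queue with Ruby's standard `SizedQueue`, which is what lets `queue_max_size` bound memory use: once the queue holds that many items, a further `push` blocks until a consumer pops one. The following is a minimal sketch of that behavior in isolation. The variable names are illustrative only and are not part of PgConduit.

```ruby
# Illustrative only: shows how a SizedQueue enforces its size limit.
queue = SizedQueue.new(2) # at most two items may be queued at once

queue.push(:row_a)
queue.push(:row_b)

# A normal push would now block until another thread pops an item.
# A non-blocking push (second argument true) raises ThreadError instead.
rejected = false
begin
  queue.push(:row_c, true)
rescue ThreadError
  rejected = true
end

first = queue.pop     # frees one slot
queue.push(:row_c)    # succeeds now that there is room
```

A larger `queue_max_size` lets the reader run further ahead of slow workers at the cost of holding more rows in memory.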

Instance Method Details

#read {|Hash| ... } ⇒ Object

Reads a QueryStream and yields its rows.

Yields:

  • (Hash)

    A single row from the QueryStream. Every row from the stream is yielded, but the order is not guaranteed.
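
Because rows are handed to worker threads, it seems reasonable to assume the block may run on several threads at once, so any state the block shares should be synchronized, and callers that need a particular order have to restore it themselves. The sketch below illustrates both points with plain threads and stand-in data. It does not use PgConduit, and the `"id"` column is a hypothetical sort key.

```ruby
# Stand-in rows; a real QueryStream would supply these as Hashes.
rows = (1..10).map { |i| { "id" => i } }

collected = []
lock = Mutex.new

# The handler plays the role of the block passed to #read.
# The Mutex guards the shared Array against concurrent appends.
handler = ->(row) { lock.synchronize { collected << row } }

# Simulate several workers calling the handler concurrently.
threads = rows.each_slice(4).map do |slice|
  Thread.new { slice.each(&handler) }
end
threads.each(&:join)

# Arrival order is not guaranteed, so sort if order matters.
ordered = collected.sort_by { |row| row["id"] }
```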



# File 'lib/pg_conduit/parallel_stream_reader.rb', line 18

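def read(&callback)
  reader  = read_stream(@stream)          # start the thread that fills the queue
  workers = dispatch_workers(&callback)   # start the threads that yield rows to the block
  reader.join                             # wait until the stream is exhausted
  workers.each(&:join)                    # wait until every worker has finished
  :ok
end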
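
The listing above does not show `read_stream` or `dispatch_workers`, so their real implementations are unknown here. As a rough sketch of the producer/consumer pattern that `#read` appears to follow, the stand-alone class below takes any enumerable of rows instead of a `PgConduit::QueryStream`. The class name `SketchParallelReader`, the `DONE` sentinel, and both private method bodies are assumptions made for illustration, not the library's code.

```ruby
# Hypothetical stand-in, not the PgConduit implementation.
class SketchParallelReader
  DONE = Object.new.freeze # sentinel telling a worker to stop (assumption)

  def initialize(rows, threads: 5, queue_max_size: 1000)
    @rows    = rows
    @workers = threads
    @queue   = SizedQueue.new(queue_max_size)
  end

  def read(&callback)
    reader  = read_stream
    workers = dispatch_workers(&callback)
    reader.join
    workers.each(&:join)
    :ok
  end

  private

  # Producer: push every row, then one sentinel per worker.
  # push blocks while the queue is full, which bounds memory use.
  def read_stream
    Thread.new do
      @rows.each { |row| @queue.push(row) }
      @workers.times { @queue.push(DONE) }
    end
  end

  # Consumers: each worker pops rows until it receives the sentinel.
  def dispatch_workers(&callback)
    Array.new(@workers) do
      Thread.new do
        loop do
          row = @queue.pop
          break if row.equal?(DONE)
          callback.call(row)
        end
      end
    end
  end
end

# Usage: a thread-safe Queue collects ids seen by the block.
seen   = Queue.new
rows   = (1..20).map { |i| { "id" => i } }
result = SketchParallelReader.new(rows, threads: 3, queue_max_size: 4)
                             .read { |row| seen << row["id"] }
ids    = Array.new(seen.size) { seen.pop }
```

Pushing one sentinel per worker is only one way to shut the workers down; closing the queue once the producer finishes would be another.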