Class: PgConduit::Pipe
- Inherits:
-
Object
- Object
- PgConduit::Pipe
- Defined in:
- lib/pg_conduit/pipe.rb
Instance Method Summary collapse
-
#initialize(from:, to:) ⇒ Pipe
constructor
A new instance of Pipe.
- #peak ⇒ Object
- #read(query) ⇒ Object
- #transform(&transformer) ⇒ Object
- #write ⇒ Object (also: #exec)
- #write_batched(size: 1000) ⇒ Object
Constructor Details
#initialize(from:, to:) ⇒ Pipe
Returns a new instance of Pipe.
10 11 12 13 14 15 |
# File 'lib/pg_conduit/pipe.rb', line 10 def initialize(from:, to:) @stream = from @writer = to @reader = ParallelStreamReader.new(@stream) @transformers = [] end |
Instance Method Details
#peak ⇒ Object
29 30 31 |
# File 'lib/pg_conduit/pipe.rb', line 29 def peak self.tap { @transformers << ->(row) { row.tap { yield row } } } end |
#read(query) ⇒ Object
17 18 19 |
# File 'lib/pg_conduit/pipe.rb', line 17 def read(query) self.tap { @stream.query(query) } end |
#transform(&transformer) ⇒ Object
21 22 23 |
# File 'lib/pg_conduit/pipe.rb', line 21 def transform(&transformer) self.tap { @transformers << transformer } end |
#write ⇒ Object Also known as: exec
25 26 27 |
# File 'lib/pg_conduit/pipe.rb', line 25 def write exec_read { |row| exec_write { exec_transform(row) } } end |
#write_batched(size: 1000) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/pg_conduit/pipe.rb', line 33 def write_batched(size: 1000) collector = RowCollector.new(chunk_size: size) # Set callback to yield collected rows collector.on_chunk { |rows| exec_write { yield rows } } # Process each row exec_read { |row| collector << exec_transform(row) } # Yield any remaining rows collector.finish end |