Class: PgConduit::Pipe

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_conduit/pipe.rb

Instance Method Summary collapse

Constructor Details

#initialize(from:, to:) ⇒ Pipe

Returns a new instance of Pipe.

Examples:

Pipe
  .new(from: query_stream, to: db_writer)
  .read('SELECT name FROM users')
  .transform do |user|
    %(INSERT INTO friends (name) VALUES ('#{user["name"]}'))
  end


10
11
12
13
14
15
# File 'lib/pg_conduit/pipe.rb', line 10

# Builds a pipe between a source stream and a destination writer.
#
# @param from the source stream; it is stored and also wrapped in a
#   ParallelStreamReader
# @param to the destination writer; stored for later use
def initialize(from:, to:)
  @stream = from
  @writer = to

  # The reader wraps the very same object that is kept in @stream.
  @reader = ParallelStreamReader.new(from)

  # Callables registered through #transform / #peak accumulate here.
  @transformers = []
end

Instance Method Details

#peak ⇒ Object



29
30
31
# File 'lib/pg_conduit/pipe.rb', line 29

# Registers a pass-through step: the given block is called with each row,
# its result is discarded, and the row itself continues unchanged.
#
# NOTE(review): the name is probably meant to be "peek"; it is kept as-is
# because it is part of the public interface.
#
# @yield [row] the row passing through the registered step
# @return [self] so calls can be chained
def peak
  observer = lambda do |row|
    yield row
    row
  end

  @transformers << observer
  self
end

#read(query) ⇒ Object



17
18
19
# File 'lib/pg_conduit/pipe.rb', line 17

# Hands the query to the source stream by calling +@stream.query+.
#
# @param query the query passed through to the stream unchanged
# @return [self] so calls can be chained
def read(query)
  @stream.query(query)
  self
end

#transform(&transformer) ⇒ Object



21
22
23
# File 'lib/pg_conduit/pipe.rb', line 21

# Appends the given block to the list of registered transformers.
#
# @param transformer [Proc] the block to register; stored as given
# @return [self] so calls can be chained
def transform(&transformer)
  @transformers << transformer
  self
end

#write ⇒ Object Also known as: exec



25
26
27
# File 'lib/pg_conduit/pipe.rb', line 25

# Runs the pipe row by row: for each row yielded by +exec_read+, calls
# +exec_write+ with a block whose value is +exec_transform(row)+.
#
# The +exec_*+ helpers are defined elsewhere in the class.
#
# @return whatever +exec_read+ returns
def write
  exec_read do |row|
    exec_write do
      exec_transform(row)
    end
  end
end

#write_batched(size: 1000) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/pg_conduit/pipe.rb', line 33

# Runs the pipe in batches: transformed rows are pushed into a RowCollector,
# and each chunk it emits is handed to the caller's block inside
# +exec_write+.
#
# @param size [Integer] passed to RowCollector as +chunk_size+ (default 1000)
# @yield [rows] each chunk emitted by the collector
# @return whatever +RowCollector#finish+ returns
def write_batched(size: 1000)
  batcher = RowCollector.new(chunk_size: size)

  # The chunk callback must be registered before any row is pushed.
  batcher.on_chunk do |rows|
    exec_write { yield rows }
  end

  # Feed every transformed row into the collector.
  exec_read do |row|
    batcher << exec_transform(row)
  end

  # Signal end of input; per the original comment this yields leftover rows.
  batcher.finish
end