Class: PgConduit::RowCollector

Inherits:
Object
Defined in:
lib/pg_conduit/row_collector.rb

Overview

A thread-safe accumulator used to split an input stream into fixed-size chunks.
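To make the thread-safety claim concrete, here is a rough sketch of several producer threads feeding one collector. `SketchCollector` is a hypothetical stand-in that mirrors the methods documented on this page, and its private `flush` is an assumption (the real helper is not shown here): it is taken to yield the buffered rows and then empty the buffer.

```ruby
# Hypothetical stand-in for PgConduit::RowCollector, not the library class.
class SketchCollector
  def initialize(chunk_size: 100)
    @chunk_size = chunk_size
    @rows = []
    @finished = false
    @mutex = Mutex.new
  end

  def on_chunk(&callback)
    tap { @mutex.synchronize { @callback = callback } }
  end

  def <<(row)
    @mutex.synchronize do
      raise 'collector is finished' if @finished

      @rows << row
      flush(&@callback) if (@rows.length % @chunk_size).zero?
    end
  end

  def finish
    @mutex.synchronize do
      flush(&@callback)
      @finished = true
    end
  end

  private

  # Assumed behavior: hand a copy of the buffer to the block, then clear it.
  def flush
    return if @rows.empty?

    chunk = @rows.dup
    @rows.clear
    yield chunk if block_given?
  end
end

chunks = []
collector = SketchCollector.new(chunk_size: 10).on_chunk { |rows| chunks << rows }

# Four producers push 25 rows each into the shared collector.
producers = 4.times.map do |t|
  Thread.new { 25.times { |n| collector << [t, n] } }
end
producers.each(&:join)
collector.finish

total_rows = chunks.sum(&:length)
```

Because the callback runs while the mutex is held, appending to `chunks` from the block needs no extra locking in this sketch. The order of rows across chunks depends on thread scheduling.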

Constructor Details

#initialize(chunk_size: 100) ⇒ RowCollector

Returns a new instance of RowCollector.

Parameters:

  • chunk_size (Integer) (defaults to: 100)

    How many rows should be collected before yielding



# File 'lib/pg_conduit/row_collector.rb', line 6

def initialize(chunk_size: 100)
  @chunk_size = chunk_size
  @rows = []
  @finished = false
  @mutex = Mutex.new
end

Instance Method Details

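#<<(row) ⇒ Object

Adds a row to the buffer. When the number of buffered rows reaches a multiple of chunk_size, the buffer is flushed to the callback. Raises an error if the collector has already been marked as finished.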

Parameters:

  • row (Object)

    Row to add to the buffer



# File 'lib/pg_conduit/row_collector.rb', line 36

def <<(row)
  @mutex.synchronize do
    if @finished
      raise 'Data may not be added to a row collector that has been marked as finished'
    end

    @rows << row
    if @rows.length % @chunk_size == 0
      flush(&@callback)
    end
  end
end

#finish ⇒ Object

Flushes any collected rows, yielding them to the callback, and marks the collector as finished. Any subsequent call to #<< raises an error.



# File 'lib/pg_conduit/row_collector.rb', line 51

def finish
  @mutex.synchronize do
    flush(&@callback)
    @finished = true
  end
end
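The interaction between `<<` and `finish` can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: `SketchCollector` is a hypothetical stand-in, and because the private `flush` helper is not shown on this page, the sketch assumes it yields the buffered rows to the callback and then clears the buffer.

```ruby
# Hypothetical stand-in for PgConduit::RowCollector, not the library class.
class SketchCollector
  def initialize(chunk_size: 100)
    @chunk_size = chunk_size
    @rows = []
    @finished = false
    @mutex = Mutex.new
  end

  def on_chunk(&callback)
    tap { @mutex.synchronize { @callback = callback } }
  end

  def <<(row)
    @mutex.synchronize do
      raise 'collector is finished' if @finished

      @rows << row
      flush(&@callback) if (@rows.length % @chunk_size).zero?
    end
  end

  def finish
    @mutex.synchronize do
      flush(&@callback)
      @finished = true
    end
  end

  private

  # Assumed behavior: hand a copy of the buffer to the block, then clear it.
  def flush
    return if @rows.empty?

    chunk = @rows.dup
    @rows.clear
    yield chunk if block_given?
  end
end

chunks = []
collector = SketchCollector.new(chunk_size: 3).on_chunk { |rows| chunks << rows }

7.times { |n| collector << n } # two full chunks are delivered, one row stays buffered
collector.finish               # delivers the trailing partial chunk
# chunks is now [[0, 1, 2], [3, 4, 5], [6]]

# Writing after finish is rejected with a RuntimeError.
rejected = begin
  collector << 7
  false
rescue RuntimeError
  true
end
```

Without the call to `finish`, the seventh row would remain in the buffer and never reach the callback, so callers should finish the collector once the input stream is exhausted.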

#on_chunk {|Array| ... } ⇒ self

Registers a block to be called with each accumulated chunk.

Examples:

Print once every ten rows


collector = RowCollector.new(chunk_size: 10)
collector.on_chunk { |rows| p rows }

100.times { |n| collector << n }

#> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
#> [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
#> ...etc

Yields:

  • (Array)

    collected rows

Returns:

  • (self)


# File 'lib/pg_conduit/row_collector.rb', line 29

def on_chunk(&callback)
  self.tap do
    @mutex.synchronize { @callback = callback }
  end
end