Class: PgConduit::RowCollector
- Inherits: Object
  - Object
    - PgConduit::RowCollector
- Defined in: lib/pg_conduit/row_collector.rb
Overview
A thread-safe accumulator used to chunk an input stream.
Instance Method Summary
- #<<(row) ⇒ Object
- #finish ⇒ Object
  Flushes any collected rows, yielding them to the callback, and marks the collector as finished.
- #initialize(chunk_size: 100) ⇒ RowCollector (constructor)
  A new instance of RowCollector.
- #on_chunk {|Array| ... } ⇒ self
  Provide a block to be called with each accumulated chunk.
Constructor Details
#initialize(chunk_size: 100) ⇒ RowCollector
Returns a new instance of RowCollector.
    # File 'lib/pg_conduit/row_collector.rb', line 6

    def initialize(chunk_size: 100)
      @chunk_size = chunk_size
      @rows = []
      @finished = false
      @mutex = Mutex.new
    end
Instance Method Details
#<<(row) ⇒ Object
Appends a row to the collector. When the number of buffered rows reaches a multiple of chunk_size, the buffered rows are flushed to the callback. Raises an error if the collector has been marked as finished.
    # File 'lib/pg_conduit/row_collector.rb', line 36

    def <<(row)
      @mutex.synchronize do
        if @finished
          raise 'Data may not be added to a row collector that has been marked as finished'
        end
        @rows << row
        if @rows.length % @chunk_size == 0
          flush(&@callback)
        end
      end
    end
#finish ⇒ Object
Flushes any collected rows, yielding them to the callback, and marks the collector as finished. Any subsequent call to #<< raises an error.
    # File 'lib/pg_conduit/row_collector.rb', line 51

    def finish
      @mutex.synchronize do
        flush(&@callback)
        @finished = true
      end
    end
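The interplay of #<<, #finish and the callback may be easier to follow in a small runnable sketch. The class below is a stand-in rebuilt from the listings on this page, not the library itself. Its name, SketchRowCollector, is hypothetical, and the private flush method is an assumption, because its source is not shown here: it is taken to hand the buffered rows to the block and then start an empty buffer.

```ruby
# Stand-in for PgConduit::RowCollector, rebuilt from the listings on this page.
# ASSUMPTION: the real private #flush is not shown; this version is a guess.
class SketchRowCollector
  def initialize(chunk_size: 100)
    @chunk_size = chunk_size
    @rows = []
    @finished = false
    @mutex = Mutex.new
  end

  def on_chunk(&callback)
    tap { @mutex.synchronize { @callback = callback } }
  end

  def <<(row)
    @mutex.synchronize do
      raise 'collector has been marked as finished' if @finished
      @rows << row
      flush(&@callback) if @rows.length % @chunk_size == 0
    end
  end

  def finish
    @mutex.synchronize do
      flush(&@callback)
      @finished = true
    end
  end

  private

  # Assumed behaviour: yield the buffered rows, then start a fresh buffer.
  def flush
    return if @rows.empty?
    chunk = @rows
    @rows = []
    yield chunk if block_given?
  end
end

chunks = []
collector = SketchRowCollector.new(chunk_size: 3).on_chunk { |rows| chunks << rows }

(1..7).each { |n| collector << n } # two full chunks are delivered here
collector.finish                   # the remaining partial chunk is delivered here

p chunks # prints [[1, 2, 3], [4, 5, 6], [7]]
```

Under these assumptions, full chunks arrive as soon as the buffer reaches chunk_size, and #finish delivers whatever is left over. Pushing another row after #finish raises.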
#on_chunk {|Array| ... } ⇒ self
Provide a block to be called with each accumulated chunk.
    # File 'lib/pg_conduit/row_collector.rb', line 29

    def on_chunk(&callback)
      self.tap do
        @mutex.synchronize { @callback = callback }
      end
    end
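Because #on_chunk returns self, it can be chained directly onto the constructor, and the resulting collector can then be shared between producer threads. The sketch below illustrates this with a stand-in class rebuilt from the listings on this page. The name SketchRowCollector is hypothetical, and the private flush method is an assumption, since its source is not shown here.

```ruby
# Stand-in for PgConduit::RowCollector, rebuilt from the listings on this page.
# ASSUMPTION: the real private #flush is not shown; this version is a guess.
class SketchRowCollector
  def initialize(chunk_size: 100)
    @chunk_size = chunk_size
    @rows = []
    @finished = false
    @mutex = Mutex.new
  end

  def on_chunk(&callback)
    tap { @mutex.synchronize { @callback = callback } }
  end

  def <<(row)
    @mutex.synchronize do
      raise 'collector has been marked as finished' if @finished
      @rows << row
      flush(&@callback) if @rows.length % @chunk_size == 0
    end
  end

  def finish
    @mutex.synchronize do
      flush(&@callback)
      @finished = true
    end
  end

  private

  # Assumed behaviour: yield the buffered rows, then start a fresh buffer.
  def flush
    return if @rows.empty?
    chunk = @rows
    @rows = []
    yield chunk if block_given?
  end
end

seen = []
# The callback runs while the collector's mutex is held, so appending to
# `seen` here is serialized across the producer threads.
collector = SketchRowCollector.new(chunk_size: 50).on_chunk { |rows| seen << rows }

# Four producer threads push 250 rows each through the same collector.
producers = 4.times.map do |t|
  Thread.new { 250.times { |i| collector << [t, i] } }
end
producers.each(&:join)
collector.finish

puts seen.sum(&:length)           # prints 1000
p seen.map(&:length).uniq         # prints [50]
```

The order of rows inside each chunk depends on thread scheduling, but every row is delivered exactly once. One consequence of the listings above is that the callback is invoked while the mutex is held, and Ruby's Mutex is not reentrant. A callback that pushes rows back into the same collector would therefore raise a ThreadError instead of completing.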