Class: Ductr::ETL::BufferedTransform

Inherits:
Transform
Defined in:
lib/ductr/etl/controls/buffered_transform.rb

Overview

Base class to implement buffered transforms.
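The following self-contained sketch illustrates the contract. It does not load the ductr gem. It assumes a stand-in base class, `MiniBufferedTransform` (hypothetical), that mirrors the methods documented on this page, and a hypothetical subclass `BatchSum` that emits one row per flushed batch. A real transform would subclass `Ductr::ETL::BufferedTransform` and override `#on_flush` in the same way.

```ruby
# Hypothetical stand-in mirroring the methods documented on this page;
# it is NOT the ductr gem's class.
class MiniBufferedTransform
  attr_reader :buffer

  def initialize(options = {})
    @options = options
    @buffer = [] # eager init here; the documented #process initializes lazily
  end

  def buffer_size
    @options[:buffer_size] || 10_000
  end

  # Buffer the row and flush once the buffer holds buffer_size rows.
  def process(row, &block)
    @buffer.push(row)
    flush_buffer(&block) if @buffer.size == buffer_size
    nil # rows leave only through the block
  end

  # Flush leftover rows (the documented #close also calls super).
  def close(&block)
    flush_buffer(&block) unless @buffer.empty?
  end

  def flush_buffer(&block)
    on_flush(&block)
    @buffer = []
  end

  def on_flush
    raise NotImplementedError, "A buffered transform must implement the `#on_flush` method"
  end
end

# Hypothetical subclass: emits one row per flushed batch, the batch sum.
class BatchSum < MiniBufferedTransform
  def on_flush
    yield buffer.sum
  end
end

transform = BatchSum.new(buffer_size: 10)
out = []
(1..25).each { |n| transform.process(n) { |row| out << row } }
transform.close { |row| out << row }
p out # => [55, 155, 115]
```

Because the subclass overrides only `#on_flush`, the base class continues to handle buffering and the final flush of the last, partial batch.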

Instance Attribute Summary

Attributes inherited from Control

#adapter, #job_method, #options

Instance Method Summary

Methods inherited from Control

#call_method, #initialize

Constructor Details

This class inherits a constructor from Ductr::ETL::Control

Instance Attribute Details

#buffer ⇒ Array (readonly)

Returns the row buffer.

Returns:

  • (Array)

    The row buffer



# File 'lib/ductr/etl/controls/buffered_transform.rb', line 10

def buffer
  @buffer
end

Instance Method Details

#buffer_size ⇒ Integer

The buffer size option; defaults to 10_000 when the option is not set.

Returns:

  • (Integer)

    The buffer size



# File 'lib/ductr/etl/controls/buffered_transform.rb', line 17

def buffer_size
  @options[:buffer_size] || 10_000
end
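The fallback uses `||`, so only a missing, `nil`, or `false` `:buffer_size` option is replaced by 10_000. The snippet below evaluates the same expression on plain hashes. `buffer_size_for` is a hypothetical helper, not part of ductr.

```ruby
# Hypothetical helper: same expression as the documented #buffer_size.
def buffer_size_for(options)
  options[:buffer_size] || 10_000
end

buffer_size_for({})                   # => 10000
buffer_size_for({ buffer_size: 500 }) # => 500
buffer_size_for({ buffer_size: nil }) # => 10000
buffer_size_for({ buffer_size: 0 })   # => 0, because 0 is truthy in Ruby
```

Judging from the #process code on this page, which flushes only when `@buffer.size == buffer_size`, a value of 0 (or any negative value) would never trigger a flush. Every row would then stay buffered until #close.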

#close {|row| ... } ⇒ void

This method returns an undefined value.

Called when the last row is reached. Flushes any rows still in the buffer, then calls the parent implementation.

Yields:

  • (row)

    The row yielder



# File 'lib/ductr/etl/controls/buffered_transform.rb', line 47

def close(&)
  flush_buffer(&) unless @buffer.empty?
  super
end
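The `unless @buffer.empty?` guard means #close flushes only when rows are left over. The sketch below checks the resulting flush sizes for a few row counts. It assumes a hypothetical stand-in base class that mirrors the documented #process, #close and #flush_buffer, plus a hypothetical `RecordingTransform` subclass that records the size of each batch.

```ruby
# Hypothetical stand-in mirroring the documented buffering methods;
# it is NOT the ductr gem's class.
class MiniBufferedTransform
  attr_reader :buffer

  def initialize(options = {})
    @options = options
    @buffer = []
  end

  def buffer_size
    @options[:buffer_size] || 10_000
  end

  def process(row, &block)
    @buffer.push(row)
    flush_buffer(&block) if @buffer.size == buffer_size
    nil
  end

  # Same guard as the documented #close: skip the flush when nothing is left.
  def close(&block)
    flush_buffer(&block) unless @buffer.empty?
  end

  def flush_buffer(&block)
    on_flush(&block)
    @buffer = []
  end
end

# Hypothetical subclass that records the size of every flushed batch.
class RecordingTransform < MiniBufferedTransform
  attr_reader :flush_sizes

  def initialize(options = {})
    super
    @flush_sizes = []
  end

  def on_flush
    @flush_sizes << buffer.size
  end
end

# Feed row_count rows through a transform with the given buffer size.
def flush_sizes_for(row_count, size)
  transform = RecordingTransform.new(buffer_size: size)
  row_count.times { |i| transform.process(i) {} }
  transform.close {}
  transform.flush_sizes
end

p flush_sizes_for(25, 10) # => [10, 10, 5]
p flush_sizes_for(20, 10) # => [10, 10] (no empty flush at close)
p flush_sizes_for(3, 10)  # => [3] (only #close flushes)
```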

#flush_buffer {|row| ... } ⇒ void

This method returns an undefined value.

Calls #on_flush, then resets the buffer.

Yields:

  • (row)

    The row yielder



# File 'lib/ductr/etl/controls/buffered_transform.rb', line 59

def flush_buffer(&)
  on_flush(&)
  @buffer = []
end

#on_flush {|row| ... } ⇒ void

This method returns an undefined value.

Called each time the buffer has to be emptied. Subclasses must override this method; the default implementation raises NotImplementedError.

Yields:

  • (row)

    The row yielder

Raises:

  • (NotImplementedError)


# File 'lib/ductr/etl/controls/buffered_transform.rb', line 71

def on_flush(&)
  raise NotImplementedError, "A buffered transform must implement the `#on_flush` method"
end
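A subclass that forgets to override #on_flush fails only when the first flush happens, not when it is built. The sketch below shows this using a hypothetical, trimmed stand-in base class that mirrors the documented buffering and the default #on_flush, plus a hypothetical `Incomplete` subclass.

```ruby
# Hypothetical, trimmed stand-in with the documented buffering and
# #on_flush default; it is NOT the ductr gem's class.
class MiniBufferedTransform
  def initialize(options = {})
    @options = options
    @buffer = []
  end

  def buffer_size
    @options[:buffer_size] || 10_000
  end

  def process(row, &block)
    @buffer.push(row)
    flush_buffer(&block) if @buffer.size == buffer_size
    nil
  end

  def flush_buffer(&block)
    on_flush(&block)
    @buffer = []
  end

  def on_flush
    raise NotImplementedError, "A buffered transform must implement the `#on_flush` method"
  end
end

# Hypothetical subclass that forgets to override #on_flush.
class Incomplete < MiniBufferedTransform; end

transform = Incomplete.new(buffer_size: 2)
transform.process(:a) {} # buffered; no flush yet, so no error

error =
  begin
    transform.process(:b) {} # buffer full -> #flush_buffer -> #on_flush raises
    nil
  rescue NotImplementedError => e
    e
  end

puts error.message
# prints: A buffered transform must implement the `#on_flush` method
```

NotImplementedError descends from ScriptError rather than StandardError, so a bare `rescue` clause will not catch it. It has to be rescued by name, as above.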

#process(row) {|row| ... } ⇒ nil

Pushes the row into the buffer and flushes the buffer once it holds #buffer_size rows.

Parameters:

  • row (Object)

    The row to process

Yields:

  • (row)

    The row yielder

Returns:

  • (nil)

    Returns nil to comply with Kiba



# File 'lib/ductr/etl/controls/buffered_transform.rb', line 29

def process(row, &)
  @buffer ||= []

  @buffer.push row
  flush_buffer(&) if @buffer.size == buffer_size

  # avoid returning a row, see
  # https://github.com/thbar/kiba/wiki/Implementing-ETL-transforms#generating-more-than-one-output-row-per-input-row-aka-yielding-transforms
  nil
end
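Because #process always returns nil, rows leave the transform only through the block passed to #process and #close. The sketch below illustrates this with a hypothetical, simplified runner loop (`run_rows`). As the source comment and the linked Kiba wiki page suggest, this loop also forwards any non-nil return value of #process. It is not Kiba's code. The stand-in base class and the `Upcase` subclass are hypothetical as well.

```ruby
# Hypothetical stand-in mirroring the documented methods; NOT ductr's class.
class MiniBufferedTransform
  attr_reader :buffer

  def initialize(options = {})
    @options = options
    @buffer = []
  end

  def buffer_size
    @options[:buffer_size] || 10_000
  end

  def process(row, &block)
    @buffer.push(row)
    flush_buffer(&block) if @buffer.size == buffer_size
    nil # returning the row here would make the runner emit it a second time
  end

  def close(&block)
    flush_buffer(&block) unless @buffer.empty?
  end

  def flush_buffer(&block)
    on_flush(&block)
    @buffer = []
  end
end

# Hypothetical subclass: re-emits every buffered row, upcased.
class Upcase < MiniBufferedTransform
  def on_flush
    buffer.each { |row| yield row.upcase }
  end
end

# Hypothetical, simplified runner loop (not Kiba's code): collects yielded
# rows and also forwards any non-nil return value of #process.
def run_rows(transform, rows)
  out = []
  rows.each do |row|
    returned = transform.process(row) { |r| out << r }
    out << returned unless returned.nil?
  end
  transform.close { |r| out << r }
  out
end

p run_rows(Upcase.new(buffer_size: 2), %w[a b c]) # => ["A", "B", "C"]
```

If #process returned the row instead of nil, this loop would emit each input row a second time, unbuffered, alongside the flushed output.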