Class: Ductr::ETL::BufferedDestination

Inherits:
Destination
Defined in:
lib/ductr/etl/controls/buffered_destination.rb

Overview

Base class for implementing buffered destinations. Subclasses must implement #on_flush.

Instance Attribute Summary

Attributes inherited from Control

#adapter, #job_method, #options

Instance Method Summary

Methods inherited from Control

#call_method, #initialize

Constructor Details

This class inherits a constructor from Ductr::ETL::Control

Instance Attribute Details

#buffer ⇒ Array (readonly)

Returns the row buffer.

Returns:

  • (Array)

    The row buffer



# File 'lib/ductr/etl/controls/buffered_destination.rb', line 10

def buffer
  @buffer
end

Instance Method Details

#buffer_size ⇒ Integer

The buffer size option; defaults to 10_000.

Returns:

  • (Integer)

    The buffer size



# File 'lib/ductr/etl/controls/buffered_destination.rb', line 17

def buffer_size
  @options[:buffer_size] || 10_000
end
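
The fallback above can be illustrated in isolation. The following is a minimal sketch, not the Ductr class itself: `BufferSizeSketch` is a hypothetical stand-in that copies only the documented `#buffer_size` body and assumes options arrive as a plain hash.

```ruby
# Hypothetical stand-in that mirrors the documented #buffer_size body.
# It is not Ductr::ETL::BufferedDestination and loads nothing from Ductr.
class BufferSizeSketch
  def initialize(**options)
    @options = options
  end

  def buffer_size
    @options[:buffer_size] || 10_000
  end
end

p BufferSizeSketch.new.buffer_size                    # => 10000
p BufferSizeSketch.new(buffer_size: 500).buffer_size  # => 500
p BufferSizeSketch.new(buffer_size: nil).buffer_size  # => 10000
```

Because the fallback uses `||`, an explicit `nil` (or `false`) behaves the same as leaving the option out.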

#close ⇒ void

This method returns an undefined value.

Flushes any rows still in the buffer. Called when the last row is reached.



# File 'lib/ductr/etl/controls/buffered_destination.rb', line 40

def close
  flush_buffer unless @buffer.empty?
  super
end

#flush_buffer ⇒ void

This method returns an undefined value.

Calls #on_flush and resets the buffer.



# File 'lib/ductr/etl/controls/buffered_destination.rb', line 50

def flush_buffer
  on_flush
  @buffer = []
end

#on_flush ⇒ void

This method returns an undefined value.

Called each time the buffer has to be emptied.

Raises:

  • (NotImplementedError)


# File 'lib/ductr/etl/controls/buffered_destination.rb', line 60

def on_flush
  raise NotImplementedError, "A buffered destination must implement the `#on_flush` method"
end
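
To show how a subclass might supply `#on_flush`, here is a self-contained sketch. It does not load Ductr: `SketchDestination` and `SketchBufferedDestination` are hypothetical stand-ins that copy the method bodies documented on this page, and `BatchRecorder` is an invented subclass that records batches instead of writing them anywhere. The constructor taking keyword options is also an assumption.

```ruby
# Hypothetical stand-in for the parent class; the real Destination is not loaded.
class SketchDestination
  def initialize(**options)
    @options = options
  end

  def close; end
end

# Mirrors the method bodies documented on this page.
class SketchBufferedDestination < SketchDestination
  attr_reader :buffer

  def buffer_size
    @options[:buffer_size] || 10_000
  end

  def write(row)
    @buffer ||= []

    @buffer.push row
    flush_buffer if @buffer.size == buffer_size
  end

  def close
    flush_buffer unless @buffer.empty?
    super
  end

  def flush_buffer
    on_flush
    @buffer = []
  end

  def on_flush
    raise NotImplementedError, "A buffered destination must implement the `#on_flush` method"
  end
end

# Hypothetical subclass: keeps a copy of every flushed batch.
class BatchRecorder < SketchBufferedDestination
  attr_reader :batches

  def on_flush
    @batches ||= []
    @batches << buffer.dup
  end
end

recorder = BatchRecorder.new(buffer_size: 3)
(1..7).each { |n| recorder.write(n) }
recorder.close
p recorder.batches # => [[1, 2, 3], [4, 5, 6], [7]]
```

Note that in Ruby `NotImplementedError` descends from `ScriptError`, not `StandardError`, so a bare `rescue` will not catch it if a subclass forgets to define `#on_flush`.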

#write(row) ⇒ void

This method returns an undefined value.

Pushes the row into the buffer, then flushes the buffer once it holds #buffer_size rows.

Parameters:

  • row (Object)

    The row to write



# File 'lib/ductr/etl/controls/buffered_destination.rb', line 28

def write(row)
  @buffer ||= []

  @buffer.push row
  flush_buffer if @buffer.size == buffer_size
end
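
As a rough mental model of the batching, and assuming `buffer_size` is a positive integer and `#close` is called after the last row, the batches handed to `#on_flush` should match what `Enumerable#each_slice` produces. The helper below, `expected_batches`, is a hypothetical name used only for illustration; it calls no Ductr code.

```ruby
# Models the batches only: full slices correspond to flushes triggered by #write,
# and a shorter final slice corresponds to the flush triggered by #close.
def expected_batches(rows, buffer_size)
  rows.each_slice(buffer_size).to_a
end

p expected_batches((1..7).to_a, 3) # => [[1, 2, 3], [4, 5, 6], [7]]
p expected_batches((1..6).to_a, 3) # => [[1, 2, 3], [4, 5, 6]]
```

In the second call the row count is a multiple of the buffer size, so every batch comes from `#write` and `#close` finds the buffer empty.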