Class: Ductr::ETLJob

Inherits:
Job
  • Object
show all
Includes:
Ductr::ETL::Parser, JobETLRunner
Defined in:
lib/ductr/etl_job.rb

Overview

Base class for ETL job using the experimental fiber runner. Usage example:

class MyETLJob < Ductr::ETLJob
  source :first_db, :basic
  send_to :the_transform, :the_other_transform
  def the_source(db)
    # ...
  end

  transform
  send_to :the_destination
  def the_transform(row)
    # ...
  end

  destination :first_db, :basic
  def the_destination(row, db)
    # ...
  end

  transform
  send_to :the_other_destination
  def the_other_transform(row)
    # ...
  end

  destination :second_db, :basic
  def the_other_destination(row, db)
    # ...
  end
end

Constant Summary collapse

ETL_RUNNER_CLASS =

Returns The ETL runner class used by the job.

Returns:

  • (Class)

    The ETL runner class used by the job

ETL::FiberRunner

Instance Attribute Summary

Attributes inherited from Job

#error, #status

Class Method Summary collapse

Methods included from Ductr::ETL::Parser

#adapters, #parse_annotations

Methods included from JobETLRunner

#initialize, #run

Methods inherited from Job

#adapter, #logger, #perform, #run

Methods included from JobStatus

included, #status=, #stopped?

Class Method Details

.destination(adapter_name, destination_type, **destination_options) ⇒ void

This method returns an undefined value.

Annotation to define a destination method

Examples:

Destination with Sequel SQLite adapter

destination :my_other_adapter, :basic
def my_destination(row, db)
  db[:new_items].insert(name: row[:name], new_name: row[:new_name])
end

Parameters:

  • adapter_name (Symbol)

    The adapter the destination is running on

  • destination_type (Symbol)

    The type of destination to run

  • **destination_options (Hash<Symbol: Object>)

    The options to pass to the destination

See Also:

  • chosen adapter documentation for further information on destinations usage.


133
# File 'lib/ductr/etl_job.rb', line 133

annotable :destination

.lookup(adapter_name, lookup_type, **lookup_options) ⇒ void

This method returns an undefined value.

Annotation to define a lookup method

Examples:

Lookup with Sequel SQLite adapter

lookup :my_other_adapter, :match, merge: [:id, :item], buffer_size: 4
def joining_different_adapters(db, ids)
  db[:items_bis].select(:id, :item, :name).where(item: ids)
end

Parameters:

  • adapter_name (Symbol)

    The adapter the lookup is running on

  • lookup_type (Symbol)

    The type of lookup to run

  • **lookup_options (Hash<Symbol: Object>)

    The options to pass to the lookup

See Also:

  • chosen adapter documentation for further information on lookups usage.


114
# File 'lib/ductr/etl_job.rb', line 114

annotable :lookup

.send_to(*methods) ⇒ void

This method returns an undefined value.

Annotation to define which methods will follow the current one

Examples:

Source with Sequel SQLite adapter sending rows to two transforms

source :my_adapter, :paginated, page_size: 42
send_to :my_first_transform, :my_second_transform
def my_source(db, offset, limit)
  db[:items].offset(offset).limit(limit)
end

transform
def my_first_transform(row)
  # ...
end

transform
def my_second_transform(row)
  # ...
end

Parameters:

  • *methods (Array<Symbol>)

    The names of the following methods



159
# File 'lib/ductr/etl_job.rb', line 159

annotable :send_to

.source(adapter_name, source_type, **source_options) ⇒ void

This method returns an undefined value.

Annotation to define a source method

Examples:

Source with Sequel SQLite adapter

source :my_adapter, :paginated, page_size: 42
def my_source(db, offset, limit)
  db[:items].offset(offset).limit(limit)
end

Parameters:

  • adapter_name (Symbol)

    The adapter the source is running on

  • source_type (Symbol)

    The type of source to run

  • **source_options (Hash<Symbol: Object>)

    The options to pass to the source

See Also:

  • chosen adapter documentation for further information on sources usage.


62
# File 'lib/ductr/etl_job.rb', line 62

annotable :source

.transform(transform_class, **transform_options) ⇒ void

This method returns an undefined value.

Annotation to define a transform method

Examples:

Transform without params

transform
def rename_keys(row)
  row[:new_name] = row.delete[:old_name]
  row[:new_email] = row.delete[:old_email]
end

Transform with params

class RenameTransform < Ductr::ETL::Transform
  def process(row)
    call_method.each do |actual_name, new_name|
      new_key = "#{options[:prefix]}#{new_name}".to_sym

      row[new_key] = row.delete(actual_name)
    end
  end
end

transform RenameTransform, prefix: "some_"
def rename
  { old_name: :new_name, old_email: :new_email }
end

Parameters:

  • transform_class (Class, nil)

    The class the transform is running on

  • **transform_options (Hash<Symbol: Object>)

    The options to pass to the transform



95
# File 'lib/ductr/etl_job.rb', line 95

annotable :transform