Class: Ductr::KibaJob

Inherits:
Job
  • Object
Includes:
ETL::Parser, JobETLRunner
Defined in:
lib/ductr/kiba_job.rb

Overview

Base class for ETL jobs using Kiba's streaming runner. Example using the SQLite adapter:

class MyKibaJob < Ductr::KibaJob
  source :some_adapter, :paginated, page_size: 4
  def select_some_stuff(db, offset, limit)
    db[:items].offset(offset).limit(limit)
  end

  lookup :some_adapter, :match, merge: [:id, :item], buffer_size: 4
  def merge_with_stuff(db, ids)
    db[:items_bis].select(:id, Sequel.as(:name, :name_bis), :item).where(item: ids)
  end

  transform
  def generate_more_stuff(row)
    { name: "#{row[:name]}_#{row[:name_bis]}" }
  end

  destination :some_other_adapter, :basic
  def my_destination(row, db)
    logger.trace("Hello destination: #{row}")
    db[:new_items].insert(name: row[:name])
  end
end

See Also:

  • See the chosen adapter's documentation for further information on control usage.

Constant Summary

ETL_RUNNER_CLASS = ETL::KibaRunner

The ETL runner class used by the job.

Returns:

  • (Class)

    The ETL runner class used by the job

Instance Attribute Summary

Attributes inherited from Job

#error, #status

Class Method Summary

Methods included from ETL::Parser

#adapters, #parse_annotations

Methods included from JobETLRunner

#initialize, #run

Methods inherited from Job

#adapter, #logger, #perform, #run

Methods included from JobStatus

included, #status=, #stopped?

Class Method Details

.destination(adapter_name, destination_type, **destination_options) ⇒ void

This method returns an undefined value.

Annotation to define a destination method

Examples:

Destination with Sequel SQLite adapter

destination :my_other_adapter, :basic
def my_destination(row, db)
  db[:new_items].insert(name: row[:name], new_name: row[:new_name])
end

Parameters:

  • adapter_name (Symbol)

    The adapter the destination is running on

  • destination_type (Symbol)

    The type of destination to run

  • **destination_options (Hash<Symbol: Object>)

    The options to pass to the destination

See Also:

  • See the chosen adapter's documentation for further information on destination usage.


# File 'lib/ductr/kiba_job.rb', line 128

annotable :destination

.lookup(adapter_name, lookup_type, **lookup_options) ⇒ void

This method returns an undefined value.

Annotation to define a lookup method

Examples:

Lookup with Sequel SQLite adapter

lookup :my_other_adapter, :match, merge: [:id, :item], buffer_size: 4
def joining_different_adapters(db, ids)
  db[:items_bis].select(:id, :item, :name).where(item: ids)
end

Parameters:

  • adapter_name (Symbol)

    The adapter the lookup is running on

  • lookup_type (Symbol)

    The type of lookup to run

  • **lookup_options (Hash<Symbol: Object>)

    The options to pass to the lookup

See Also:

  • See the chosen adapter's documentation for further information on lookup usage.


# File 'lib/ductr/kiba_job.rb', line 109

annotable :lookup
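
The lookup example above can be read as a buffered batch join. The following is a minimal plain-Ruby sketch of that idea, not Ductr's or the adapter's actual code. It assumes that `merge: [:id, :item]` means "match the row's `:id` against the lookup row's `:item`", that rows are looked up in batches of `buffer_size`, and that unmatched rows pass through unchanged. `ITEMS_BIS` and `buffered_lookup` are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for the :items_bis table.
ITEMS_BIS = [
  { item: 1, name_bis: "a" },
  { item: 3, name_bis: "c" }
].freeze

# Plays the role of the annotated lookup method: fetch every lookup row
# whose :item is in the given batch of ids.
def merge_with_stuff(table, ids)
  table.select { |lookup_row| ids.include?(lookup_row[:item]) }
end

# Assumed behaviour (illustration only): buffer rows, query the lookup once
# per buffer, then merge each matching lookup row into its source row.
def buffered_lookup(rows, table, merge:, buffer_size:)
  from_key, to_key = merge

  rows.each_slice(buffer_size).flat_map do |buffer|
    matches = merge_with_stuff(table, buffer.map { |row| row[from_key] })
    by_key = matches.to_h { |match| [match[to_key], match] }

    buffer.map do |row|
      match = by_key[row[from_key]]
      match ? row.merge(match) : row
    end
  end
end

source_rows = [
  { id: 1, name: "x" },
  { id: 2, name: "y" },
  { id: 3, name: "z" }
]

lookup_result = buffered_lookup(source_rows, ITEMS_BIS, merge: [:id, :item], buffer_size: 2)
```

Batching this way keeps the number of lookup queries proportional to the number of buffers rather than the number of rows, which is presumably the motivation for the `buffer_size` option.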

.source(adapter_name, source_type, **source_options) ⇒ void

This method returns an undefined value.

Annotation to define a source method

Examples:

Source with Sequel SQLite adapter

source :my_adapter, :paginated, page_size: 42
def my_source(db, offset, limit)
  db[:items].offset(offset).limit(limit)
end

Parameters:

  • adapter_name (Symbol)

    The adapter the source is running on

  • source_type (Symbol)

    The type of source to run

  • **source_options (Hash<Symbol: Object>)

    The options to pass to the source

See Also:

  • See the chosen adapter's documentation for further information on source usage.


# File 'lib/ductr/kiba_job.rb', line 57

annotable :source
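
To illustrate why a `:paginated` source method receives `offset` and `limit`, here is a minimal plain-Ruby sketch of a driver loop. It is an assumption about how pagination could work, not the adapter's actual implementation: the loop requests pages of `page_size` rows and stops when a page comes back short. `ITEMS` and `each_paginated_row` are hypothetical names, and an array stands in for the database table.

```ruby
# Hypothetical stand-in for the :items table: ten rows.
ITEMS = (1..10).map { |i| { id: i, name: "item_#{i}" } }.freeze

# Plays the role of the annotated source method: returns one page of rows.
def my_source(items, offset, limit)
  items.drop(offset).first(limit)
end

# Assumed driver loop (illustration only): keep requesting pages until one
# comes back with fewer rows than page_size.
def each_paginated_row(items, page_size:)
  offset = 0

  loop do
    page = my_source(items, offset, page_size)
    page.each { |row| yield row }
    break if page.size < page_size

    offset += page_size
  end
end

paginated_rows = []
each_paginated_row(ITEMS, page_size: 4) { |row| paginated_rows << row }
```

With ten rows and a page size of four, the sketch requests offsets 0, 4 and 8, and the third page (two rows) ends the loop.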

.transform(transform_class, **transform_options) ⇒ void

This method returns an undefined value.

Annotation to define a transform method

Examples:

Transform without params

transform
def rename_keys(row)
  row[:new_name] = row.delete(:old_name)
  row[:new_email] = row.delete(:old_email)
  row # return the row so it is passed on to the next step
end

Transform with params

class RenameTransform < Ductr::ETL::Transform
  def process(row)
    call_method.each do |actual_name, new_name|
      new_key = "#{options[:prefix]}#{new_name}".to_sym

      row[new_key] = row.delete(actual_name)
    end

    row # return the renamed row rather than the mapping returned by #each
  end
end

transform RenameTransform, prefix: "some_"
def rename
  { old_name: :new_name, old_email: :new_email }
end

Parameters:

  • transform_class (Class, nil)

    The transform class to run, or nil to use the annotated method itself as the transform

  • **transform_options (Hash<Symbol: Object>)

    The options to pass to the transform



# File 'lib/ductr/kiba_job.rb', line 90

annotable :transform
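
All four class methods are declared as annotations that apply to the method defined immediately after them. The following minimal sketch shows one common way to build such a macro in plain Ruby, using the `method_added` hook. It is illustrative only and makes no claim about how `annotable` is implemented. `TinyAnnotations` and `DemoJob` are hypothetical names.

```ruby
# Illustrative module (an assumption, not the library's implementation).
module TinyAnnotations
  # Registry mapping method names to the annotation declared above them.
  def annotations
    @annotations ||= {}
  end

  # Class macro: remember the annotation until the next method is defined.
  def transform(klass = nil, **options)
    @pending = { name: :transform, klass: klass, options: options }
  end

  # Ruby hook, called each time an instance method is defined on the class.
  def method_added(method_name)
    super
    return unless @pending

    annotations[method_name] = @pending
    @pending = nil
  end
end

class DemoJob
  extend TinyAnnotations

  transform
  def rename_keys(row)
    row.merge(new_name: row[:old_name])
  end

  # Not preceded by an annotation, so it is not registered.
  def helper; end
end

registered = DemoJob.annotations
```

In this sketch only `rename_keys` ends up in the registry, which is how a runner could later discover which methods form the pipeline and in which order they were declared.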