Class: ArcFurnace::Pipeline

Inherits: Object

Defined in: lib/arc-furnace/pipeline.rb

Defined Under Namespace

Classes: PipelineInstance

Class Method Details

.filter(node_id, type: BlockFilter, params: {}, &block) ⇒ Object

Define a node that filters rows. By default (and when this method is passed a block) you get a BlockFilter whose block receives a hash for each row; the block's result determines whether the row flows to the downstream node.



# File 'lib/arc-furnace/pipeline.rb', line 90

def self.filter(node_id, type: BlockFilter, params: {}, &block)
  if block_given? && type <= BlockFilter
    params[:block] = block
  end
  raise "Filter #{type} is not a Filter!" unless type <= Filter
  define_intermediate(node_id, type: type, params: params)
end
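
For illustration, a minimal sketch of a filter node in a pipeline definition. It assumes that filter nodes, like the other intermediate nodes here, take a :source param naming their upstream node, and that the upstream :products node and 'active' column exist:

filter :active_products, params: { source: :products } do |row|
  # Keep only rows whose 'active' column is the string 'true'.
  row['active'] == 'true'
end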

.hash_node(node_id, type: ArcFurnace::Hash, params:) ⇒ Object

Define a hash node, processing all rows from its source and caching them in memory.



# File 'lib/arc-furnace/pipeline.rb', line 33

def self.hash_node(node_id, type: ArcFurnace::Hash, params:)
  define_intermediate(node_id, type: type, params: params)
end
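
A sketch of a hash node keyed on a column of an upstream source. The :key_column and :source parameter names, and the :products_csv node, are assumptions for this example:

hash_node :products_by_sku,
          params: {
            key_column: 'sku',     # column to index the cached rows by (assumed param name)
            source: :products_csv  # upstream source node to read and cache
          }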

.inherited(subclass) ⇒ Object

Ensure that subclasses don't overwrite the parent's transform node definitions.



# File 'lib/arc-furnace/pipeline.rb', line 13

def self.inherited(subclass)
  subclass.intermediates_map = intermediates_map.dup
end
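
In practice this means a subclass starts with a copy of its parent's node definitions and can add its own without mutating the parent. A hypothetical sketch (the CSVSource type and its params are assumptions):

class BasePipeline < ArcFurnace::Pipeline
  source :rows, type: ArcFurnace::CSVSource, params: { filename: :filename }
end

class ExtendedPipeline < BasePipeline
  # :rows is inherited from BasePipeline; nodes added here do not appear on BasePipeline.
  transform :upcased_rows, params: { source: :rows } do |row|
    row.transform_values { |value| value.to_s.upcase }
  end
end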

.inner_join(node_id, type: ArcFurnace::InnerJoin, params:) ⇒ Object

Define an inner join node: rows from the source are dropped if no associated entity is found in the hash for the join key.



# File 'lib/arc-furnace/pipeline.rb', line 46

def self.inner_join(node_id, type: ArcFurnace::InnerJoin, params:)
  define_intermediate(node_id, type: type, params: params)
end
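
A sketch joining a row source against a previously defined hash node. The :source and :hash parameter names, and the node ids, are assumptions for this example:

inner_join :orders_with_products,
           params: {
             source: :orders,        # row source to join from
             hash: :products_by_sku  # hash node to look each join key up in
           }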

.instance(params = {}) ⇒ Object

Create an instance to run a transformation, passing the parameters used to instantiate the transform instance. The resulting instance has a single public method, #execute, which performs the transformation.



# File 'lib/arc-furnace/pipeline.rb', line 114

def self.instance(params = {})
  @params = params
  PipelineInstance.new(self, params)
end
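
Typical usage, assuming a MyPipeline subclass whose node params reference symbols such as :input_filename and :output_filename that are resolved from these instance params:

pipeline = MyPipeline.instance(
  input_filename: 'products.csv',
  output_filename: 'enriched_products.csv'
)
pipeline.execute  # runs the transformation end to end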

.merge(node_id, type: ArcFurnace::Merge, params:) ⇒ Object

Define a merge node, where rows from multiple source nodes are merged into a single row.



# File 'lib/arc-furnace/pipeline.rb', line 70

def self.merge(node_id, type: ArcFurnace::Merge, params:)
  define_intermediate(node_id, type: type, params: params)
end
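
A generic sketch of a merge node; the node ids and the parameter keys below are placeholders, so consult ArcFurnace::Merge for the params it actually expects:

merge :merged_rows,
      params: {
        source: :primary_rows,    # placeholder: the main row stream
        hash: :supplemental_info  # placeholder: the node whose fields get merged in
      }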

.observer(node_id, type: BlockObserver, params: {}, &block) ⇒ Object

Define a node that observes rows. By default (and when this method is passed a block) you get a BlockObserver whose block receives a hash for each row; the block's result is ignored, and every row is forwarded unchanged to the next downstream node.



# File 'lib/arc-furnace/pipeline.rb', line 102

def self.observer(node_id, type: BlockObserver, params: {}, &block)
  if block_given? && type <= BlockObserver
    params[:block] = block
  end
  raise "Observer #{type} is not an Observer!" unless type <= Observer
  define_intermediate(node_id, type: type, params: params)
end
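
A sketch of an observer used purely for side effects such as logging. The :source param is assumed to name the upstream node, and the node ids and 'sku' column are illustrative:

observer :row_logger, params: { source: :transformed_products } do |row|
  # Return value is ignored; every row passes through unchanged.
  puts "processing #{row['sku']}"
end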

.outer_join(node_id, type: ArcFurnace::OuterJoin, params:) ⇒ Object

Define an outer join node: rows from the source are kept even if no associated entity is found in the hash for the join key.



# File 'lib/arc-furnace/pipeline.rb', line 52

def self.outer_join(node_id, type: ArcFurnace::OuterJoin, params:)
  define_intermediate(node_id, type: type, params: params)
end
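
Same shape as inner_join, but source rows without a match in the hash still flow downstream. The node ids and parameter keys are assumptions for this example:

outer_join :products_with_marketing,
           params: {
             source: :products_csv,
             hash: :marketing_info
           }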

.sink(type:, source:, params:) ⇒ Object

Define the sink for this transformation. Only a single sink may be specified per transformation. The sink receives a hash per row or entity, fed to it by the graph of nodes above it.



# File 'lib/arc-furnace/pipeline.rb', line 20

def self.sink(type:, source:, params:)
  if sink_node
    raise 'Sink already defined!'
  end

  @sink_node = -> do
    type.new(resolve_parameters(:sink, params))
  end
  @sink_source = source
end
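
A sketch of a sink definition. The ArcFurnace::AllFieldsCSVSink class and its :filename param are assumptions drawn from typical usage of the gem; the symbol value is resolved from the params passed to .instance:

sink type: ArcFurnace::AllFieldsCSVSink,
     source: :transformed_products,
     params: { filename: :output_filename }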

.source(node_id, type:, params:) ⇒ Object

Define a source with row semantics, delivering a hash per row (or per entity).



# File 'lib/arc-furnace/pipeline.rb', line 39

def self.source(node_id, type:, params:)
  raise "Source #{type} is not a Source!" unless type <= Source
  define_intermediate(node_id, type: type, params: params)
end
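
A sketch of a CSV-backed source. The ArcFurnace::CSVSource type and its :filename param are assumptions for this example; the symbol value is resolved from the params passed to .instance:

source :products_csv,
       type: ArcFurnace::CSVSource,
       params: { filename: :input_filename }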

.transform(node_id, type: BlockTransform, params: {}, &block) ⇒ Object

Define a node that transforms rows. By default (and when this method is passed a block) you get a BlockTransform whose block receives a hash for each row; the block's result becomes the row handed to the next downstream node.



# File 'lib/arc-furnace/pipeline.rb', line 60

def self.transform(node_id, type: BlockTransform, params: {}, &block)
  if block_given? && type <= BlockTransform
    params[:block] = block
  end
  raise "Transform #{type} is not a Transform!" unless type <= Transform
  define_intermediate(node_id, type: type, params: params)
end
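
A sketch of a block transform: whatever the block returns becomes the row handed to the next node. The node ids and 'sku' column are illustrative:

transform :normalized_products, params: { source: :products_csv } do |row|
  row['sku'] = row['sku'].to_s.strip.upcase
  row
end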

.unfold(node_id, type: BlockUnfold, params: {}, &block) ⇒ Object

Define a node that unfolds rows. By default (and when this method is passed a block) you get a BlockUnfold whose block receives a hash for each row; the block's result becomes the set of rows handed to the next downstream node.



# File 'lib/arc-furnace/pipeline.rb', line 78

def self.unfold(node_id, type: BlockUnfold, params: {}, &block)
  if block_given? && type <= BlockUnfold
    params[:block] = block
  end
  raise "Unfold #{type} is not an Unfold!" unless type <= Unfold
  define_intermediate(node_id, type: type, params: params)
end
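
A sketch of an unfold that expands each row into several: the block returns an array of row hashes. The node ids and 'locale' column are illustrative:

unfold :rows_per_locale, params: { source: :products_csv } do |row|
  %w[en fr de].map { |locale| row.merge('locale' => locale) }
end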