Class: ArcFurnace::Pipeline
- Inherits: Object (hierarchy: Object, ArcFurnace::Pipeline)
- Defined in: lib/arc-furnace/pipeline.rb
Defined Under Namespace
Classes: PipelineInstance
Class Method Summary
- .filter(node_id, type: BlockFilter, params: {}, &block) ⇒ Object
  Define a node that filters rows.
- .hash_node(node_id, type: ArcFurnace::Hash, params:) ⇒ Object
  Define a hash node, processing all rows from its source and caching them in-memory.
- .inherited(subclass) ⇒ Object
  Ensure that subclasses don’t overwrite the parent’s transform node definitions.
- .inner_join(node_id, type: ArcFurnace::InnerJoin, params:) ⇒ Object
  Define an inner join node where rows from the source are dropped if an associated entity is not found in the hash for the join key.
- .instance(params = {}) ⇒ Object
  Create an instance to run a transformation, passing the parameters to instantiate the transform instance with.
- .merge(node_id, type: ArcFurnace::Merge, params:) ⇒ Object
  Define a merge node where rows from multiple source nodes are merged into a single row.
- .observer(node_id, type: BlockObserver, params: {}, &block) ⇒ Object
  Define a node that observes rows.
- .outer_join(node_id, type: ArcFurnace::OuterJoin, params:) ⇒ Object
  Define an outer join node where rows from the source are kept even if an associated entity is not found in the hash for the join key.
- .sink(type:, source:, params:) ⇒ Object
  Define the sink for this transformation.
- .source(node_id, type:, params:) ⇒ Object
  A source that has row semantics, delivering a hash per row (or per entity) for the source.
- .transform(node_id, type: BlockTransform, params: {}, &block) ⇒ Object
  Define a node that transforms rows.
- .unfold(node_id, type: BlockUnfold, params: {}, &block) ⇒ Object
  Define a node that unfolds rows.
Class Method Details
.filter(node_id, type: BlockFilter, params: {}, &block) ⇒ Object
Define a node that filters rows. By default the node is a BlockFilter; when this metaprogramming method is passed a block, the block is called with a hash for each row. The result of the block determines whether a given row flows to a downstream node.

    # File 'lib/arc-furnace/pipeline.rb', line 90
    def self.filter(node_id, type: BlockFilter, params: {}, &block)
      # Attach the block only when the node type can use it.
      if block_given? && type <= BlockFilter
        params[:block] = block
      end
      raise "Filter #{type} is not a Filter!" unless type <= Filter
      define_intermediate(node_id, type: type, params: params)
    end
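The `type <= Filter` guard and the row-by-row block semantics can be illustrated without the gem. The following is a minimal sketch in plain Ruby: `SketchFilter`, `SketchBlockFilter`, `filter_type?` and `keep?` are hypothetical stand-ins, not arc-furnace classes or methods, and it assumes that a truthy block result keeps the row.

```ruby
# Hypothetical stand-ins for illustration; these are NOT the arc-furnace classes.
class SketchFilter; end

class SketchBlockFilter < SketchFilter
  def initialize(block)
    @block = block
  end

  # Assumption: a truthy block result means the row flows downstream.
  def keep?(row)
    @block.call(row) ? true : false
  end
end

# Module#<= returns true for the same class or a subclass, false for a
# superclass, and nil for unrelated classes, so nil is coerced to false here.
def filter_type?(type)
  (type <= SketchFilter) ? true : false
end

rows = [
  { "id" => "1", "active" => "true" },
  { "id" => "2", "active" => "false" }
]

only_active = SketchBlockFilter.new(->(row) { row["active"] == "true" })
kept = rows.select { |row| only_active.keep?(row) }
```

Here `kept` holds only the first row, and `filter_type?(String)` is false because `String` is unrelated to `SketchFilter`, which is the case the `raise` in `.filter` guards against.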
.hash_node(node_id, type: ArcFurnace::Hash, params:) ⇒ Object
Define a hash node, processing all rows from its source and caching them in-memory.

    # File 'lib/arc-furnace/pipeline.rb', line 33
    def self.hash_node(node_id, type: ArcFurnace::Hash, params:)
      define_intermediate(node_id, type: type, params: params)
    end
.inherited(subclass) ⇒ Object
Ensure that subclasses don’t overwrite the parent’s transform node definitions.

    # File 'lib/arc-furnace/pipeline.rb', line 13
    def self.inherited(subclass)
      subclass.intermediates_map = intermediates_map.dup
    end
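Why the `dup` matters can be shown with a small stand-alone sketch. `SketchPipeline`, `nodes` and `define_node` below are hypothetical names chosen for illustration; they only mimic the pattern of copying a class-level map into each subclass.

```ruby
# Hypothetical base class mimicking the copy-on-inherit pattern.
class SketchPipeline
  class << self
    attr_accessor :nodes

    # Give each subclass its own copy of the parent's node map.
    def inherited(subclass)
      super
      subclass.nodes = nodes.dup
    end

    def define_node(node_id, type)
      nodes[node_id] = type
    end
  end

  self.nodes = {}
end

class ParentPipeline < SketchPipeline
  define_node :rows, :csv_source
end

class ChildPipeline < ParentPipeline
  define_node :cleaned, :transform
end

parent_keys = ParentPipeline.nodes.keys
child_keys = ChildPipeline.nodes.keys
```

The child starts with the parent's definitions and adds its own, while the parent's map is left untouched. Without the `dup`, both classes would share one hash and the child's node would leak into the parent.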
.inner_join(node_id, type: ArcFurnace::InnerJoin, params:) ⇒ Object
Define an inner join node where rows from the source are dropped if an associated entity is not found in the hash for the join key.

    # File 'lib/arc-furnace/pipeline.rb', line 46
    def self.inner_join(node_id, type: ArcFurnace::InnerJoin, params:)
      define_intermediate(node_id, type: type, params: params)
    end
.instance(params = {}) ⇒ Object
Create an instance to run a transformation, passing the parameters to instantiate the transform instance with. The resulting instance has a single public method, #execute, which performs the transformation.

    # File 'lib/arc-furnace/pipeline.rb', line 114
    def self.instance(params = {})
      @params = params
      PipelineInstance.new(self, params)
    end
.merge(node_id, type: ArcFurnace::Merge, params:) ⇒ Object
Define a merge node where rows from multiple source nodes are merged into a single row.

    # File 'lib/arc-furnace/pipeline.rb', line 70
    def self.merge(node_id, type: ArcFurnace::Merge, params:)
      define_intermediate(node_id, type: type, params: params)
    end
.observer(node_id, type: BlockObserver, params: {}, &block) ⇒ Object
Define a node that observes rows. By default the node is a BlockObserver; when this metaprogramming method is passed a block, the block is called with a hash for each row. The result of the block is ignored; all rows are forwarded to the next node in the line.

    # File 'lib/arc-furnace/pipeline.rb', line 102
    def self.observer(node_id, type: BlockObserver, params: {}, &block)
      # Attach the block only when the node type can use it.
      if block_given? && type <= BlockObserver
        params[:block] = block
      end
      raise "Observer #{type} is not an Observer!" unless type <= Observer
      define_intermediate(node_id, type: type, params: params)
    end
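The observer semantics (block result ignored, every row forwarded) can be sketched in plain Ruby. The lambda and variable names are hypothetical and do not use the gem.

```ruby
seen_ids = []

# The observer records a side effect; its return value is deliberately unused.
observe = lambda do |row|
  seen_ids << row["id"]
  :ignored
end

rows = [{ "id" => "1" }, { "id" => "2" }]

# Call the observer for each row, then forward the row unchanged.
forwarded = rows.map do |row|
  observe.call(row)
  row
end
```

This is useful for logging or counting rows mid-pipeline without affecting what reaches downstream nodes.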
.outer_join(node_id, type: ArcFurnace::OuterJoin, params:) ⇒ Object
Define an outer join node where rows from the source are kept even if an associated entity is not found in the hash for the join key.

    # File 'lib/arc-furnace/pipeline.rb', line 52
    def self.outer_join(node_id, type: ArcFurnace::OuterJoin, params:)
      define_intermediate(node_id, type: type, params: params)
    end
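The difference between the inner and outer join nodes can be sketched with plain Ruby hashes. This is an illustration of the join semantics only: `lookup` stands in for a hash node keyed by the join key, and the field names are made up.

```ruby
# In-memory lookup keyed by the join key, standing in for a hash node.
lookup = { "1" => { "team" => "red" } }

rows = [
  { "id" => "1", "name" => "Ada" },
  { "id" => "2", "name" => "Grace" }
]

# Inner join: rows without a match in the lookup are dropped.
inner = rows
  .select { |row| lookup.key?(row["id"]) }
  .map { |row| row.merge(lookup[row["id"]]) }

# Outer join: rows without a match are kept, just not enriched.
outer = rows.map { |row| row.merge(lookup.fetch(row["id"], {})) }
```

With this data, `inner` contains only Ada's row (now carrying `"team"`), while `outer` contains both rows and Grace's row is passed through as-is.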
.sink(type:, source:, params:) ⇒ Object
Define the sink for this transformation. Only a single sink may be specified per transformation. The sink is delivered a hash per row or entity, fed from the graph of nodes above it.

    # File 'lib/arc-furnace/pipeline.rb', line 20
    def self.sink(type:, source:, params:)
      if sink_node
        raise 'Sink already defined!'
      end

      # Defer construction until the pipeline instance resolves its parameters.
      @sink_node = -> do
        type.new(resolve_parameters(:sink, params))
      end
      @sink_source = source
    end
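The two ideas in `.sink`, a single-definition guard and a lambda that defers construction, can be sketched on their own. `SketchSinkHolder` and `SketchSink` are hypothetical classes, and parameter resolution is omitted, so this is an approximation of the pattern rather than the library's behavior.

```ruby
# Hypothetical sink that just remembers its parameters.
class SketchSink
  attr_reader :params

  def initialize(params)
    @params = params
  end
end

class SketchSinkHolder
  class << self
    attr_reader :sink_node, :sink_source

    def sink(type:, source:, params:)
      raise 'Sink already defined!' if sink_node

      # The lambda builds the sink only when it is called later.
      @sink_node = -> { type.new(params) }
      @sink_source = source
    end
  end
end

SketchSinkHolder.sink(type: SketchSink, source: :rows, params: { filename: "out.csv" })

# A second definition on the same class trips the guard.
second_attempt =
  begin
    SketchSinkHolder.sink(type: SketchSink, source: :rows, params: {})
    :allowed
  rescue RuntimeError => e
    e.message
  end

built = SketchSinkHolder.sink_node.call
```

Deferring construction means nothing is opened or allocated at class-definition time; the sink object only exists once the stored lambda is invoked.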
.source(node_id, type:, params:) ⇒ Object
A source that has row semantics, delivering a hash per row (or per entity) for the source.

    # File 'lib/arc-furnace/pipeline.rb', line 39
    def self.source(node_id, type:, params:)
      raise "Source #{type} is not a Source!" unless type <= Source
      define_intermediate(node_id, type: type, params: params)
    end
.transform(node_id, type: BlockTransform, params: {}, &block) ⇒ Object
Define a node that transforms rows. By default the node is a BlockTransform; when this metaprogramming method is passed a block, the block is called with a hash for each row. The result of the block becomes the row for the next downstream node.

    # File 'lib/arc-furnace/pipeline.rb', line 60
    def self.transform(node_id, type: BlockTransform, params: {}, &block)
      # Attach the block only when the node type can use it.
      if block_given? && type <= BlockTransform
        params[:block] = block
      end
      raise "Transform #{type} is not a Transform!" unless type <= Transform
      define_intermediate(node_id, type: type, params: params)
    end
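A one-row-in, one-row-out transform can be sketched as a plain lambda applied with `map`. The field name and the `normalize` lambda are hypothetical, and the sketch does not use the gem.

```ruby
# The block's return value becomes the row seen by the next node.
normalize = ->(row) { row.merge("name" => row["name"].strip.upcase) }

rows = [{ "name" => " ada " }]
transformed = rows.map { |row| normalize.call(row) }
```

Because `Hash#merge` returns a new hash, the input rows are left unmodified, which keeps the sketch free of side effects on upstream data.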
.unfold(node_id, type: BlockUnfold, params: {}, &block) ⇒ Object
Define a node that unfolds rows. By default the node is a BlockUnfold; when this metaprogramming method is passed a block, the block is called with a hash for each row. The result of the block becomes the set of rows for the next downstream node.

    # File 'lib/arc-furnace/pipeline.rb', line 78
    def self.unfold(node_id, type: BlockUnfold, params: {}, &block)
      # Attach the block only when the node type can use it.
      if block_given? && type <= BlockUnfold
        params[:block] = block
      end
      raise "Unfold #{type} is not an Unfold!" unless type <= Unfold
      define_intermediate(node_id, type: type, params: params)
    end
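Unfolding, where one input row yields a set of output rows, corresponds to `flat_map` in plain Ruby. The sketch below uses a hypothetical `split_tags` lambda and made-up field names, and assumes the block returns an array of row hashes.

```ruby
# One input row fans out into one output row per tag.
split_tags = lambda do |row|
  row["tags"].split(",").map { |tag| { "id" => row["id"], "tag" => tag } }
end

rows = [
  { "id" => "1", "tags" => "a,b" },
  { "id" => "2", "tags" => "c" }
]

# flat_map concatenates each block result into a single stream of rows.
unfolded = rows.flat_map { |row| split_tags.call(row) }
```

Two input rows become three output rows here, which is the contrast with a transform node, where the row count stays the same.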