Class: PiecePipe::HashedAggregator
- Defined in: lib/piece_pipe/hashed_aggregator.rb
Overview
This Step acts like a sink: it processes ALL available inputs until the stream ends, and only THEN calls #aggregate, once per accumulated key, to transform the collected data and produce output.
For every key-value pair that arrives as an input (see MapStep for generating key-value pairs), the value is appended to an array kept for that key. Each input must be a Hash with :key and :value entries.
When all inputs have been collected, #aggregate is called for each key that has been seen.
Subclasses are expected to override #aggregate: perform some calculation on the key and its accumulated values, and generate output via zero or more calls to #produce.
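A minimal sketch of the intended usage pattern, assuming a simplified environment: since the real PiecePipe::Step input wiring is not shown on this page, the classes StandInStep and StandInHashedAggregator, the driver method `run`, and the example subclass SumAggregator are all hypothetical. StandInHashedAggregator mirrors the HashedAggregator source listed on this page; SumAggregator shows the "override #aggregate and call #produce" pattern.

```ruby
# Hypothetical stand-in for PiecePipe::Step: it only collects produced
# items, whereas the real Step forwards them downstream.
class StandInStep
  attr_reader :outputs

  def initialize
    @outputs = []
  end

  def produce(item)
    @outputs << item
  end

  # Hypothetical driver: feed every input to #process, then let the
  # step emit output once the (finite) input stream is exhausted.
  def run(inputs)
    inputs.each { |item| process(item) }
    generate_sequence
    outputs
  end

  # No-op here; the real Step#generate_sequence is not shown on this page.
  def generate_sequence; end
end

# Mirrors the HashedAggregator source listed on this page.
class StandInHashedAggregator < StandInStep
  def initialize
    super
    @hash = {}
  end

  def generate_sequence
    super
    @hash.each { |key, values| aggregate(key, values) }
  end

  def process(item)
    unless item.key?(:key) && item.key?(:value)
      raise "HashedAggregator requires inputs to be Hashes with :key and :value"
    end
    (@hash[item[:key]] ||= []) << item[:value]
  end

  # Default: pass the grouped values through unchanged.
  def aggregate(key, values)
    produce key: key, values: values
  end
end

# Hypothetical example subclass: emit one total per key.
class SumAggregator < StandInHashedAggregator
  def aggregate(key, values)
    produce key: key, total: values.sum
  end
end

out = SumAggregator.new.run([
  { key: :a, value: 1 },
  { key: :b, value: 10 },
  { key: :a, value: 2 }
])
p out # out == [{ key: :a, total: 3 }, { key: :b, total: 10 }]
```

Note that nothing is produced until every input has been consumed: the two :a values are only summed after the :b input has arrived.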
Instance Attribute Summary
Attributes inherited from Step
Instance Method Summary
- #aggregate(key, values) ⇒ Object
  Accepts a key and an array of all values that have been collected for that key.
- #generate_sequence ⇒ Object
- #initialize ⇒ HashedAggregator (constructor)
  A new instance of HashedAggregator.
- #process(item) ⇒ Object
Methods inherited from Step
Constructor Details
#initialize ⇒ HashedAggregator
Returns a new instance of HashedAggregator.
```ruby
# File 'lib/piece_pipe/hashed_aggregator.rb', line 15
def initialize
  @hash = {}
end
```
Instance Method Details
#aggregate(key, values) ⇒ Object
Accepts a key and an array of all values that have been collected for that key. The default implementation simply produces { key: key, values: values } (essentially a no-op).
```ruby
# File 'lib/piece_pipe/hashed_aggregator.rb', line 36
def aggregate(key, values)
  produce key: key, values: values
end
```
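Because the default #aggregate emits one { key:, values: } Hash per key, the overall output of an un-subclassed HashedAggregator should have the same shape as a plain group-by over its inputs. The snippet below computes that shape with core Ruby only (Enumerable#group_by) as an illustration, not as the library's code; `inputs` is made-up sample data.

```ruby
inputs = [
  { key: "red", value: 1 },
  { key: "blue", value: 2 },
  { key: "red", value: 3 }
]

# Group the pairs by :key, then keep only the :value of each pair,
# matching the { key: key, values: values } items the default produces.
grouped = inputs
  .group_by { |pair| pair[:key] }
  .map { |key, pairs| { key: key, values: pairs.map { |pair| pair[:value] } } }

p grouped
```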
#generate_sequence ⇒ Object
```ruby
# File 'lib/piece_pipe/hashed_aggregator.rb', line 19
def generate_sequence
  super
  @hash.each do |key,values|
    aggregate key, values
  end
end
```
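Since @hash is a plain Ruby Hash, and Ruby Hashes enumerate in insertion order, #generate_sequence should call #aggregate in the order each key was first seen, not in sorted order. A quick check with core Ruby, imitating the accumulation in #process and the `each` loop in #generate_sequence (the local names are illustrative only):

```ruby
hash = {}
[[:zebra, 1], [:apple, 2], [:zebra, 3]].each do |key, value|
  (hash[key] ||= []) << value # same accumulation as #process
end

calls = []
hash.each { |key, values| calls << [key, values] } # like #generate_sequence
p calls
```

A related consequence of this design: every value for every key stays in memory until the input stream ends, so memory use grows with the total number of inputs.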
#process(item) ⇒ Object
```ruby
# File 'lib/piece_pipe/hashed_aggregator.rb', line 26
def process(item)
  raise "HashedAggregator requires inputs to be Hashes with :key and :value" unless item.keys.include?(:key) and item.keys.include?(:value)
  # TODO : check key/val keys in item
  #
  @hash[item[:key]] ||= []
  @hash[item[:key]] << item[:value]
end
```
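The guard in #process only checks that both keys are present. A minimal sketch of the same logic as a standalone method (the name `accumulate` is hypothetical) shows which inputs are accepted; it uses Hash#key? instead of `item.keys.include?`, which gives the same answer for a Hash without building the keys array.

```ruby
# Hypothetical helper mirroring HashedAggregator#process.
def accumulate(hash, item)
  unless item.key?(:key) && item.key?(:value)
    raise "HashedAggregator requires inputs to be Hashes with :key and :value"
  end
  (hash[item[:key]] ||= []) << item[:value]
  hash
end

store = {}
accumulate(store, { key: :a, value: 1 })
accumulate(store, { key: :a, value: nil }) # accepted: only key presence is checked

error = begin
  accumulate(store, { key: :a }) # missing :value
  nil
rescue RuntimeError => e
  e.message
end

p store
p error
```

Inputs that are not Hashes at all, such as Arrays, never reach that message: Array defines neither #keys nor #key?, so both the original guard and this sketch fail with NoMethodError instead.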