Class: PiecePipe::HashedAggregator

Inherits:
Step
  • Object
Defined in:
lib/piece_pipe/hashed_aggregator.rb

Overview

This Step acts like a sink: it processes ALL available inputs until the stream ends, and only THEN calls #aggregate, once per key, to transform the accumulated data and produce output.

Each input must be a key-value pair, i.e. a Hash with :key and :value entries (see MapStep for generating key-value pairs). The values are accumulated into one array per key.

Once all inputs have been collected, #aggregate is called once for each key seen, in the order in which the keys first appeared.

Your job is to implement #aggregate: perform some calculation on the key and its accumulated values, and generate output via zero or more calls to #produce.
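The two phases can be sketched in plain Ruby. Because Step's internals are not shown on this page, the example uses a hypothetical stand-in base class, `SketchHashedAggregator`, that mirrors only the documented contract (accumulate per key, then call #aggregate per key) and records produced items in an array instead of using the real #produce. `SumByKey` and `run` are likewise illustrative names, not PiecePipe API.

```ruby
# Hypothetical stand-in for HashedAggregator; NOT the PiecePipe API.
# It mirrors the documented contract: accumulate values per key,
# then call #aggregate once per key after all input is consumed.
class SketchHashedAggregator
  attr_reader :output

  def initialize
    @hash = {}
    @output = []
  end

  # Stand-in for Step#produce: simply records the produced item.
  def produce(item)
    @output << item
  end

  # Phase 1: group each { key:, value: } input under its key.
  def process(item)
    (@hash[item[:key]] ||= []) << item[:value]
  end

  # Phase 2 only starts after every input has been processed.
  def run(items)
    items.each { |item| process(item) }
    @hash.each { |key, values| aggregate(key, values) }
    @output
  end
end

# A subclass only needs to implement #aggregate.
class SumByKey < SketchHashedAggregator
  def aggregate(key, values)
    produce key: key, total: values.sum
  end
end

inputs = [
  { key: :apples, value: 3 },
  { key: :pears,  value: 1 },
  { key: :apples, value: 4 }
]
p SumByKey.new.run(inputs)
```

Running it prints one item per key, :apples with a total of 7 and :pears with a total of 1, in first-seen key order (Ruby hashes preserve insertion order).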

Instance Attribute Summary

Attributes inherited from Step

#source

Instance Method Summary

Methods inherited from Step

#to_enum

Constructor Details

#initialize ⇒ HashedAggregator

Returns a new instance of HashedAggregator.

# File 'lib/piece_pipe/hashed_aggregator.rb', line 15

def initialize
  @hash = {} # key => array of values accumulated by #process
end
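Based on the constructor above, the accumulator hash only exists once `HashedAggregator#initialize` has run, so a subclass that defines its own constructor presumably needs to call `super()` (with explicit empty parentheses, since a bare `super` would forward the subclass's arguments). The sketch uses a hypothetical stand-in base class, `SketchBase`, with the same constructor and #process body; `ScaledSum`, `BrokenSum`, `groups`, and the `scale` parameter are illustrative only.

```ruby
# Hypothetical stand-in with the same constructor and #process as above.
class SketchBase
  def initialize
    @hash = {}
  end

  def process(item)
    @hash[item[:key]] ||= []
    @hash[item[:key]] << item[:value]
  end

  # Illustrative accessor so the accumulated groups can be inspected.
  def groups
    @hash
  end
end

class ScaledSum < SketchBase
  def initialize(scale)
    super() # explicit (): the parent constructor takes no arguments
    @scale = scale # illustrative; unused in this sketch
  end
end

class BrokenSum < SketchBase
  def initialize(scale)
    @scale = scale # no super: @hash is never created, so it stays nil
  end
end

ok = ScaledSum.new(10)
ok.process({ key: :a, value: 1 })
puts ok.groups[:a].inspect # prints [1]

begin
  BrokenSum.new(10).process({ key: :a, value: 1 })
rescue NoMethodError => e
  puts "without super: #{e.class}" # prints "without super: NoMethodError"
end
```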

Instance Method Details

#aggregate(key, values) ⇒ Object

Accepts a key and an array of all values collected for that key. The default implementation simply produces { :key => key, :values => values }, passing the grouped values through unchanged.

# File 'lib/piece_pipe/hashed_aggregator.rb', line 36

def aggregate(key, values)
  # Default: emit the grouped values unchanged
  produce key: key, values: values
end
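With the default #aggregate, the step's net effect is a group-by. The plain-Ruby sketch below computes the same output shape outside PiecePipe, which can be handy for writing expectations in tests; `group_pairs` is a hypothetical helper, not part of the library.

```ruby
# Hypothetical helper: reproduces the default HashedAggregator output,
# one { key:, values: } Hash per key, in first-seen key order.
def group_pairs(items)
  grouped = items.each_with_object({}) do |item, acc|
    (acc[item[:key]] ||= []) << item[:value]
  end
  grouped.map { |key, values| { key: key, values: values } }
end

pairs = [
  { key: "x", value: 1 },
  { key: "y", value: 2 },
  { key: "x", value: 3 }
]
group_pairs(pairs).each do |row|
  puts "#{row[:key]}: #{row[:values].inspect}"
end
# prints:
# x: [1, 3]
# y: [2]
```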

#generate_sequence ⇒ Object

# File 'lib/piece_pipe/hashed_aggregator.rb', line 19

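def generate_sequence
  super # let Step consume the whole input stream; each item goes to #process
  # Only after the stream ends: one #aggregate call per accumulated key
  @hash.each do |key, values|
    aggregate key, values
  end
end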

#process(item) ⇒ Object

# File 'lib/piece_pipe/hashed_aggregator.rb', line 26

def process(item)
  # Inputs must be Hashes carrying both :key and :value (e.g. MapStep output)
  unless item.is_a?(Hash) && item.key?(:key) && item.key?(:value)
    raise "HashedAggregator requires inputs to be Hashes with :key and :value"
  end

  # Append the value to the array accumulated for this key
  @hash[item[:key]] ||= []
  @hash[item[:key]] << item[:value]
end
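The input requirement can be expressed as a standalone predicate, which makes it easy to see which items are accepted. In this sketch (`valid_pair?` is a hypothetical name), checking `is_a?(Hash)` before looking up keys rejects non-Hash items cleanly instead of failing on a missing method, and `key?` accepts a present :value even when it is nil.

```ruby
# Hypothetical predicate for the input check: the item must be a Hash
# that contains both a :key and a :value entry.
def valid_pair?(item)
  item.is_a?(Hash) && item.key?(:key) && item.key?(:value)
end

samples = [
  { key: :a, value: 1 },   # accepted
  { key: :a, value: nil }, # accepted: :value is present, even though nil
  { key: :a },             # rejected: no :value entry
  "a=1"                    # rejected: not a Hash
]
samples.each { |s| puts "#{s.inspect} -> #{valid_pair?(s)}" }
```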