Class: Wukong::Streamer::UniqByLastReducer

Inherits:
AccumulatingReducer show all
Defined in:
lib/wukong/streamer/uniq_by_last_reducer.rb

Overview

UniqByLastReducer accepts all records for a given key and emits only the last-seen.

It acts like an insecure high-school kid: for each record of a given key it discards whatever record it's holding and adopts this new value. When a new key comes on the scene it emits the last record, like an older brother handing off his Depeche Mode collection.

For example, to extract the latest value for each property, emit your records as

[resource_type, key, timestamp, ... fields ...]

then set :sort_fields to 3 and :partition_fields to 2.

Instance Attribute Summary (collapse)

Attributes inherited from AccumulatingReducer

#key

Attributes inherited from Base

#own_options

Instance Method Summary (collapse)

Methods inherited from AccumulatingReducer

#after_stream, #before_stream, #process

Methods inherited from Base

#after_stream, #bad_record!, #before_stream, #each_record, #emit, #initialize, #mapper, mapper, #monitor, #options, #process, #recordize, #run, run, #stream, #track

Constructor Details

This class inherits a constructor from Wukong::Streamer::Base

Instance Attribute Details

- (Object) final_value

Returns the value of attribute final_value



20
21
22
# File 'lib/wukong/streamer/uniq_by_last_reducer.rb', line 20

def final_value
  @final_value
end

Instance Method Details

- (Object) accumulate(*vals)

Adopt each value in turn: the last one's the one you want.



32
33
34
# File 'lib/wukong/streamer/uniq_by_last_reducer.rb', line 32

def accumulate *vals
  self.final_value = vals
end

- (Object) finalize {|final_value| ... }

Emit the last-seen value

Yields:

  • (final_value)


39
40
41
# File 'lib/wukong/streamer/uniq_by_last_reducer.rb', line 39

def finalize
  yield final_value if final_value
end

- (Object) get_key(*vals)

Use first two fields as keys by default



25
26
27
# File 'lib/wukong/streamer/uniq_by_last_reducer.rb', line 25

def get_key *vals
  vals[0..1]
end

- (Object) start!(*args)

Clear state on reset



46
47
48
# File 'lib/wukong/streamer/uniq_by_last_reducer.rb', line 46

def start! *args
  self.final_value = nil
end