Class: CouchProxy::Reduce::BaseReducer
- Inherits: Object
- Defined in: lib/couchproxy/reduce/base_reducer.rb
Overview
Sorts and merges results from many different source streams as the data arrives from CouchDB over the network. The merge uses constant memory space, so the proxy can handle huge datasets streaming back from the databases. Subclasses must provide a @sorter member variable, which is used to sort streaming rows before they’re processed. Because the constructor passes @sorter to MultiRBTree#readjust, a subclass must assign @sorter before calling super.
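The overview describes merging many streams as their rows arrive. The following is a minimal sketch of the general streaming k-way merge technique, not CouchProxy's own code: it assumes each source delivers its keys already sorted, and it uses a plain sorted Array with binary-search insertion in place of MultiRBTree. StreamingMerge, push, and drain are hypothetical names, and the real release rule inside BaseReducer's process method (not shown on this page) may differ.

```ruby
# Sketch of a generic streaming k-way merge (hypothetical; not CouchProxy's code).
# Assumes every source delivers its keys already sorted by the comparator.
class StreamingMerge
  def initialize(sources, &sorter)
    @sorter = sorter || ->(a, b) { a <=> b }
    @last_seen = Hash[sources.map { |s| [s, nil] }]  # open source => last key seen
    @buffer = []                                      # pending keys, kept sorted
  end

  # Feed one sorted chunk from a source; returns the keys that are now safe to emit.
  def push(source, keys, complete)
    raise ArgumentError, "unknown or finished source: #{source}" unless @last_seen.key?(source)
    keys.each do |k|
      # Binary search for the first buffered key greater than k (equal keys stay in arrival order).
      idx = @buffer.bsearch_index { |x| @sorter.call(x, k) > 0 } || @buffer.size
      @buffer.insert(idx, k)
    end
    @last_seen[source] = keys.last unless keys.empty?
    @last_seen.delete(source) if complete
    drain
  end

  private

  # A buffered key is safe once no open source can still send a smaller key.
  def drain
    return @buffer.shift(@buffer.size) if @last_seen.empty?
    return [] if @last_seen.values.any?(&:nil?)  # an open source has sent nothing yet
    horizon = @last_seen.values.min { |a, b| @sorter.call(a, b) }
    count = @buffer.index { |k| @sorter.call(k, horizon) > 0 } || @buffer.size
    @buffer.shift(count)
  end
end

merge = StreamingMerge.new([:a, :b])
merge.push(:a, [1, 4, 7], false)  # => [] (:b has sent nothing yet)
merge.push(:b, [2, 3], false)     # => [1, 2, 3]
merge.push(:b, [5], true)         # => [4, 5, 7]
merge.push(:a, [9], true)         # => [9]
```

In this sketch the buffer holds only keys that are still waiting on a slower source, so memory use depends on how far the sources drift apart rather than on the total size of the result set.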
Direct Known Subclasses
Constant Summary
- KEY = 'key'.freeze
- ID = 'id'.freeze
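BaseReducer#reduce stores each row under the composite key [row[KEY], row[ID]], and the subclass's @sorter compares those composite keys. The sketch below assumes @sorter is a two-argument comparator proc returning -1, 0, or 1, which is the form MultiRBTree#readjust accepts. ASCENDING, DESCENDING, composite_key, and sort_rows are hypothetical, and Ruby's <=> stands in for CouchDB's view collation, which orders mixed types differently.

```ruby
KEY = 'key'.freeze
ID = 'id'.freeze

# Hypothetical sorters: two-argument comparators over composite [key, id] pairs.
# Ruby's <=> is a stand-in; CouchDB's view collation orders mixed types differently.
ASCENDING = lambda { |a, b| a <=> b }
DESCENDING = lambda { |a, b| b <=> a }

# Build the same composite key that BaseReducer#reduce uses.
def composite_key(row)
  [row[KEY], row[ID]]
end

# Order rows by comparing their composite keys with the given sorter.
def sort_rows(rows, sorter)
  rows.sort { |x, y| sorter.call(composite_key(x), composite_key(y)) }
end

rows = [
  { 'key' => 'b', 'id' => 'doc2' },
  { 'key' => 'a', 'id' => 'doc9' },
  { 'key' => 'b', 'id' => 'doc1' }
]

sort_rows(rows, ASCENDING).map { |r| r[ID] }   # => ["doc9", "doc1", "doc2"]
sort_rows(rows, DESCENDING).map { |r| r[ID] }  # => ["doc2", "doc1", "doc9"]
```

Including the document id in the key gives rows that share a view key a deterministic order.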
Instance Method Summary
- #complete? ⇒ Boolean
  Returns true if all streams of rows have arrived and the reduce processing is complete.
- #initialize(args) ⇒ BaseReducer (constructor)
  Args should contain the following keys: sources, limit, and skip (see Constructor Details).
- #reduce(rows, source, complete) ⇒ Object
  Gives the reducer more rows to process, along with their source connection.
Constructor Details
#initialize(args) ⇒ BaseReducer
Args should contain the following keys:
- sources: List of stream sources, used to identify where streaming rows are arriving from.
- limit: Maximum number of rows to return. If not specified, all rows are returned.
- skip: Number of rows at the start of the stream to skip before returning the rest. If not specified, no rows are skipped.
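```ruby
# File 'lib/couchproxy/reduce/base_reducer.rb', line 22
def initialize(args)
  @sources, @limit, @skip = args.values_at(:sources, :limit, :skip)
  # Map each source to the number of rows received from it so far.
  @sources = Hash[@sources.map {|s| [s, 0] }]
  # Listener lists, created on first access for each key.
  @listeners = Hash.new {|h, k| h[k] = [] }
  @skip ||= 0
  # Counters for rows returned and rows skipped so far.
  @returned, @skipped_rows = 0, 0
  # Buffered rows, ordered by the subclass-provided @sorter.
  @rows = MultiRBTree.new.tap {|t| t.readjust(@sorter) }
end
```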
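The constructor defaults @skip to 0 and starts the @returned and @skipped_rows counters at zero, which suggests that skip and limit apply to the merged stream as a whole rather than to each batch of rows. The private skip and limit helpers that reduce calls are not shown on this page, so the following is a hypothetical stand-in (the Paginator class is invented for illustration) showing how counters carried across batches produce that behavior.

```ruby
# Hypothetical stand-in for BaseReducer's private skip/limit helpers.
# The counters persist across calls, so skip and limit span every batch.
class Paginator
  def initialize(skip: nil, limit: nil)
    @skip = skip || 0          # mirrors `@skip ||= 0` in the constructor
    @limit = limit             # nil means "return every row"
    @skipped_rows = 0
    @returned = 0
  end

  # Drop rows until a total of @skip rows has been skipped.
  def skip(rows)
    remaining = @skip - @skipped_rows
    return rows if remaining <= 0
    dropped = rows.first(remaining)
    @skipped_rows += dropped.size
    rows.drop(dropped.size)
  end

  # Pass rows through until a total of @limit rows has been returned.
  def limit(rows)
    return rows if @limit.nil?
    allowed = [@limit - @returned, 0].max
    kept = rows.first(allowed)
    @returned += kept.size
    kept
  end
end

pager = Paginator.new(skip: 3, limit: 4)
pager.limit(pager.skip([1, 2]))        # => []
pager.limit(pager.skip([3, 4, 5]))     # => [4, 5]
pager.limit(pager.skip([6, 7, 8, 9]))  # => [6, 7]
```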
Instance Method Details
#complete? ⇒ Boolean
Returns true if all streams of rows have arrived and the reduce processing is complete.
```ruby
# File 'lib/couchproxy/reduce/base_reducer.rb', line 71
def complete?
  # reduce deletes each source from @sources once its stream completes.
  @sources.empty?
end
```
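complete? depends only on the @sources bookkeeping: the constructor maps every source to a row count of 0, reduce adds each chunk's size and deletes the source once its stream completes, and complete? is true when no sources remain. The hypothetical SourceTracker class below isolates that bookkeeping so it can run on its own; the shard names are placeholders.

```ruby
# Hypothetical class isolating BaseReducer's source bookkeeping.
class SourceTracker
  def initialize(sources)
    # Every source starts open with zero rows received, as in the constructor.
    @sources = Hash[sources.map { |s| [s, 0] }]
  end

  # Mirrors the bookkeeping lines of BaseReducer#reduce.
  def receive(source, row_count, complete)
    @sources[source] += row_count
    @sources.delete(source) if complete
  end

  # Same check as BaseReducer#complete?.
  def complete?
    @sources.empty?
  end
end

tracker = SourceTracker.new(%w[shard1 shard2])
tracker.receive('shard1', 10, true)
tracker.complete?   # => false (shard2 is still open)
tracker.receive('shard2', 4, false)
tracker.complete?   # => false
tracker.receive('shard2', 0, true)
tracker.complete?   # => true
```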
#reduce(rows, source, complete) ⇒ Object
Gives the reducer more rows to process, along with their source connection. The complete argument must be a boolean signaling whether this source's stream of rows has finished.
```ruby
# File 'lib/couchproxy/reduce/base_reducer.rb', line 47
def reduce(rows, source, complete)
  return if complete?
  # Buffer each row under a composite [key, id] key, tagged with its source.
  rows.each do |row|
    row[:proxy_source] = source
    key = [row[KEY], row[ID]]
    @rows[key] = row
  end
  @sources[source] += rows.size
  @sources.delete(source) if complete
  # Pause this source; resume_streams restarts reading after results are handled.
  source.pause unless complete
  # The trailing `if process?` skips the entire process call, block included.
  process do |results|
    if results
      results = limit(skip(results))
      notify_results(results) if results.any?
      notify_complete if complete?
      resume_streams unless complete?
    else
      notify_error
    end
  end if process?
end
```
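reduce pauses an incomplete source after buffering its rows, and the process callback calls resume_streams once results are handled, presumably so that fast sources cannot keep filling the buffer while earlier rows are still being processed. The sketch below mimics that flow with hypothetical FakeSource and MiniReducer classes: a sorted Array stands in for MultiRBTree, a queue of lambdas stands in for asynchronous processing, and the toy process step releases rows only after every source completes instead of reproducing the real release rule. FakeSource#resume is an assumption; only #pause appears in the original code.

```ruby
# Hypothetical stand-in for a network stream.
class FakeSource
  attr_reader :name

  def initialize(name)
    @name = name
    @paused = false
  end

  def pause
    @paused = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end
end

# Simplified reducer mirroring the shape of BaseReducer#reduce (hypothetical).
class MiniReducer
  attr_reader :emitted

  def initialize(sources)
    @sources = Hash[sources.map { |s| [s, 0] }]
    @rows = []      # sorted [composite_key, row] pairs; stands in for MultiRBTree
    @emitted = []
    @pending = []   # queued work, standing in for asynchronous processing
  end

  def complete?
    @sources.empty?
  end

  def reduce(rows, source, complete)
    return if complete?
    rows.each do |row|
      row[:proxy_source] = source.name
      key = [row['key'], row['id']]
      # Insert after any equal keys to keep the buffer sorted.
      idx = @rows.bsearch_index { |pair| (pair[0] <=> key) > 0 } || @rows.size
      @rows.insert(idx, [key, row])
    end
    @sources[source] += rows.size
    @sources.delete(source) if complete
    source.pause unless complete   # stop reading until processing finishes
    @pending << -> { process }
  end

  # Run queued work, as an event loop would.
  def run_pending
    @pending.shift.call until @pending.empty?
  end

  private

  # Toy rule: release rows only once every source has completed.
  def process
    if complete?
      @emitted.concat(@rows.map(&:last))
      @rows.clear
    else
      @sources.each_key(&:resume)  # plays the role of resume_streams
    end
  end
end
```

Between reduce and run_pending the sending source stays paused, and it is resumed only when the queued processing step runs and more sources are still open.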