Class: CouchProxy::Reduce::BaseReducer

Inherits:
Object
Defined in:
lib/couchproxy/reduce/base_reducer.rb

Overview

Sorts and merges results from many different source streams as the data arrives from CouchDB over the network. The merge uses constant memory, so huge datasets streaming back from the databases can be handled. Subclasses must provide a @sorter instance variable, which is used to sort streaming rows before they are processed. Because the constructor passes @sorter to the row tree, a subclass should set it before calling super.
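
The merge idea described above can be sketched without any CouchProxy code. The following is a minimal illustration only, assuming each source delivers its rows already sorted; the StreamingMerge class and its push method are hypothetical names, and a plain sorted Array stands in for the tree that BaseReducer uses. Rows are released only once every unfinished source has delivered a row at or beyond them, so the buffer holds pending rows rather than the whole dataset.

```ruby
# Hypothetical sketch, not CouchProxy code: merges rows that arrive in
# sorted batches from several sources.
class StreamingMerge
  def initialize(sources, &sorter)
    # Comparator playing the role of @sorter; defaults to <=>.
    @sorter = sorter || ->(a, b) { a <=> b }
    # Last row seen from each source that is still streaming.
    @last_seen = Hash[sources.map { |s| [s, nil] }]
    @buffer = []
  end

  # Accepts one sorted batch from a source and returns the rows that are
  # now safe to emit, in sorted order.
  def push(rows, source, complete)
    @buffer.concat(rows)
    @buffer.sort!(&@sorter)
    if complete
      @last_seen.delete(source)
    elsif rows.any?
      @last_seen[source] = rows.last
    end
    drain
  end

  private

  def drain
    # Every source has finished: flush whatever is left.
    return @buffer.shift(@buffer.size) if @last_seen.empty?
    # Some open source has sent nothing yet, so no row is safe to emit.
    return [] if @last_seen.values.any?(&:nil?)
    # Rows at or below the smallest "last seen" row cannot be preceded
    # by anything that arrives later.
    watermark = @last_seen.values.min { |a, b| @sorter.call(a, b) }
    count = @buffer.take_while { |row| @sorter.call(row, watermark) <= 0 }.size
    @buffer.shift(count)
  end
end

merge = StreamingMerge.new([:a, :b])
merge.push([1, 4], :a, false) # nothing emitted: :b has not reported yet
merge.push([2, 3], :b, false) # rows up to 3 are now safe to emit
```

Re-sorting an Array on every batch is a simplification for readability; a balanced tree keeps rows ordered as they are inserted.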

Direct Known Subclasses

MapReducer, ReduceReducer

Constant Summary

KEY = 'key'.freeze
ID = 'id'.freeze

Instance Method Summary

Constructor Details

#initialize(args) ⇒ BaseReducer

Args should contain the following keys:

sources: List of stream sources, used to identify where
         streaming rows are arriving from.
  limit: Maximum number of rows to return. If not specified, all
         rows are returned.
   skip: Number of rows at the start of the stream to skip before
         returning the rest. If not specified, no rows are skipped.
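
Because rows arrive in batches, skip and limit have to be tracked across calls rather than applied to a single array. The sketch below shows one way this could work, assuming the same :limit and :skip keys as above; RowWindow and its apply method are hypothetical names for illustration, not part of CouchProxy.

```ruby
# Hypothetical helper, not part of CouchProxy: applies skip and limit
# across successive batches by keeping running counts.
class RowWindow
  def initialize(args)
    @limit, @skip = args.values_at(:limit, :skip)
    @skip ||= 0      # no rows skipped unless requested
    @returned = 0    # rows handed back so far
    @skipped = 0     # rows dropped so far
  end

  def apply(rows)
    # Drop rows until the skip quota is used up.
    to_skip = [@skip - @skipped, rows.size].min
    @skipped += to_skip
    rows = rows.drop(to_skip)
    # With no limit, everything after the skipped rows passes through.
    return rows if @limit.nil?
    # Otherwise hand back only what is left of the limit.
    rows = rows.take(@limit - @returned)
    @returned += rows.size
    rows
  end
end

window = RowWindow.new({ skip: 2, limit: 3 })
window.apply([1, 2, 3])    # first two rows are skipped
window.apply([4, 5, 6, 7]) # only the remaining limit is returned
```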


# File 'lib/couchproxy/reduce/base_reducer.rb', line 22

def initialize(args)
  @sources, @limit, @skip = args.values_at(:sources, :limit, :skip)
  @sources = Hash[@sources.map {|s| [s, 0] }]
  @listeners = Hash.new {|h, k| h[k] = [] }
  @skip ||= 0
  @returned, @skipped_rows = 0, 0
  @rows = MultiRBTree.new.tap {|t| t.readjust(@sorter) }
end

Instance Method Details

#complete? ⇒ Boolean

Returns true if all streams of rows have arrived and the reduce processing is complete.

Returns:

  • (Boolean)


# File 'lib/couchproxy/reduce/base_reducer.rb', line 71

def complete?
  @sources.empty?
end

#reduce(rows, source, complete) ⇒ Object

Gives the reducer more rows to process, along with the source connection they arrived on. The complete argument must be a boolean signaling whether this source's stream of rows has finished.



# File 'lib/couchproxy/reduce/base_reducer.rb', line 47

def reduce(rows, source, complete)
  return if complete?
  rows.each do |row|
    row[:proxy_source] = source
    key = [row[KEY], row[ID]]
    @rows[key] = row
  end
  @sources[source] += rows.size
  @sources.delete(source) if complete
  source.pause unless complete
  process do |results|
    if results
      results = limit(skip(results))
      notify_results(results) if results.any?
      notify_complete if complete?
      resume_streams unless complete?
    else
      notify_error
    end
  end if process?
end
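
The relationship between the complete flag passed to reduce and the result of complete? can be seen in isolation. This is a minimal sketch that mirrors only the source bookkeeping in the listings above; FakeSource and SourceTracker are hypothetical stand-ins, and sorting, processing, and listener notification are left out.

```ruby
# Hypothetical stand-in for a stream connection that can be paused.
class FakeSource
  attr_reader :paused

  def initialize
    @paused = false
  end

  def pause
    @paused = true
  end
end

# Hypothetical class mirroring only the source bookkeeping.
class SourceTracker
  def initialize(sources)
    # Maps each open source to the number of rows received from it.
    @sources = Hash[sources.map { |s| [s, 0] }]
  end

  # True once every source has reported that its stream finished.
  def complete?
    @sources.empty?
  end

  def record(rows, source, complete)
    return if complete?
    @sources[source] += rows.size
    @sources.delete(source) if complete # finished streams are forgotten
    source.pause unless complete        # unfinished streams wait their turn
  end
end

a = FakeSource.new
b = FakeSource.new
tracker = SourceTracker.new([a, b])
tracker.record([{ 'key' => 1 }], a, false) # a is paused, still open
tracker.record([], a, true)                # a has finished
tracker.record([], b, true)                # b has finished
tracker.complete?                          # no open sources remain
```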