Class: Pyper::Pipes::Cassandra::ModKeyReader
- Inherits: Object
  - Object
  - Pyper::Pipes::Cassandra::ModKeyReader
- Defined in: lib/pyper/pipes/cassandra/mod_key_reader.rb
Overview
This pipe reads data from sharded rows in Cassandra. The table must have rows sharded by the ‘mod_key’ field. Given a fixed number of shards (mod_size), the pipe reads every shard in turn and returns a lazy enumerator over all of their rows. For example, if mod_size is 100, it reads the 100 rows with mod_key values 0 through 99.
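The sharded layout described above can be illustrated without a database. The following is a minimal in-memory sketch, not the pipe's implementation: the `shard_for` and `read_all_shards` helpers, the `id % mod_size` write rule, and the sample rows are all assumptions made for illustration. It shows shards 0 through mod_size - 1 being visited in order behind a lazy enumerator.

```ruby
# Assumption: writers choose a shard by taking the row id modulo mod_size.
def shard_for(id, mod_size)
  id % mod_size
end

# Visit shards 0...mod_size in order and yield their rows lazily.
# `shards` is a hypothetical in-memory stand-in for the table: mod_key => rows.
def read_all_shards(shards, mod_size)
  (0...mod_size).lazy.flat_map { |mod_key| shards.fetch(mod_key, []) }
end

mod_size = 4
shards = Hash.new { |hash, key| hash[key] = [] }
(1..10).each do |id|
  mod_key = shard_for(id, mod_size)
  shards[mod_key] << { id: id, mod_key: mod_key }
end

rows = read_all_shards(shards, mod_size)

# Only as many shards as needed are touched to produce three rows.
first_three = rows.first(3).map { |row| row[:id] }

# Forcing the enumerator walks every shard in mod_key order.
all_ids = rows.map { |row| row[:id] }.to_a
```

Rows come back grouped by shard rather than in id order, so a caller that needs a global ordering would have to sort after reading.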
Instance Attribute Summary
- #client ⇒ Object readonly
- #mod_size ⇒ Object readonly
- #page_size ⇒ Object readonly
- #table ⇒ Object readonly
Instance Method Summary
- #initialize(table, client, mod_size = 100, page_size = 1000) ⇒ ModKeyReader constructor
  A new instance of ModKeyReader.
- #pipe(arguments, status = {}) ⇒ Enumerator::Lazy<Hash>
  Enumerator of items from all rows.
Constructor Details
#initialize(table, client, mod_size = 100, page_size = 1000) ⇒ ModKeyReader
Returns a new instance of ModKeyReader.
# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 14
def initialize(table, client, mod_size = 100, page_size = 1000)
  @table = table
  @client = client
  @mod_size = mod_size
  @page_size = page_size
end
Instance Attribute Details
#client ⇒ Object (readonly)
# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 13
def client
  @client
end
#mod_size ⇒ Object (readonly)
# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 13
def mod_size
  @mod_size
end
#page_size ⇒ Object (readonly)
# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 13
def page_size
  @page_size
end
#table ⇒ Object (readonly)
# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 13
def table
  @table
end
Instance Method Details
#pipe(arguments, status = {}) ⇒ Enumerator::Lazy<Hash>
Returns a lazy enumerator of items from all rows.
# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 24
def pipe(arguments, status = {})
  (Enumerator.new do |yielder|
    # Read each shard in turn, from mod_key 0 up to mod_size - 1.
    (0...mod_size).each do |mod_id|
      options = { :page_size => page_size }
      paging_state = nil
      loop do
        # Resume from the previous page when there is one.
        options[:paging_state] = paging_state if paging_state.present?
        result = client.select(table).where(arguments.merge(:mod_key => mod_id)).execute(options)
        result.each { |item| yielder << item }
        break if result.last_page?
        paging_state = result.paging_state
      end
    end
  end).lazy
end
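The paging loop in #pipe can be exercised without a Cassandra cluster. The sketch below is illustrative only: `FakeResult`, `FakeClient`, and `read_shards` are hypothetical names, the fake client uses an integer offset as its paging state (a real driver's paging state is opaque), and a plain `nil` check stands in for ActiveSupport's `present?`. It assumes only the call shapes visible in #pipe: `select(table).where(conditions).execute(options)` on the client, and `each`, `last_page?`, and `paging_state` on the result.

```ruby
# Hypothetical stand-in for one page of results.
class FakeResult
  include Enumerable
  attr_reader :paging_state

  def initialize(rows, paging_state)
    @rows = rows
    @paging_state = paging_state
  end

  def each(&block)
    @rows.each(&block)
  end

  # The last page is the one that carries no paging state.
  def last_page?
    @paging_state.nil?
  end
end

# Hypothetical stand-in for the client; counts how many queries are executed.
class FakeClient
  attr_reader :calls

  def initialize(rows_by_mod_key)
    @rows_by_mod_key = rows_by_mod_key
    @calls = 0
  end

  def select(_table)
    self
  end

  def where(conditions)
    @mod_key = conditions.fetch(:mod_key)
    self
  end

  # Assumption: the paging state is simply an offset into the shard's rows.
  def execute(options = {})
    @calls += 1
    rows = @rows_by_mod_key.fetch(@mod_key, [])
    offset = options.fetch(:paging_state, 0)
    page = rows[offset, options.fetch(:page_size)] || []
    next_offset = offset + page.size
    FakeResult.new(page, next_offset < rows.size ? next_offset : nil)
  end
end

# Same control flow as #pipe, written as a free-standing method.
def read_shards(client, table, arguments, mod_size, page_size)
  Enumerator.new do |yielder|
    (0...mod_size).each do |mod_id|
      options = { page_size: page_size }
      paging_state = nil
      loop do
        options[:paging_state] = paging_state unless paging_state.nil?
        result = client.select(table).where(arguments.merge(mod_key: mod_id)).execute(options)
        result.each { |item| yielder << item }
        break if result.last_page?
        paging_state = result.paging_state
      end
    end
  end.lazy
end

client = FakeClient.new({ 0 => [{ id: 1 }, { id: 2 }, { id: 3 }], 1 => [{ id: 4 }, { id: 5 }], 2 => [] })
rows = read_shards(client, :events, {}, 3, 2)

first_two = rows.first(2)            # stops after the first page of shard 0
calls_after_first_two = client.calls
all_ids = rows.map { |row| row[:id] }.to_a
```

Because the enumerator is lazy, taking a couple of items issues a single query, while forcing it issues one query per page per shard, including one for each empty shard.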