Class: Pyper::Pipes::Cassandra::ModKeyReader

Inherits:
Object
Defined in:
lib/pyper/pipes/cassandra/mod_key_reader.rb

Overview

This pipe reads data from sharded rows in Cassandra. The table must have its rows sharded by the ‘mod_key’ field. For a fixed number of such shards (mod_size), the pipe reads every shard in turn and returns a lazy enumerator over all of their data. For example, if mod_size is 100, it reads the 100 rows with mod_key values 0 through 99.
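As a rough illustration of the sharding scheme described above, the sketch below lists the where-clause hashes that one pass over the shards would use. The helper name `shard_where_clauses` and the `:user_id` argument are hypothetical and not part of Pyper; only the `arguments.merge(:mod_key => mod_id)` step mirrors the source of #pipe.

```ruby
# Hypothetical helper (not part of Pyper): builds one where-clause hash
# per shard, for mod_key values 0 through mod_size - 1.
def shard_where_clauses(arguments, mod_size)
  (0...mod_size).map { |mod_id| arguments.merge(:mod_key => mod_id) }
end

# With mod_size = 4, four queries are needed, one per mod_key.
clauses = shard_where_clauses({ :user_id => 42 }, 4)
clauses.each { |clause| p clause }
```

Each hash keeps the caller's own arguments and adds the shard's mod_key, so a larger mod_size means proportionally more queries per read.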

Constructor Details

#initialize(table, client, mod_size = 100, page_size = 1000) ⇒ ModKeyReader

Returns a new instance of ModKeyReader.



# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 14

def initialize(table, client, mod_size = 100, page_size = 1000)
  @table = table
  @client = client
  @mod_size = mod_size
  @page_size = page_size
end

Instance Attribute Details

#client ⇒ Object (readonly)

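Parameters (as passed to #initialize):

  • table (Symbol)

    the name of the Cassandra table to fetch data from

  • client (Cassava::Client)
  • mod_size (Integer) (defaults to: 100)

    the number of shards to read; mod_key values run from 0 to mod_size - 1

  • page_size (Integer) (defaults to: 1000)

    the page size requested for each query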



# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 13

def client
  @client
end

#mod_size ⇒ Object (readonly)

# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 13

def mod_size
  @mod_size
end

#page_size ⇒ Object (readonly)

# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 13

def page_size
  @page_size
end

#table ⇒ Object (readonly)

# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 13

def table
  @table
end

Instance Method Details

#pipe(arguments, status = {}) ⇒ Enumerator::Lazy<Hash>

Returns a lazy enumerator of items from all rows.

Parameters:

  • arguments (Hash)

    Arguments passed to the Cassandra client's where clause; :mod_key is merged in for each shard

  • status (Hash) (defaults to: {})

    The mutable status field (not read or modified by this pipe)

Returns:

  • (Enumerator::Lazy<Hash>)

    enumerator of items from all rows



# File 'lib/pyper/pipes/cassandra/mod_key_reader.rb', line 24

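def pipe(arguments, status = {})
  (Enumerator.new do |yielder|
     # Visit each shard in order: mod_key 0 through mod_size - 1.
     (0...mod_size).each do |mod_id|
       options = { :page_size => page_size }
       paging_state = nil
       loop do
         # No paging state is set for the first page of a shard.
         # Note: `present?` is an ActiveSupport extension.
         options[:paging_state] = paging_state if paging_state.present?
         result = client.select(table).where(arguments.merge(:mod_key => mod_id)).execute(options)
         result.each { |item| yielder << item }

         # Stop at the shard's last page; otherwise resume from the returned paging state.
         break if result.last_page?
         paging_state = result.paging_state
       end
     end
   end).lazy
end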
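
The shard-by-shard paging loop above can be tried without a Cassandra cluster. The following is a minimal sketch under stated assumptions: `FakeClient`, `FakeResult`, and `read_all_shards` are hypothetical stand-ins written for this example, not part of Pyper or Cassava, and the integer offset used as paging state is an assumption (a real driver's paging state is opaque). The sketch uses a plain nil check instead of ActiveSupport's `present?`.

```ruby
# Hypothetical stand-in for a paged query result; only the methods
# used by the reading loop are modeled.
FakeResult = Struct.new(:rows, :paging_state, :last) do
  def each(&block)
    rows.each(&block)
  end

  def last_page?
    last
  end
end

# Hypothetical stand-in for the client: serves rows from an in-memory
# hash keyed by mod_key and counts how many queries were executed.
class FakeClient
  attr_reader :queries

  def initialize(shards)
    @shards = shards
    @queries = 0
  end

  def select(_table)
    self
  end

  def where(arguments)
    @rows = @shards.fetch(arguments[:mod_key], [])
    self
  end

  def execute(options)
    @queries += 1
    offset = options[:paging_state] || 0 # assumption: paging state is an offset
    page = @rows[offset, options[:page_size]] || []
    next_offset = offset + page.size
    FakeResult.new(page, next_offset, next_offset >= @rows.size)
  end
end

# Same control flow as #pipe: every shard in order, every page per shard.
def read_all_shards(client, table, arguments, mod_size, page_size)
  Enumerator.new do |yielder|
    (0...mod_size).each do |mod_id|
      options = { :page_size => page_size }
      loop do
        result = client.select(table).where(arguments.merge(:mod_key => mod_id)).execute(options)
        result.each { |item| yielder << item }
        break if result.last_page?
        options[:paging_state] = result.paging_state
      end
    end
  end.lazy
end

shards = {
  0 => [{ :id => 1 }, { :id => 2 }, { :id => 3 }],
  1 => [{ :id => 4 }],
  2 => [{ :id => 5 }, { :id => 6 }]
}
client = FakeClient.new(shards)

# Taking a few items only runs as many queries as those items require.
first_two = read_all_shards(client, :events, {}, 3, 2).first(2)
queries_after_first_two = client.queries

# Forcing the whole enumerator walks every page of every shard.
all_ids = read_all_shards(client, :events, {}, 3, 2).map { |row| row[:id] }.to_a
```

Because the enumerator is lazy, queries are issued only as the consumer pulls items, so a caller that stops early avoids reading the remaining shards.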