Class: Pyper::Pipes::Cassandra::AllItemsReader

Inherits:
Struct
  • Object
Defined in:
lib/pyper/pipes/cassandra/all_items_reader.rb

Overview

A pipe that reads all items from a single row in Cassandra, fetching the results page by page.

Constructor Details

#initialize(table, client, page_size = 1000) ⇒ AllItemsReader

Returns a new instance of AllItemsReader.

Parameters:

  • table (Symbol)

    the name of the Cassandra table to fetch data from

  • client (Cassava::Client)

    the client used to run queries against Cassandra

  • page_size (Integer) (defaults to: 1000)

    the number of rows to request per page



# File 'lib/pyper/pipes/cassandra/all_items_reader.rb', line 12

def initialize(table, client, page_size = 1000)
  @table = table
  @client = client
  @page_size = page_size
end
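
As a minimal sketch of how the optional third argument behaves, the snippet below uses a hypothetical stand-in class (`AllItemsReaderSketch`) that copies only the constructor shown above. The placeholder `client` object and the `:events` table name are assumptions for illustration; in real use the client would be a `Cassava::Client`.

```ruby
# Hypothetical stand-in that mirrors only the constructor shown above.
# The real class is Pyper::Pipes::Cassandra::AllItemsReader.
class AllItemsReaderSketch
  attr_reader :table, :client, :page_size

  def initialize(table, client, page_size = 1000)
    @table = table
    @client = client
    @page_size = page_size
  end
end

client = Object.new # placeholder; a Cassava::Client in real use

# Omitting page_size falls back to the default of 1000.
default_reader = AllItemsReaderSketch.new(:events, client)

# Passing a third argument overrides the default.
small_page_reader = AllItemsReaderSketch.new(:events, client, 50)
```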

Instance Attribute Details

#client ⇒ Object

Returns the value of attribute client.

Returns:

  • (Object)

    the current value of client



# File 'lib/pyper/pipes/cassandra/all_items_reader.rb', line 5

def client
  @client
end

#page_size ⇒ Object (readonly)

Returns the value of attribute page_size.



# File 'lib/pyper/pipes/cassandra/all_items_reader.rb', line 6

def page_size
  @page_size
end

#table ⇒ Object

Returns the value of attribute table.

Returns:

  • (Object)

    the current value of table



# File 'lib/pyper/pipes/cassandra/all_items_reader.rb', line 5

def table
  @table
end

Instance Method Details

#pipe(arguments, status = {}) ⇒ Enumerator::Lazy<Hash>

Returns a lazy enumerator of items, fetched from Cassandra one page at a time.

Parameters:

  • arguments (Hash)

    Arguments passed to the Cassandra client's where clause. The :columns key, if present, is removed from the hash and used to select which columns to fetch.

  • status (Hash) (defaults to: {})

    The mutable status field

Options Hash (arguments):

  • :order (Array)

    A pair [clustering_column, :desc|:asc] determining how to order the results.

  • :page_size (Integer)

Returns:

  • (Enumerator::Lazy<Hash>)

    enumerator of items



# File 'lib/pyper/pipes/cassandra/all_items_reader.rb', line 23

def pipe(arguments, status = {})
  columns = arguments.delete(:columns)
  enum = Enumerator.new do |yielder|
    options = { :page_size => page_size }
    paging_state = nil
    loop do
      options[:paging_state] = paging_state if paging_state.present?
      result = @client.select(@table, columns).where(arguments).execute(options)
      result.each { |item| yielder << item }

      break if result.last_page?
      paging_state = result.paging_state
    end
  end
  enum.lazy
end
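
The paging loop above can be tried without a Cassandra cluster. The following is a sketch under stated assumptions, not the library's implementation: `FakeResult`, `FakeStatement`, `FakeClient` and `read_all` are hypothetical names that imitate only the calls `#pipe` makes (`select`, `where`, `execute`, `last_page?`, `paging_state`). Unlike the method above, the sketch copies the arguments instead of deleting `:columns` from the caller's hash, and it checks the paging state with plain Ruby rather than ActiveSupport's `present?`.

```ruby
# Hypothetical result page: holds rows plus the state needed to fetch the next page.
class FakeResult
  attr_reader :paging_state

  def initialize(rows, paging_state)
    @rows = rows
    @paging_state = paging_state
  end

  def each(&block)
    @rows.each(&block)
  end

  def last_page?
    @paging_state.nil?
  end
end

# Hypothetical statement: ignores the where clause and delegates to the client.
class FakeStatement
  def initialize(client)
    @client = client
  end

  def where(_arguments)
    self
  end

  def execute(options)
    @client.fetch_page(options)
  end
end

# Hypothetical client that serves an in-memory array in pages and counts requests.
class FakeClient
  attr_reader :requests

  def initialize(rows)
    @rows = rows
    @requests = 0
  end

  def select(_table, _columns = nil)
    FakeStatement.new(self)
  end

  # In this sketch the paging state is simply the offset of the next page.
  def fetch_page(options)
    @requests += 1
    offset = options[:paging_state] || 0
    size = options[:page_size]
    next_offset = offset + size
    state = next_offset < @rows.size ? next_offset : nil
    FakeResult.new(@rows[offset, size], state)
  end
end

# Same loop shape as #pipe: request a page, yield its items, stop on the last page.
def read_all(client, table, arguments, page_size)
  columns = arguments[:columns]
  where_arguments = arguments.reject { |key, _| key == :columns }
  Enumerator.new do |yielder|
    options = { page_size: page_size }
    loop do
      result = client.select(table, columns).where(where_arguments).execute(options)
      result.each { |item| yielder << item }
      break if result.last_page?
      options[:paging_state] = result.paging_state
    end
  end.lazy
end

rows = (1..10).map { |i| { id: i } }
client = FakeClient.new(rows)
items = read_all(client, :events, { user_id: 42 }, 4)

# Taking three items needs only the first page of four rows.
first_three = items.first(3)
requests_after_first = client.requests
```

Because the enumerator is lazy, no request is issued until items are consumed, and consuming only a prefix stops the loop before later pages are requested. Iterating the enumerator again restarts from the first page.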