Class: Quandl::Cassandra::Models::Column::Read::SelectColumns

Inherits:
Quandl::Cassandra::Models::Column::Read show all
Defined in:
lib/quandl/cassandra/models/column/read/select_columns.rb

Instance Method Summary collapse

Methods inherited from Quandl::Cassandra::Models::Column::Read

#column_collapses, #column_ids, #column_ids=, #count?, perform

Instance Method Details

#count_dataObject



11
12
13
14
15
16
17
18
19
20
21
# File 'lib/quandl/cassandra/models/column/read/select_columns.rb', line 11

def count_data
  prepared = Quandl::Cassandra::Base.prepare( statement )
  rows = []
  column_ids.each_with_index do | id, index |
    # pluck column type from collapses
    type = attributes[:column_collapses][index].to_s
    # bind and execute query
    rows << prepared.execute( id, type, :one )
  end
  rows.collect{|r| r.first['count'] }.max
end

#orderObject



59
60
61
# File 'lib/quandl/cassandra/models/column/read/select_columns.rb', line 59

def order
  @order ||= attributes[:order] == :asc ? :asc : :desc
end

#performObject



3
4
5
6
7
8
9
# File 'lib/quandl/cassandra/models/column/read/select_columns.rb', line 3

def perform
  # attrs to result hash
  attributes[:data] = count? ? count_data : select_data
rescue => e
  Quandl::Logger.error("#{attributes} #{e}")
  raise
end

#select_dataObject



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/quandl/cassandra/models/column/read/select_columns.rb', line 23

def select_data
  t1 = Time.now
  # fire off the queries
  prepared = Quandl::Cassandra::Base.prepare( statement )
  data = {}
  futures = []
  column_ids.each_with_index do | id, index |
    # pluck column type from collapses
    type = attributes[:column_collapses][index].to_s
    # bind and execute query
    futures << prepared.async.execute( id, type, Quandl::Cassandra::Base.consistency )
  end
  # collect the results
  futures.each_with_index do |future, index|
    # collect result 
    future.value.each do |row|
      data[row['time']] ||= Array.new( column_ids.count )
      data[row['time']][index] ||= row['value']
    end
  end
  Quandl::Logger.debug("(#{t1.elapsed_ms}) #{self.class.name}.select_data")
  data
end

#statementObject



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/quandl/cassandra/models/column/read/select_columns.rb', line 47

def statement
  columns = count? ? "COUNT(*)" : "time,value"
  cql = "SELECT #{columns} FROM columns WHERE"
  # cql += "ORDER"
  cql += " time >= #{trim_start.jd} AND " if trim_start?
  cql += " time <= #{trim_end.jd} AND " if trim_end?
  cql += " id = ? AND type = ?"
  cql += " ORDER BY type #{order}"
  cql += " LIMIT #{attributes[:limit]}" if attributes[:limit]
  cql
end