Class: Athens::Query

Inherits:
Object
  • Object
show all
Defined in:
lib/athens/query.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, query_execution_id) ⇒ Query

Returns a new instance of Query.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/athens/query.rb', line 5

def initialize(connection, query_execution_id)
  @connection = connection
  @query_execution_id = query_execution_id
  @state = nil
  @state_reason = nil
  @cancelled = false

  @results = nil
  @hash_results = nil

  version = RUBY_VERSION.split('.').map {|v| v.to_i}
  @decimal_without_new = (version[0] >= 2 && version[1] >= 5)
  @decimal_without_new = (version[0] == 2 && version[1] >= 5) || (version[0] >= 3)
end

Instance Attribute Details

#query_execution_idObject (readonly)

Returns the value of attribute query_execution_id.



3
4
5
# File 'lib/athens/query.rb', line 3

def query_execution_id
  @query_execution_id
end

Instance Method Details

#cancelObject



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/athens/query.rb', line 57

def cancel
  unless @cancelled
    resp = @connection.client.stop_query_execution({
      query_execution_id: @query_execution_id
    })
    @cancelled = true
    refresh_state
  end

  if @state == 'CANCELLED'
    return true
  else
    return false
  end
end

#recordsObject



103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/athens/query.rb', line 103

def records
  Enumerator.new do |y|
    headers = nil

    rows.each_with_index do |row|
      if headers.nil?
        headers = row
        next
      end

      y << Hash[headers.zip(row)]
    end
  end
end

#rowsObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/athens/query.rb', line 73

def rows
  raise InvalidRequestError.new("Query must be in SUCCEEDED state to return results") unless @state == 'SUCCEEDED'

  Enumerator.new do |y|
    result = @connection.client.get_query_results({query_execution_id: @query_execution_id})

     = result.result_set.
    first = true

    while true
      rows = result.result_set.rows
      break if rows.empty?

      if first
        y << rows.shift.data.map {|col| col.var_char_value}
        first = false
      end

      rows.each {|row| y << map_types(, row)}

      break unless result.next_token

      result = @connection.client.get_query_results({
        query_execution_id: @query_execution_id,
        next_token: result.next_token
      })
    end
  end
end

#stateObject



20
21
22
23
# File 'lib/athens/query.rb', line 20

def state
  refresh_state if state_needs_refresh?
  @state
end

#state_reasonObject



25
26
27
28
# File 'lib/athens/query.rb', line 25

def state_reason
  refresh_state if state_needs_refresh?
  @state_reason
end

#to_a(header_row: true) ⇒ Object



118
119
120
# File 'lib/athens/query.rb', line 118

def to_a(header_row: true)
  (@results ||= rows.to_a).drop(header_row ? 0 : 1)
end

#to_hObject



122
123
124
# File 'lib/athens/query.rb', line 122

def to_h
  @hash_results ||= records.to_a
end

#wait(max_seconds = nil) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/athens/query.rb', line 30

def wait(max_seconds = nil)
  if max_seconds.nil?
    stop_at = nil
  else
    stop_at = Time.now + max_seconds
  end

  while true
    if stop_at != nil && Time.now > stop_at
      return false
    end

    refresh_state

    if @state == 'SUCCEEDED'
      return true
    elsif @state == 'FAILED'
      raise QueryFailedError.new(@state_reason)
    elsif state == 'CANCELLED'
      raise QueryCancelledError.new(@state_reason)
    end

    # Wait a bit and check again
    sleep(Athens.configuration.wait_polling_period.to_f)
  end
end