Class: Ath::Driver

Inherits:
Object
  • Object
show all
Defined in:
lib/ath/driver.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(athena:, s3:, output_location:, database:, options: {}) ⇒ Driver

Returns a new instance of Driver.



4
5
6
7
8
9
10
# File 'lib/ath/driver.rb', line 4

# Builds a driver around already-configured AWS clients.
#
# @param athena [Object] Athena client used for all query-execution calls
# @param s3 [Object] S3 client used to download query results
# @param output_location [String] result location passed to Athena when starting queries
# @param database [String] database name passed to Athena when starting queries
# @param options [Hash] driver options; +:progress+ is read when downloading results
def initialize(athena:, s3:, output_location:, database:, options: {})
  # Service clients.
  @s3 = s3
  @athena = athena

  # Query settings.
  @database = database
  @output_location = output_location

  # Behavioural switches.
  @options = options
end

Instance Attribute Details

#database ⇒ Object

Returns the value of attribute database.



2
3
4
# File 'lib/ath/driver.rb', line 2

# Reader for the +@database+ instance variable.
#
# @return [Object] the current value of +@database+
def database
  @database
end

Instance Method Details

#download_query_execution_result(bucket:, key:, file:) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/ath/driver.rb', line 41

# Streams an S3 object into +file+, optionally showing a progress bar.
#
# A progress bar is drawn on +$stderr+ only when the +:progress+ option is
# truthy and the object is at least 1 MiB according to +head_object+.
#
# @param bucket [String] S3 bucket name
# @param key [String] S3 object key
# @param file [IO] writable IO that receives the object body; it is flushed
#   but not closed
# @return [IO] the same +file+ object
def download_query_execution_result(bucket:, key:, file:)
  head = @s3.head_object(bucket: bucket, key: key)

  progressbar = nil
  if @options[:progress] && head.content_length >= 1024**2 # 1 MiB
    progressbar = ProgressBar.create(title: 'Download', total: head.content_length, output: $stderr)
  end

  begin
    @s3.get_object(bucket: bucket, key: key) do |chunk|
      file.write(chunk)
      next unless progressbar

      begin
        # The total is a byte count, so advance by bytes rather than characters.
        progressbar.progress += chunk.bytesize
      rescue ProgressBar::InvalidProgressError
        # Best effort: a progress value the bar rejects must not abort the download.
      end
    end
  ensure
    # Always wipe the bar, even when the download fails midway.
    progressbar.clear if progressbar
  end

  file.flush
  file
end

#get_query_execution(query_execution_id:) ⇒ Object



12
13
14
# File 'lib/ath/driver.rb', line 12

# Looks up a single query execution through the Athena client.
#
# @param query_execution_id [String] id of the execution to fetch
# @return [Object] the +query_execution+ member of the client's response
def get_query_execution(query_execution_id:)
  response = @athena.get_query_execution(query_execution_id: query_execution_id)
  response.query_execution
end

#get_query_execution_result(query_execution_id:) ⇒ Object



21
22
23
24
25
# File 'lib/ath/driver.rb', line 21

# Downloads a query's result into a freshly created temporary file.
#
# The file comes from +Tempfile.create+ without a block, so on success the
# caller owns it and is responsible for closing and deleting it. If the
# download raises, the temporary file is closed and removed before the error
# propagates.
#
# @param query_execution_id [String] id of the finished query execution
# @return [File] whatever +download_query_execution_result+ returns (the temp file)
def get_query_execution_result(query_execution_id:)
  # Resolve the location first so a failed lookup cannot leave a stray temp file.
  bucket, key = get_query_execution_result_output_location(query_execution_id: query_execution_id)
  tmp = Tempfile.create('ath')
  downloaded = false

  begin
    result = download_query_execution_result(bucket: bucket, key: key, file: tmp)
    downloaded = true
    result
  ensure
    unless downloaded
      tmp.close unless tmp.closed?
      File.unlink(tmp.path) if File.exist?(tmp.path)
    end
  end
end

#get_query_execution_result_output_location(query_execution_id:) ⇒ Object



70
71
72
73
74
# File 'lib/ath/driver.rb', line 70

# Resolves where Athena stored the result of a query execution.
#
# Reads +result_configuration.output_location+ from the query execution,
# strips a leading +s3://+ and splits the remainder at the first +/+.
#
# @param query_execution_id [String] id of the query execution
# @return [Array<String>] +[bucket, key]+; the key element is absent when the
#   location contains no +/+ after the bucket
def get_query_execution_result_output_location(query_execution_id:)
  response = @athena.get_query_execution(query_execution_id: query_execution_id)
  s3_uri = response.query_execution.result_configuration.output_location
  bucket_and_key = s3_uri.sub(%r{\As3://}, '')
  bucket_and_key.split('/', 2)
end

#list_query_executions ⇒ Object



16
17
18
19
# File 'lib/ath/driver.rb', line 16

# Fetches details for up to the first 50 query executions the client lists.
#
# Pages are consumed lazily, so paging stops as soon as 50 ids have been
# collected instead of walking the whole listing.
#
# @return [Array] the +query_executions+ of the batch response, or an empty
#   array when no execution ids are listed
def list_query_executions
  query_execution_ids = @athena.list_query_executions
                               .each_page
                               .lazy
                               .flat_map(&:query_execution_ids)
                               .first(50)

  # NOTE(review): Athena's BatchGetQueryExecution is documented to require at
  # least one id, so skip the call when there is nothing to look up.
  return [] if query_execution_ids.empty?

  @athena.batch_get_query_execution(query_execution_ids: query_execution_ids).query_executions
end

#output_location ⇒ Object



87
88
89
# File 'lib/ath/driver.rb', line 87

# Reader for the +@output_location+ instance variable.
#
# @return [Object] the current value of +@output_location+
def output_location
  @output_location
end

#output_location=(v) ⇒ Object



91
92
93
# File 'lib/ath/driver.rb', line 91

# Writer for the +@output_location+ instance variable.
#
# @param v [Object] new value to store in +@output_location+
# @return [Object] the assigned value
def output_location=(v)
  @output_location = v
end

#region ⇒ Object



95
96
97
# File 'lib/ath/driver.rb', line 95

# Reports the region currently set on the Athena client's configuration.
#
# @return [Object] the value of +@athena.config.region+
def region
  client_config = @athena.config
  client_config.region
end

#region=(v) ⇒ Object



99
100
101
102
103
# File 'lib/ath/driver.rb', line 99

# Points the Athena client at another region.
#
# Updates, in order, the client config's +region+, its +sigv4_region+ and
# its +endpoint+ (resolved via +Aws::EndpointProvider+ for the 'athena'
# service).
#
# @param v [String] the new region name
# @return [Object] the resolved endpoint assigned last
def region=(v)
  client_config = @athena.config
  client_config.region = v
  client_config.sigv4_region = v
  client_config.endpoint = Aws::EndpointProvider.resolve(v, 'athena')
end

#save_query_execution_result(query_execution_id:, path:) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/ath/driver.rb', line 27

# Downloads a query's result to a local path.
#
# When +path+ is an existing directory the result is stored inside it under
# the basename of the S3 key; otherwise +path+ is used as the file name.
#
# @param query_execution_id [String] id of the finished query execution
# @param path [String] destination file or existing directory
# @return [String] the path of the file that was written
def save_query_execution_result(query_execution_id:, path:)
  bucket, key = get_query_execution_result_output_location(query_execution_id: query_execution_id)
  path = File.join(path, File.basename(key)) if File.directory?(path)

  # File.open rather than Kernel#open: the latter treats a leading "|" in the
  # path as a command to spawn instead of a file name.
  File.open(path, 'wb') do |file|
    download_query_execution_result(bucket: bucket, key: key, file: file)
  end

  path
end

#start_query_execution(query_string:) ⇒ Object



76
77
78
79
80
81
# File 'lib/ath/driver.rb', line 76

# Submits a query to Athena using the driver's database and output location.
#
# @param query_string [String] the SQL text to run
# @return [Object] the Athena client's response, unchanged
def start_query_execution(query_string:)
  execution_context = {database: @database}
  result_settings = {output_location: @output_location}

  @athena.start_query_execution(
    query_string: query_string,
    query_execution_context: execution_context,
    result_configuration: result_settings
  )
end

#stop_query_execution(query_execution_id:) ⇒ Object



83
84
85
# File 'lib/ath/driver.rb', line 83

# Forwards a stop request for one query execution to the Athena client.
#
# @param query_execution_id [String] id of the execution to stop
# @return [Object] whatever the client's +stop_query_execution+ returns
def stop_query_execution(query_execution_id:)
  @athena.stop_query_execution(query_execution_id: query_execution_id)
end