Class: Ath::Driver
- Inherits:
-
Object
- Object
- Ath::Driver
- Defined in:
- lib/ath/driver.rb
Instance Attribute Summary collapse
-
#database ⇒ Object
Returns the value of attribute database.
Instance Method Summary collapse
- #download_query_execution_result(bucket:, key:, file:) ⇒ Object
- #get_query_execution(query_execution_id:) ⇒ Object
- #get_query_execution_result(query_execution_id:) ⇒ Object
- #get_query_execution_result_output_location(query_execution_id:) ⇒ Object
-
#initialize(athena:, s3:, output_location:, database:, options: {}) ⇒ Driver
constructor
A new instance of Driver.
- #list_query_executions ⇒ Object
- #output_location ⇒ Object
- #output_location=(v) ⇒ Object
- #region ⇒ Object
- #region=(v) ⇒ Object
- #save_query_execution_result(query_execution_id:, path:) ⇒ Object
- #start_query_execution(query_string:) ⇒ Object
- #stop_query_execution(query_execution_id:) ⇒ Object
Constructor Details
#initialize(athena:, s3:, output_location:, database:, options: {}) ⇒ Driver
Returns a new instance of Driver.
4 5 6 7 8 9 10 |
# File 'lib/ath/driver.rb', line 4 def initialize(athena:, s3:, output_location:, database:, options: {}) @athena = athena @s3 = s3 @output_location = output_location @database = database @options = options end |
Instance Attribute Details
#database ⇒ Object
Returns the value of attribute database.
2 3 4 |
# File 'lib/ath/driver.rb', line 2 def database @database end |
Instance Method Details
#download_query_execution_result(bucket:, key:, file:) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/ath/driver.rb', line 41 def download_query_execution_result(bucket:, key:, file:) head = @s3.head_object(bucket: bucket, key: key) if @options[:progress] and head.content_length >= 1024 ** 2 progressbar = ProgressBar.create(title: 'Download', total: head.content_length, output: $stderr) begin @s3.get_object(bucket: bucket, key: key) do |chunk| file.write(chunk) begin progressbar.progress += chunk.length rescue ProgressBar::InvalidProgressError # nothing to do end end ensure progressbar.clear end else @s3.get_object(bucket: bucket, key: key) do |chunk| file.write(chunk) end end file.flush file end |
#get_query_execution(query_execution_id:) ⇒ Object
12 13 14 |
# File 'lib/ath/driver.rb', line 12 def get_query_execution(query_execution_id:) @athena.get_query_execution(query_execution_id: query_execution_id).query_execution end |
#get_query_execution_result(query_execution_id:) ⇒ Object
21 22 23 24 25 |
# File 'lib/ath/driver.rb', line 21 def get_query_execution_result(query_execution_id:) tmp = Tempfile.create('ath') bucket, key = get_query_execution_result_output_location(query_execution_id: query_execution_id) download_query_execution_result(bucket: bucket, key: key, file: tmp) end |
#get_query_execution_result_output_location(query_execution_id:) ⇒ Object
70 71 72 73 74 |
# File 'lib/ath/driver.rb', line 70 def get_query_execution_result_output_location(query_execution_id:) query_execution = @athena.get_query_execution(query_execution_id: query_execution_id).query_execution output_location = query_execution.result_configuration.output_location output_location.sub(%r{\As3://}, '').split('/', 2) end |
#list_query_executions ⇒ Object
16 17 18 19 |
# File 'lib/ath/driver.rb', line 16 def list_query_executions query_execution_ids = @athena.list_query_executions.each_page.flat_map(&:query_execution_ids) @athena.batch_get_query_execution(query_execution_ids: query_execution_ids.slice(0, 50)).query_executions end |
#output_location ⇒ Object
87 88 89 |
# File 'lib/ath/driver.rb', line 87 def output_location @output_location end |
#output_location=(v) ⇒ Object
91 92 93 |
# File 'lib/ath/driver.rb', line 91 def output_location=(v) @output_location = v end |
#region ⇒ Object
95 96 97 |
# File 'lib/ath/driver.rb', line 95 def region @athena.config.region end |
#region=(v) ⇒ Object
99 100 101 102 103 |
# File 'lib/ath/driver.rb', line 99 def region=(v) @athena.config.region = v @athena.config.sigv4_region = v @athena.config.endpoint = Aws::EndpointProvider.resolve(v, 'athena') end |
#save_query_execution_result(query_execution_id:, path:) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/ath/driver.rb', line 27 def save_query_execution_result(query_execution_id:, path:) bucket, key = get_query_execution_result_output_location(query_execution_id: query_execution_id) if File.directory?(path) path = File.join(path, File.basename(key)) end open(path, 'wb') do |file| download_query_execution_result(bucket: bucket, key: key, file: file) end path end |
#start_query_execution(query_string:) ⇒ Object
76 77 78 79 80 81 |
# File 'lib/ath/driver.rb', line 76 def start_query_execution(query_string:) @athena.start_query_execution( query_string: query_string, query_execution_context: {database: @database}, result_configuration: {output_location: @output_location}) end |
#stop_query_execution(query_execution_id:) ⇒ Object
83 84 85 |
# File 'lib/ath/driver.rb', line 83 def stop_query_execution(query_execution_id:) @athena.stop_query_execution(query_execution_id: query_execution_id) end |