Class: Egis::Client

Inherits:
Object
Defined in:
lib/egis/client.rb

Overview

The most fundamental Egis class. Provides an interface for executing Athena queries.

See Egis.configure for configuration instructions.

Examples:

Create client and execute asynchronous query

client = Egis::Client.new
status = client.execute_query('SELECT * FROM my_table;')

while status.in_progress?
  # do something useful
  # ...
  status = client.query_status(status.id)
end

status.output_location.url # s3://my-bucket/result/path

Execute synchronous query and fetch results

status = client.execute_query('SELECT MAX(time), MIN(id) FROM my_table;', async: false)
status.fetch_result(schema: [:timestamp, :int]) # [[2020-05-04 11:19:03 +0200, 7]]

Constructor Details

#initialize(aws_client_provider: Egis::AwsClientProvider.new, s3_location_parser: Egis::S3LocationParser.new) ⇒ Client

Returns a new instance of Client.


# File 'lib/egis/client.rb', line 40

def initialize(aws_client_provider: Egis::AwsClientProvider.new, s3_location_parser: Egis::S3LocationParser.new)
  @aws_athena_client = aws_client_provider.athena_client
  @s3_location_parser = s3_location_parser
  @query_status_backoff = Egis.configuration.query_status_backoff || DEFAULT_QUERY_STATUS_BACKOFF
end
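Because both collaborators are keyword arguments, the Athena client can be swapped out, for example in tests. The following is a minimal sketch of such a test double, not part of Egis: every Fake* name is hypothetical, and the result_configuration.output_location field is an assumption about what the helpers not shown on this page read from the response.

```ruby
# Hypothetical test doubles; none of these names exist in Egis or the AWS SDK.
FakeStatus = Struct.new(:state, :state_change_reason)
FakeResultConfiguration = Struct.new(:output_location)
FakeQueryExecution = Struct.new(:query_execution_id, :status, :result_configuration)
FakeStartResponse = Struct.new(:query_execution_id)
FakeGetResponse = Struct.new(:query_execution)

class FakeAthenaClient
  attr_reader :started_queries

  def initialize
    @started_queries = []
  end

  # Same call shape as used by #execute_query: one params hash in,
  # an object responding to #query_execution_id out.
  def start_query_execution(params = {})
    @started_queries << params
    FakeStartResponse.new("fake-query-#{@started_queries.size}")
  end

  # Same call shape as used by #query_status. Always reports success.
  def get_query_execution(params = {})
    FakeGetResponse.new(
      FakeQueryExecution.new(
        params.fetch(:query_execution_id),
        FakeStatus.new('SUCCEEDED', nil),
        FakeResultConfiguration.new('s3://my-bucket/result/path')
      )
    )
  end
end

# The constructor only calls #athena_client on the provider.
class FakeAwsClientProvider
  def athena_client
    @athena_client ||= FakeAthenaClient.new
  end
end

provider = FakeAwsClientProvider.new
# With the gem loaded, the double would be passed in as:
#   Egis::Client.new(aws_client_provider: provider)
```

Whether this double is sufficient for the real class depends on private helpers such as parse_output_location, which are not documented here.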

Instance Method Details

#database(database_name) ⇒ Egis::Database

Creates a Database object with the given name. Calling this method does not create the Athena database itself.

Parameters:

  • database_name (String)

Returns:

  • (Egis::Database)


# File 'lib/egis/client.rb', line 52

def database(database_name)
  Database.new(database_name, client: self)
end

#execute_query(query, work_group: nil, database: nil, output_location: nil, async: true) ⇒ Egis::QueryStatus

Executes an Athena query. By default, queries are executed asynchronously.

Parameters:

  • query (String)

    SQL query to execute

  • async (Boolean) (defaults to: true)

    Whether to run the query asynchronously or block until it finishes.

  • work_group (String) (defaults to: nil)

    Athena work group in which the query is executed.

  • database (String) (defaults to: nil)

    Run the query in the context of a specific database (unqualified table references are resolved against the given database).

  • output_location (String) (defaults to: nil)

    S3 URL of the desired output location. By default, Athena uses the location defined by the work group.

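Returns:

  • (Egis::QueryStatus)

Raises:

  • (Egis::Errors::QueryExecutionError)

    When a synchronous query does not finish successfully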


# File 'lib/egis/client.rb', line 68

def execute_query(query, work_group: nil, database: nil, output_location: nil, async: true)
  query_execution_id = aws_athena_client.start_query_execution(
    query_execution_params(query, work_group, database, output_location)
  ).query_execution_id

  return query_status(query_execution_id) if Egis.mode.async(async)

  query_status = wait_for_query_to_finish(query_execution_id)

  raise Egis::Errors::QueryExecutionError, query_status.message unless query_status.finished?

  query_status
end
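The synchronous branch relies on wait_for_query_to_finish, whose body is not shown on this page. A rough sketch of how such a polling loop with backoff could look follows. It is an illustration under assumptions, not the Egis implementation: SketchStatus, wait_for_query, the state symbols, and the default backoff lambda are all hypothetical, and the sleeper is injectable only so the example runs instantly.

```ruby
# Hypothetical stand-in for Egis::QueryStatus with only what the loop needs.
SketchStatus = Struct.new(:id, :state) do
  def in_progress?
    %i[queued running].include?(state)
  end
end

# Polls `fetch_status` until the query leaves the in-progress states.
# `backoff` maps the attempt number to the number of seconds to wait
# before the next status check (assumed shape, not confirmed by the source).
def wait_for_query(query_id, fetch_status:,
                   backoff: ->(attempt) { 1.5**attempt - 1 },
                   sleeper: method(:sleep))
  attempt = 1
  loop do
    sleeper.call(backoff.call(attempt))
    status = fetch_status.call(query_id)
    return status unless status.in_progress?

    attempt += 1
  end
end

# Simulate a query that is queued, then running, then finished.
states = %i[queued running finished]
fetch = ->(id) { SketchStatus.new(id, states.shift) }
waits = [] # records the requested sleep durations instead of sleeping

final = wait_for_query('abc', fetch_status: fetch,
                              sleeper: ->(seconds) { waits << seconds })
```

Growing the wait between checks keeps short queries responsive while avoiding a tight loop of status requests for long-running ones.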

#query_status(query_id) ⇒ Egis::QueryStatus

Checks the status of an asynchronous query execution.

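Parameters:

  • query_id (String)

    ID of the query execution, available as id on the Egis::QueryStatus returned by #execute_query

Returns:

  • (Egis::QueryStatus)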


# File 'lib/egis/client.rb', line 88

def query_status(query_id)
  resp = aws_athena_client.get_query_execution(query_execution_id: query_id)

  query_execution = resp.query_execution

  Egis::QueryStatus.new(
    query_execution.query_execution_id,
    QUERY_STATUS_MAPPING.fetch(query_execution.status.state),
    query_execution.status.state_change_reason,
    parse_output_location(query_execution)
  )
end