Class: ResultsStream

Inherits:
Object
  • Object
show all
Defined in:
lib/s3_selector/results_stream.rb

Constant Summary collapse

# Default value for the +number_of_threads:+ constructor keyword.
DEFAULT_S3_SELECT_THREADS = 5
# Byte value of the newline character ("\n" => 10). Integers are already
# frozen, so no explicit #freeze is needed.
# NOTE(review): presumably used to split streamed S3 Select output into
# records on line boundaries — confirm against read_s3_file.
DELIMITER = "\n".ord

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(s3_files:, query: 'SELECT * FROM S3Object', s3_client: Aws::S3::Client.new, number_of_threads: DEFAULT_S3_SELECT_THREADS, region: 'us-east-1', input_format: :parquet, input_format_options: {}) ⇒ ResultsStream

Returns a new instance of ResultsStream.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/s3_selector/results_stream.rb', line 8

# Builds a stream over the results of running +query+ against +s3_files+.
# Every argument is stored through its attribute writer, in declaration order.
#
# @param s3_files [#length, #first] the files #records will read
#   (element type not visible here — TODO confirm)
# @param query [String] S3 Select SQL expression
# @param s3_client [Object] client stored for the rest of the class; when
#   omitted, a fresh Aws::S3::Client is constructed
# @param number_of_threads [Integer] stored as-is; defaults to
#   DEFAULT_S3_SELECT_THREADS
# @param region [String] stored as-is
# @param input_format [Symbol] stored as-is; defaults to :parquet
# @param input_format_options [Hash] stored as-is
#
# NOTE(review): the default client is built without +region+, so it uses
# the SDK's own region resolution rather than this keyword — confirm intent.
def initialize(
  s3_files:,
  query: 'SELECT * FROM S3Object',
  s3_client: Aws::S3::Client.new,
  number_of_threads: DEFAULT_S3_SELECT_THREADS,
  region: 'us-east-1',
  input_format: :parquet,
  input_format_options: {}
)
  # Hash literals preserve insertion order, so writers run in the same
  # sequence as individual `self.x = x` assignments would.
  {
    s3_files: s3_files,
    s3_client: s3_client,
    query: query,
    number_of_threads: number_of_threads,
    region: region,
    input_format: input_format,
    input_format_options: input_format_options
  }.each { |attribute, value| send(:"#{attribute}=", value) }
end

Instance Attribute Details

#input_format ⇒ Object

Returns the value of attribute input_format.



5
6
7
# File 'lib/s3_selector/results_stream.rb', line 5

# Reader for the input format assigned in the constructor (default :parquet).
def input_format; @input_format; end

#input_format_options ⇒ Object

Returns the value of attribute input_format_options.



5
6
7
# File 'lib/s3_selector/results_stream.rb', line 5

# Reader for the input-format options assigned in the constructor.
def input_format_options; @input_format_options; end

#number_of_threads ⇒ Object

Returns the value of attribute number_of_threads.



5
6
7
# File 'lib/s3_selector/results_stream.rb', line 5

# Reader for the thread count assigned in the constructor.
def number_of_threads; @number_of_threads; end

#query ⇒ Object

Returns the value of attribute query.



5
6
7
# File 'lib/s3_selector/results_stream.rb', line 5

# Reader for the S3 Select SQL expression assigned in the constructor.
def query; @query; end

#region ⇒ Object

Returns the value of attribute region.



5
6
7
# File 'lib/s3_selector/results_stream.rb', line 5

# Reader for the region string assigned in the constructor.
def region; @region; end

#s3_client ⇒ Object

Returns the value of attribute s3_client.



5
6
7
# File 'lib/s3_selector/results_stream.rb', line 5

# Reader for the S3 client assigned in the constructor.
def s3_client; @s3_client; end

#s3_files ⇒ Object

Returns the value of attribute s3_files.



5
6
7
# File 'lib/s3_selector/results_stream.rb', line 5

# Reader for the collection of files assigned in the constructor.
def s3_files; @s3_files; end

Instance Method Details

#records ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/s3_selector/results_stream.rb', line 26

# Streams every record that read_s3_file yields for each entry in +s3_files+.
#
# With exactly one file, the file is read on the calling thread. Otherwise
# the files are handed to ConcurrentExecutor.consume_enumerable, which
# presumably reads them on worker threads (TODO confirm). Deliveries to the
# consumer are serialized with a Mutex, so the block passed to #each / #map /
# #each_slice etc. never runs on two threads at the same time.
#
# NOTE(review): +number_of_threads+ is not forwarded to ConcurrentExecutor
# here — confirm whether consume_enumerable should receive it.
# NOTE(review): external iteration (+records.next+) drives the enumerator in
# a Fiber; yielding to it from other threads in the multi-file branch will
# likely fail. Prefer internal iteration (#each and friends).
#
# @return [Enumerator] yields each value produced by read_s3_file
def records
  Enumerator.new do |yielder|
    if s3_files.length == 1
      read_s3_file(file: s3_files.first) { |data| yielder << data }
    else
      delivery_lock = Mutex.new
      ConcurrentExecutor.consume_enumerable(s3_files) do |s3_file|
        read_s3_file(file: s3_file) do |data|
          # The consumer's block is not assumed to be thread-safe.
          delivery_lock.synchronize { yielder << data }
        end
      end
    end
  end
end