Class: ResultsStream
- Inherits: Object
- Inheritance hierarchy:
  - Object
    - ResultsStream
- Defined in:
- lib/s3_selector/results_stream.rb
Constant Summary collapse
- DEFAULT_S3_SELECT_THREADS = 5
- DELIMITER = "\n".bytes.first.freeze
Instance Attribute Summary collapse
-
#input_format ⇒ Object
Returns the value of attribute input_format.
-
#input_format_options ⇒ Object
Returns the value of attribute input_format_options.
-
#number_of_threads ⇒ Object
Returns the value of attribute number_of_threads.
-
#query ⇒ Object
Returns the value of attribute query.
-
#region ⇒ Object
Returns the value of attribute region.
-
#s3_client ⇒ Object
Returns the value of attribute s3_client.
-
#s3_files ⇒ Object
Returns the value of attribute s3_files.
Instance Method Summary collapse
-
#initialize(s3_files:, query: 'SELECT * FROM S3Object', s3_client: Aws::S3::Client.new, number_of_threads: DEFAULT_S3_SELECT_THREADS, region: 'us-east-1', input_format: :parquet, input_format_options: {}) ⇒ ResultsStream
constructor
A new instance of ResultsStream.
- #records ⇒ Object
Constructor Details
#initialize(s3_files:, query: 'SELECT * FROM S3Object', s3_client: Aws::S3::Client.new, number_of_threads: DEFAULT_S3_SELECT_THREADS, region: 'us-east-1', input_format: :parquet, input_format_options: {}) ⇒ ResultsStream
Returns a new instance of ResultsStream.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/s3_selector/results_stream.rb', line 8 def initialize( s3_files:, query: 'SELECT * FROM S3Object', s3_client: Aws::S3::Client.new, number_of_threads: DEFAULT_S3_SELECT_THREADS, region: 'us-east-1', input_format: :parquet, input_format_options: {} ) self.s3_files = s3_files self.s3_client = s3_client self.query = query self.number_of_threads = number_of_threads self.region = region self.input_format = input_format self.input_format_options = input_format_options end |
Instance Attribute Details
#input_format ⇒ Object
Returns the value of attribute input_format.
5 6 7 |
# File 'lib/s3_selector/results_stream.rb', line 5 def input_format @input_format end |
#input_format_options ⇒ Object
Returns the value of attribute input_format_options.
5 6 7 |
# File 'lib/s3_selector/results_stream.rb', line 5 def input_format_options @input_format_options end |
#number_of_threads ⇒ Object
Returns the value of attribute number_of_threads.
5 6 7 |
# File 'lib/s3_selector/results_stream.rb', line 5 def number_of_threads @number_of_threads end |
#query ⇒ Object
Returns the value of attribute query.
5 6 7 |
# File 'lib/s3_selector/results_stream.rb', line 5 def query @query end |
#region ⇒ Object
Returns the value of attribute region.
5 6 7 |
# File 'lib/s3_selector/results_stream.rb', line 5 def region @region end |
#s3_client ⇒ Object
Returns the value of attribute s3_client.
5 6 7 |
# File 'lib/s3_selector/results_stream.rb', line 5 def s3_client @s3_client end |
#s3_files ⇒ Object
Returns the value of attribute s3_files.
5 6 7 |
# File 'lib/s3_selector/results_stream.rb', line 5 def s3_files @s3_files end |
Instance Method Details
#records ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/s3_selector/results_stream.rb', line 26 def records Enumerator.new do |yielder| if s3_files.length == 1 read_s3_file(file: s3_files.first) { |data| yielder << data } else ConcurrentExecutor.consume_enumerable(s3_files) do |s3_file| read_s3_file(file: s3_file) { |data| yielder << data } end end end end |