Module: AthenaUDF::Utils
- Included in:
- BaseUDF
- Defined in:
- lib/athena-udf/utils.rb
Instance Method Summary collapse
- #get_record_batch_bytes(schema, record_batch) ⇒ Object
- #get_record_batch_index(bytes) ⇒ Object
- #get_schema_bytes(schema) ⇒ Object
- #read_record_batches(schema_data, record_batch_data) ⇒ Object
- #read_schema(schema_data) ⇒ Object
Instance Method Details
#get_record_batch_bytes(schema, record_batch) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/athena-udf/utils.rb', line 46
# Serializes +record_batch+ through an Arrow::RecordBatchStreamWriter into an
# in-memory buffer and returns the serialized bytes from the offset reported
# by get_record_batch_index onwards (that is, everything after the second
# 0xFF 0xFF 0xFF 0xFF marker).
#
# @param schema [Object] schema handed to Arrow::RecordBatchStreamWriter.open
# @param record_batch [Object] batch passed to writer.write_record_batch
# @return [Object] the value of the Arrow::BufferOutputStream.open call; the
#   block's last expression is the sliced String (presumably what open
#   returns — confirm against red-arrow)
def get_record_batch_bytes(schema, record_batch)
  sink = Arrow::ResizableBuffer.new(0)

  Arrow::BufferOutputStream.open(sink) do |stream|
    Arrow::RecordBatchStreamWriter.open(stream, schema) do |batch_writer|
      batch_writer.write_record_batch(record_batch)
    end

    # The buffer is read only after the writer block has finished, but while
    # the output stream block is still active.
    serialized = sink.data.to_s
    payload_offset = get_record_batch_index(serialized)
    serialized[payload_offset..-1]
  end
end
#get_record_batch_index(bytes) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/athena-udf/utils.rb', line 59 def get_record_batch_index(bytes) size = bytes.size found_count = 0 start_index = 0 0.upto(size - 4).each do |i| has_ffff = bytes.slice(i, 4) == "\xFF\xFF\xFF\xFF".b found_count += 1 if has_ffff next unless found_count == 2 start_index = i + 4 break end start_index end |
#get_schema_bytes(schema) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/athena-udf/utils.rb', line 33
# Opens an Arrow::RecordBatchStreamWriter for +schema+ without writing any
# record batch, then returns a slice of the resulting buffer: it starts at
# byte 4 and ends just before the second 0xFF 0xFF 0xFF 0xFF marker located
# by get_record_batch_index.
#
# @param schema [Object] schema handed to Arrow::RecordBatchStreamWriter.open
# @return [Object] the value of the Arrow::BufferOutputStream.open call; the
#   block's last expression is the sliced String (presumably what open
#   returns — confirm against red-arrow)
def get_schema_bytes(schema)
  sink = Arrow::ResizableBuffer.new(0)

  Arrow::BufferOutputStream.open(sink) do |stream|
    Arrow::RecordBatchStreamWriter.open(stream, schema) do |_writer|
      # noop
    end

    serialized = sink.data.to_s
    marker_end = get_record_batch_index(serialized)
    # marker_end points just past the 4-byte second marker, so the last byte
    # in front of that marker sits at marker_end - 5.
    last_schema_byte = marker_end - 5
    serialized[4..last_schema_byte]
  end
end
#read_record_batches(schema_data, record_batch_data) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/athena-udf/utils.rb', line 5
# Writes +schema_data+ followed by +record_batch_data+ into a single Arrow
# buffer, reads that buffer back with Arrow::RecordBatchStreamReader and
# yields the reader's schema together with each record batch.
#
# @param schema_data [String] serialized schema bytes
# @param record_batch_data [String] serialized record batch bytes
# @yield [input_schema, record_batch] once per batch produced by the reader
# @yieldparam input_schema [Object] value of reader.schema, read once up front
# @yieldparam record_batch [Object] batch handed out by reader.each
# @return [Object] whatever the outer Arrow::BufferOutputStream.open returns
def read_record_batches(schema_data, record_batch_data)
  # String#bytesize returns the byte count directly; String#bytes would first
  # build an Array holding every byte just to be counted.
  total_bytes = schema_data.bytesize + record_batch_data.bytesize
  buffer = Arrow::ResizableBuffer.new(total_bytes)

  Arrow::BufferOutputStream.open(buffer) do |output|
    output.write(schema_data)
    output.write(record_batch_data)

    # The input stream is opened inside the output block, i.e. before the
    # output stream block has finished.
    Arrow::BufferInputStream.open(buffer) do |input|
      reader = Arrow::RecordBatchStreamReader.new(input)
      input_schema = reader.schema
      reader.each do |record_batch|
        yield input_schema, record_batch
      end
    end
  end
end
#read_schema(schema_data) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/athena-udf/utils.rb', line 21
# Writes +schema_data+ into an Arrow buffer, opens an
# Arrow::RecordBatchStreamReader on that buffer and evaluates reader.schema.
#
# @param schema_data [String] serialized schema bytes
# @return [Object] whatever the outer Arrow::BufferOutputStream.open returns;
#   the innermost block evaluates to reader.schema (presumably passed through
#   by both open calls — confirm against red-arrow)
def read_schema(schema_data)
  # String#bytesize returns the byte count directly; String#bytes would first
  # build an Array holding every byte just to be counted.
  buffer = Arrow::ResizableBuffer.new(schema_data.bytesize)

  Arrow::BufferOutputStream.open(buffer) do |output|
    output.write(schema_data)

    # The input stream is opened inside the output block, i.e. before the
    # output stream block has finished.
    Arrow::BufferInputStream.open(buffer) do |input|
      Arrow::RecordBatchStreamReader.new(input).schema
    end
  end
end