Class: Flydata::SourcePostgresql::DumpParser
- Inherits:
-
Object
- Object
- Flydata::SourcePostgresql::DumpParser
- Defined in:
- lib/flydata/source_postgresql/parse_dump_and_send.rb
Constant Summary collapse
- MAX_ROW_BYTES =
1 * 1024 * 1024
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(dump_pos_info, dmpio, create_table_block, insert_record_block, check_point_block) ⇒ DumpParser
constructor
Parser holds rows until the total byte size reaches this number.
- #parse(obj) ⇒ Object
- #parse_all ⇒ Object
Constructor Details
#initialize(dump_pos_info, dmpio, create_table_block, insert_record_block, check_point_block) ⇒ DumpParser
Parser holds rows until the total byte size reaches this number
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/flydata/source_postgresql/parse_dump_and_send.rb', line 34 def initialize(dump_pos_info, dmpio, create_table_block, insert_record_block, check_point_block) @source_pos = dump_pos_info[:source_pos] raise ArgumentError.new("source position is required") unless @source_pos @current_table = nil @last_pos = 0 @row_head_pos = nil @rows = [] @dmpio = dmpio @create_table_block = create_table_block @insert_record_block = insert_record_block @check_point_block = check_point_block resume(dump_pos_info) end |
Instance Method Details
#close ⇒ Object
63 64 65 66 67 68 69 70 71 |
# File 'lib/flydata/source_postgresql/parse_dump_and_send.rb', line 63 def close unless @rows.empty? call_insert_record_block # Core logic expects a check point callback with CREATE_TABLE at the # end of table data insertion of each table. #handle_table_info takes # care of all tables but the last table. This is for the last table. call_check_point_block(Parser::State::CREATE_TABLE) end end |
#parse(obj) ⇒ Object
59 60 61 |
# File 'lib/flydata/source_postgresql/parse_dump_and_send.rb', line 59 def parse(obj) obj.kind_of?(Hash) ? handle_table_info(obj) : handle_data_row(obj) end |
#parse_all ⇒ Object
50 51 52 53 54 55 56 57 |
# File 'lib/flydata/source_postgresql/parse_dump_and_send.rb', line 50 def parse_all u = MessagePack::Unpacker.new(@dmpio) u.each do |obj| @last_pos = @dmpio.pos - u.buffer.size parse(obj) end close end |