Class: Flydata::SourcePostgresql::DumpParser

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/source_postgresql/parse_dump_and_send.rb

Constant Summary collapse

MAX_ROW_BYTES =
1 * 1024 * 1024

Instance Method Summary collapse

Constructor Details

#initialize(dump_pos_info, dmpio, create_table_block, insert_record_block, check_point_block) ⇒ DumpParser

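Parser holds rows until the total byte size reaches this number. (This sentence appears to describe the MAX_ROW_BYTES constant above, 1 * 1024 * 1024 bytes, rather than the constructor itself.)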

Raises:

  • (ArgumentError)


34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/flydata/source_postgresql/parse_dump_and_send.rb', line 34

# Sets up parser state and then hands the position info to #resume.
#
# @param dump_pos_info [Hash] must carry a truthy :source_pos entry; the
#   whole object is also forwarded to #resume
# @param dmpio [Object] dump input stream (stored as-is; read in #parse_all)
# @param create_table_block [Object] callback stored for later use
# @param insert_record_block [Object] callback stored for later use
# @param check_point_block [Object] callback stored for later use
# @raise [ArgumentError] when dump_pos_info has no :source_pos
def initialize(dump_pos_info, dmpio, create_table_block, insert_record_block,
               check_point_block)
  @source_pos = dump_pos_info[:source_pos]
  unless @source_pos
    raise ArgumentError, "source position is required"
  end

  # Parsing state, all starting from "nothing read yet".
  @current_table = nil
  @last_pos = 0
  @row_head_pos = nil
  @rows = []

  # Collaborators supplied by the caller.
  @dmpio = dmpio
  @create_table_block = create_table_block
  @insert_record_block = insert_record_block
  @check_point_block = check_point_block

  # NOTE(review): #resume is defined outside this excerpt; presumably it
  # restores state from a previously saved position -- confirm.
  resume(dump_pos_info)
end

Instance Method Details

#close ⇒ Object



63
64
65
66
67
68
69
70
71
# File 'lib/flydata/source_postgresql/parse_dump_and_send.rb', line 63

# Flushes any rows still held in @rows and emits the final check point.
# Does nothing (and returns nil) when no rows are pending.
#
# @return [Object, nil] the result of call_check_point_block, or nil when
#   there was nothing to flush
def close
  return if @rows.empty?

  call_insert_record_block
  # The core logic expects a CREATE_TABLE check point callback once the data
  # of each table has been inserted.  #handle_table_info covers every table
  # except the last one, so the last table is handled here.
  call_check_point_block(Parser::State::CREATE_TABLE)
end

#parse(obj) ⇒ Object



59
60
61
# File 'lib/flydata/source_postgresql/parse_dump_and_send.rb', line 59

# Dispatches one unpacked object: Hash instances go to #handle_table_info,
# everything else goes to #handle_data_row.
#
# @param obj [Object] a single object read from the dump
# @return [Object] whatever the selected handler returns
def parse(obj)
  return handle_table_info(obj) if obj.kind_of?(Hash)

  handle_data_row(obj)
end

#parse_all ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/flydata/source_postgresql/parse_dump_and_send.rb', line 50

# Reads every object from @dmpio through a MessagePack unpacker, passes each
# one to #parse, and finally calls #close.
#
# @return [Object] the return value of #close
def parse_all
  unpacker = MessagePack::Unpacker.new(@dmpio)
  unpacker.each do |record|
    # Stream position minus the unpacker's buffer size.
    # NOTE(review): presumably the buffer holds bytes already read from the
    # IO but not yet decoded, making this the offset just past `record` --
    # confirm against the msgpack Unpacker documentation.
    io_pos = @dmpio.pos
    buffered = unpacker.buffer.size
    @last_pos = io_pos - buffered
    parse(record)
  end
  close
end