Class: Flydata::SourceMysql::Parser::MysqlDumpParser

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/source_mysql/parser/dump_parser.rb,
ext/flydata/source_mysql/parser/dump_parser_ext.cpp

Defined Under Namespace

Classes: InsertParser

Constant Summary collapse

BINLOG_INV_ERROR_CHUNK_SIZE =
250

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(dump_pos_info = {}) ⇒ MysqlDumpParser

Returns a new instance of MysqlDumpParser.

Raises:

  • (ArgumentError)


254
255
256
257
258
259
260
# File 'lib/flydata/source_mysql/parser/dump_parser.rb', line 254

# Builds a parser bound to the binlog position recorded for the dump.
#
# @param dump_pos_info [Hash] dump position info; must carry a truthy
#   :source_pos entry that responds to #filename and #pos. The whole hash is
#   retained in @dump_pos_info.
# @raise [ArgumentError] when :source_pos is missing (nil or false)
def initialize(dump_pos_info = {})
  source_pos = dump_pos_info[:source_pos]
  unless source_pos
    raise ArgumentError, "source position is required"
  end

  # Snapshot the binlog file name and offset into a plain hash.
  @binlog_pos = {
    binfile: source_pos.filename,
    pos: source_pos.pos
  }
  @dump_pos_info = dump_pos_info
end

Instance Attribute Details

#binlog_pos ⇒ Object

Returns the value of attribute binlog_pos.



251
252
253
# File 'lib/flydata/source_mysql/parser/dump_parser.rb', line 251

# Reader for the binlog position stored in @binlog_pos.
#
# @return [Hash, nil] the value of @binlog_pos; the constructor sets it to a
#   hash with :binfile and :pos keys
def binlog_pos
  @binlog_pos
end

Instance Method Details

#parse(dmpio, create_table_block, insert_record_block, check_point_block) ⇒ Object



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
# File 'lib/flydata/source_mysql/parser/dump_parser.rb', line 262

# Reads a MySQL dump line by line and drives a small state machine over it:
# START -> CREATE_TABLE -> CREATE_TABLE_COLUMNS -> CREATE_TABLE_CONSTRAINTS
# -> INSERT_RECORD <-> PARSING_INSERT_RECORD -> CREATE_TABLE (next table).
#
# When @dump_pos_info carries a :last_pos other than -1, parsing resumes from
# that offset using the stored :state and :source_table.
#
# @param dmpio [IO] the dump stream; wrapped in an AsyncIO for reading
# @param create_table_block [#call] called with the current table once the
#   line closing its CREATE TABLE statement (starting with ')') is read
# @param insert_record_block [#call] called with (current_table, values_set)
#   for every parsed INSERT line; a truthy return triggers a checkpoint
# @param check_point_block [#call] called with
#   (table, dump_io.pos, bytesize, @binlog_pos, current_state) on state changes
# @return [Hash] @binlog_pos
# @raise [ArgumentError] when dmpio is not an IO
def parse(dmpio, create_table_block, insert_record_block, check_point_block)
  unless dmpio.kind_of?(IO)
    raise ArgumentError.new("Invalid argument. The first parameter must be io.")
  end

  # State shared by the closures below. These must be declared before the
  # procs are created so every proc captures the same variables.
  dump_io = nil
  current_state = Flydata::Parser::State::START
  current_table = nil
  buffered_line = nil
  bytesize = 0

  # Returns the next stripped line, preferring a line that a previous state
  # pushed back via buffered_line. Only lines actually read from the stream
  # add to bytesize.
  readline_proc = Proc.new do
    line = nil
    if buffered_line
      line = buffered_line
      buffered_line = nil
    else
      rawline = dump_io.readline
      bytesize += rawline.bytesize
      line = rawline.strip
    end
    line
  end

  # START: skip lines until the server version header is seen.
  state_start = Proc.new do
    line = readline_proc.call

    # -- Server version 5.6.21-log
    if line.start_with?('-- Server version')
      current_state = Flydata::Parser::State::CREATE_TABLE
      check_point_block.call(nil, dump_io.pos, bytesize, @binlog_pos, current_state)
    end
  end

  # CREATE_TABLE: wait for the next CREATE TABLE statement.
  state_create_table = Proc.new do
    line = readline_proc.call

    # CREATE TABLE `active_admin_comments` (
    m = /^CREATE TABLE `(?<table_name>[^`]+)`/.match(line)
    if m
      current_table = Flydata::Parser::SourceTable.new(m[:table_name])
      current_state = Flydata::Parser::State::CREATE_TABLE_COLUMNS
    end
  end

  # CREATE_TABLE_CONSTRAINTS: ignore constraint lines until the statement's
  # closing line, then publish the table and move on to its records.
  state_create_table_constraints = Proc.new do
    line = readline_proc.call

    #  PRIMARY KEY (`id`),
    if line.start_with?(')')
      create_table_block.call(current_table)
      current_state = Flydata::Parser::State::INSERT_RECORD
      check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state)
    end
  end

  # CREATE_TABLE_COLUMNS: parse one column definition per line. The first
  # line that does not start with a backtick ends the column list.
  state_create_table_columns = Proc.new do
    line = readline_proc.call
    line = FlydataCore::StringUtils.replace_invalid_utf8_char(line)

    #  `author_type` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
    if line.start_with?('`')
      column = {}

      # parse column line
      line = line[0..-2] if line.end_with?(',')
      colname_end_index = line.index('`', 1) - 1
      column[:column_name] = line[1..colname_end_index]
      # Skip the closing backtick and the separator that follows it.
      line = line[colname_end_index + 3..-1]
      items = line.split
      column[:format_type_str] = format_type_str = items.shift
      pos = format_type_str.index('(')
      if pos
        ft = column[:format_type] = format_type_str[0..pos-1]
        # Text between the parentheses; assumes the type token ends with ')'.
        type_args = format_type_str[pos+1..-2]
        if ft == 'decimal'
          precision, scale = type_args.split(',').collect{|v| v.to_i}
          column[:decimal_precision] = precision
          column[:decimal_scale] = scale
        else
          column[:format_size] = type_args.to_i
        end
      else
        column[:format_type] = format_type_str
      end
      while (item = items.shift) do
        case item
        when 'DEFAULT'
          value = items.shift
          value = value.start_with?('\'') ? value[1..-2] : value
          value = nil if value == 'NULL'
          column[:default] = value
        when 'NOT'
          # 'NOT' has already been shifted off, so the token that follows it
          # is now the first element of items.
          if items.first == 'NULL'
            items.shift
            column[:not_null] = true
          end
        when 'unsigned'
          column[:unsigned] = true
        else
          #ignore other options
        end
      end

      current_table.add_column(column)
    else
      # Not a column line: push it back and let the constraints state see it.
      current_state = Flydata::Parser::State::CREATE_TABLE_CONSTRAINTS
      buffered_line = line
      state_create_table_constraints.call
    end
  end

  # INSERT_RECORD: find the next INSERT line, or the UNLOCK line that ends
  # the current table's data.
  state_insert_record = Proc.new do
    line = readline_proc.call

    if line.start_with?('INSERT')
      buffered_line = line
      current_state = Flydata::Parser::State::PARSING_INSERT_RECORD
    elsif line.start_with?('UNLOCK')
      current_state = Flydata::Parser::State::CREATE_TABLE
      check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state)
    end
  end

  # PARSING_INSERT_RECORD: parse the buffered INSERT line and hand the values
  # to the caller.
  state_parsing_insert_record = Proc.new do
    line = readline_proc.call

    values_set = nil
    begin
      values_set = InsertParser.new.parse(line)
    rescue => e
      # Re-raise the same error class with the offending line appended,
      # keeping the original backtrace.
      newe = e.class.new(e.message + "line:#{line}")
      newe.set_backtrace(e.backtrace)
      raise newe
    end
    current_state = Flydata::Parser::State::INSERT_RECORD

    if insert_record_block.call(current_table, values_set)
      # Drop the reference before the checkpoint callback runs.
      values_set = nil
      check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state)
    end
  end

  # Start reading file from top
  begin
    # resume(only when using dump file)
    if @dump_pos_info[:last_pos] && (@dump_pos_info[:last_pos].to_i != -1)
      dmpio.pos = @dump_pos_info[:last_pos].to_i
      current_state = @dump_pos_info[:state]
      current_table = @dump_pos_info[:source_table]
      bytesize = dmpio.pos
    end

    dump_io = AsyncIO.new(dmpio)

    until dump_io.eof? do
      case current_state
      when Flydata::Parser::State::START
        state_start.call
      when Flydata::Parser::State::CREATE_TABLE
        state_create_table.call
      when Flydata::Parser::State::CREATE_TABLE_COLUMNS
        state_create_table_columns.call
      when Flydata::Parser::State::CREATE_TABLE_CONSTRAINTS
        state_create_table_constraints.call
      when Flydata::Parser::State::INSERT_RECORD
        state_insert_record.call
      when Flydata::Parser::State::PARSING_INSERT_RECORD
        state_parsing_insert_record.call
      end
    end
  ensure
    # dump_io is still nil when the resume seek or AsyncIO.new raised; guard
    # so that the original error is not replaced by a NoMethodError.
    dump_io.close if dump_io
  end
  @binlog_pos
end