Class: Embulk::Guess::CsvGuessPlugin

Inherits:
LineGuessPlugin show all
Defined in:
lib/embulk/guess/csv.rb

Constant Summary collapse

# Candidate values for the CSV guesser. How each list is consumed is not
# visible in this section — presumably by the matching guess_* helpers
# (guess_delimiter, guess_quote, ...); confirm against their definitions.

# Field delimiter candidates: comma, tab, pipe.
DELIMITER_CANDIDATES = [",", "\t", "|"]

# Quote character candidates: double quote, single quote.
QUOTE_CANDIDATES = ['"', "'"]

# Escape character candidates: backslash, double quote.
ESCAPE_CANDIDATES = ["\\", '"']

# Literal strings that may stand for NULL in the data.
NULL_STRING_CANDIDATES = [
  "null",
  "NULL",
  "#N/A",
  "\\N",  # MySQL LOAD, Hive STORED AS TEXTFILE
]

# Prefixes that may mark a comment line.
COMMENT_LINE_MARKER_CANDIDATES = ["#", "//"]

# Line-count limits; presumably used by header-skip detection — TODO confirm.
MAX_SKIP_LINES = 10
NO_SKIP_DETECT_LINES = 10

Instance Method Summary collapse

Methods inherited from LineGuessPlugin

#guess

Methods inherited from Embulk::GuessPlugin

from_java, #guess, new_java

Instance Method Details

#guess_lines(config, sample_lines) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/embulk/guess/csv.rb', line 35

# Guesses a CSV parser configuration from a sample of input lines.
#
# Options already present under config["parser"] are carried over into the
# result. "quote", "escape", "null_string", "comment_line_marker",
# "trim_if_not_quoted", "allow_extra_columns" and "allow_optional_columns"
# are only filled in when their key is absent. "delimiter" is taken from the
# config only when the parser type is explicitly "csv"; otherwise it is
# guessed. "type", "skip_header_lines" and "columns" are always written.
#
# @param config [Hash] guess configuration; only config["parser"] is read
# @param sample_lines [Array<String>] sample lines (sliced with [n..-1], so
#   assumed to be an Array — confirm against the caller)
# @return [Hash] {"parser" => guessed_config}, or {} when the parser type is
#   not "csv", no delimiter can be guessed, or the header row / data rows
#   yield at most one column
def guess_lines(config, sample_lines)
  return {} unless config.fetch("parser", {}).fetch("type", "csv") == "csv"

  parser_config = config["parser"] || {}
  if parser_config["type"] == "csv" && parser_config["delimiter"]
    delim = parser_config["delimiter"]
  else
    delim = guess_delimiter(sample_lines)
    unless delim
      # not CSV file
      return {}
    end
  end

  parser_guessed = DataSource.new.merge(parser_config).merge({"type" => "csv", "delimiter" => delim})

  unless parser_guessed.has_key?("quote")
    quote = guess_quote(sample_lines, delim)
    unless quote
      if !guess_force_no_quote(sample_lines, delim, '"')
        # assuming CSV follows RFC for quoting
        quote = '"'
      else
        # disable quoting (set null)
      end
    end
    # quote may still be nil here, which stores an explicit nil (quoting disabled)
    parser_guessed["quote"] = quote
  end
  parser_guessed["quote"] = '"' if parser_guessed["quote"] == ''  # setting '' is not allowed any more. this line converts obsoleted config syntax to explicit syntax.

  unless parser_guessed.has_key?("escape")
    # re-read quote from the config: it may have been user-supplied rather than guessed above
    if quote = parser_guessed["quote"]
      escape = guess_escape(sample_lines, delim, quote)
      unless escape
        if quote == '"'
          # assuming this CSV follows RFC for escaping
          escape = '"'
        else
          # disable escaping (set null)
        end
      end
      parser_guessed["escape"] = escape
    else
      # escape does nothing if quote is disabled
    end
  end

  unless parser_guessed.has_key?("null_string")
    null_string = guess_null_string(sample_lines, delim)
    parser_guessed["null_string"] = null_string if null_string
    # don't even set null_string to avoid confusion of null and 'null' in YAML format
  end

  # guessing skip_header_lines should be before guessing guess_comment_line_marker
  # because lines supplied to CsvTokenizer already don't include skipped header lines.
  # skipping empty lines is also disabled here because skipping header lines is done by
  # CsvParser which doesn't skip empty lines automatically
  sample_records = split_lines(parser_guessed, false, sample_lines, delim, {})
  skip_header_lines = guess_skip_header_lines(sample_records)
  sample_lines = sample_lines[skip_header_lines..-1]
  sample_records = sample_records[skip_header_lines..-1]

  unless parser_guessed.has_key?("comment_line_marker")
    # the helper also returns the sample lines to use from here on
    comment_line_marker, sample_lines =
      guess_comment_line_marker(sample_lines, delim, parser_guessed["quote"], parser_guessed["null_string"])
    if comment_line_marker
      parser_guessed["comment_line_marker"] = comment_line_marker
    end
  end

  # re-split with the settings guessed so far, this time skipping empty lines
  sample_records = split_lines(parser_guessed, true, sample_lines, delim, {})

  # type the first record separately so it can be compared with the rest for header detection
  first_types = SchemaGuess.types_from_array_records(sample_records[0, 1])
  other_types = SchemaGuess.types_from_array_records(sample_records[1..-1] || [])

  if first_types.size <= 1 || other_types.size <= 1
    # guess failed
    return {}
  end

  unless parser_guessed.has_key?("trim_if_not_quoted")
    # enable trimming only when it changes the types guessed for the data rows
    sample_records_trimmed = split_lines(parser_guessed, true, sample_lines, delim, {"trim_if_not_quoted" => true})
    other_types_trimmed = SchemaGuess.types_from_array_records(sample_records_trimmed[1..-1] || [])
    if other_types != other_types_trimmed
      parser_guessed["trim_if_not_quoted"] = true
      other_types = other_types_trimmed
    else
      parser_guessed["trim_if_not_quoted"] = false
    end
  end

  # the first record is a header when its types differ from the data rows and are all
  # string/boolean, or when guess_string_header_line says so
  header_line = (first_types != other_types && first_types.all? {|t| ["string", "boolean"].include?(t) }) || guess_string_header_line(sample_records)

  if header_line
    parser_guessed["skip_header_lines"] = skip_header_lines + 1
  else
    parser_guessed["skip_header_lines"] = skip_header_lines
  end

  parser_guessed["allow_extra_columns"] = false unless parser_guessed.has_key?("allow_extra_columns")
  parser_guessed["allow_optional_columns"] = false unless parser_guessed.has_key?("allow_optional_columns")

  if header_line
    column_names = sample_records.first
  else
    # exclusive range: exactly one generated name (c0, c1, ...) per guessed column type
    column_names = (0...other_types.size).map {|i| "c#{i}" }
  end
  schema = []
  column_names.zip(other_types).each do |name,type|
    # NOTE(review): a nil header cell drops that column from the schema here — confirm this is intended
    if name && type
      if type.is_a?(SchemaGuess::TimestampTypeMatch)
        schema << {"name" => name, "type" => type, "format" => type.format}
      else
        schema << {"name" => name, "type" => type}
      end
    end
  end
  parser_guessed["columns"] = schema

  return {"parser" => parser_guessed}
end