Class: Dag::Client::API::Storage::Import

Inherits:
Object
  • Object
show all
Defined in:
lib/dag/client/api/storage.rb

Defined Under Namespace

Classes: ImportParameter

Instance Method Summary collapse

Constructor Details

#initialize(db_name, tbl_name, file_paths, options = {}, &block) ⇒ Import

Returns a new instance of Import.



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/dag/client/api/storage.rb', line 116

# Sets up an import of one or more files into a database table.
#
# The label is validated before anything is written to the process-wide
# ImportParameter singleton, so a rejected label never leaks into shared state.
#
# @param db_name [String] database name; also published to ImportParameter
# @param tbl_name [String] table name; also published to ImportParameter
# @param file_paths [String, Array<String>] a single path or a list of paths
# @param options [Hash] optional settings; the recognised keys are removed
#   from the hash that is passed in
# @option options [Integer] :jobs number of upload worker threads (default 1)
# @option options [String] :label object label (default 'label')
# @option options [Integer] :splitsz chunk size in bytes (default 100 MiB)
# @yieldreturn [Object] the API client, stored in @api
# @raise [Dag::Client::ParameterInvalid] if the label starts with '_' or '.'
def initialize(db_name, tbl_name, file_paths, options = {}, &block)
  @db_name = db_name
  @tbl_name = tbl_name # was misspelled as @tbl_naem, leaving @tbl_name unset
  @file_paths = file_paths
  @jobs = options.delete(:jobs) || 1
  @label = options.delete(:label) || 'label'
  @splitsz = options.delete(:splitsz) || 100 * 1024**2 # 100 MiB
  @api = block.call

  # Reject bad labels before touching the shared singleton below.
  if @label.to_s.start_with?('_', '.')
    raise Dag::Client::ParameterInvalid, "label should not start with '_' or '.'"
  end

  import_parameter = ImportParameter.instance
  import_parameter.db_name = db_name
  import_parameter.tbl_name = tbl_name
  import_parameter.label = @label

  STDERR.puts "Initialize...\njobs: #{@jobs}, splitsz: #{@splitsz}"
end

Instance Method Details

#calc_label_suffix ⇒ Object



137
138
139
140
141
142
143
144
# File 'lib/dag/client/api/storage.rb', line 137

# Computes the next free numeric suffix for objects named "<label>_<n>".
#
# Lists the objects returned by @api.objects for @db_name under
# ImportParameter.instance.storage_prefix and looks for "<label>_<digits>"
# in each name. Assumes each listed object responds to #scan (String-like)
# -- TODO confirm against the API client.
#
# @return [Integer] highest existing suffix plus one, or 0 when the listing
#   is empty or contains no object matching the label pattern
def calc_label_suffix
  prefix = ImportParameter.instance.storage_prefix
  objects = @api.objects(@db_name, prefix: prefix).objects

  # Escape the label so regex metacharacters in it (e.g. '.') match literally.
  pattern = /#{Regexp.escape(@label)}_(\d+)/
  highest = Array(objects).flat_map { |name| name.scan(pattern) }.flatten.map(&:to_i).max

  # No match at all (empty listing or only foreign objects): start from 0
  # instead of failing on nil + 1.
  highest ? highest + 1 : 0
end

#execute(suffix) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/dag/client/api/storage.rb', line 146

# Imports every configured input and collects what was uploaded.
#
# A path ending in '.gz' goes to import_gz_file, the literal "-" reads from
# $stdin via import_stream, and anything else goes to import_text_file.
# The value each helper returns is added to the running suffix before the
# next input is processed.
#
# @param suffix [Integer] suffix passed to the first input
# @return [Array] the list the import helpers appended to
def execute(suffix)
  paths = @file_paths
  paths = [paths] if paths.is_a?(String)

  uploaded = []
  next_suffix = suffix

  paths.each do |path|
    consumed =
      if path.end_with?('.gz')
        import_gz_file(path, next_suffix, uploaded)
      elsif path == "-"
        import_stream($stdin, next_suffix, uploaded)
      else
        import_text_file(path, next_suffix, uploaded)
      end

    next_suffix += consumed
  end

  uploaded
end

#execute_storage_detail(data, suffix) ⇒ Object



216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/dag/client/api/storage.rb', line 216

# Gzips one chunk in memory and PUTs it through the API client.
#
# @param data [String] raw chunk contents
# @param suffix [Integer] passed to ImportParameter.instance.url to build
#   the target resource
# @return [Object] whatever @api.execute_storage returns
def execute_storage_detail(data, suffix)
  compressed = StringIO.new
  # wrap closes the gzip stream after the block, flushing the trailer;
  # the StringIO's backing string stays readable afterwards.
  Zlib::GzipWriter.wrap(compressed) { |writer| writer.write data }

  target = ImportParameter.instance.url(suffix)
  request = RestParameter.new(
    :put,
    target,
    { content_type: 'application/x-gzip', bucket: @db_name, import: true }
  )

  @api.execute_storage(request) { compressed.string }
end

#import_gz_file(file_path, suffix, upload_objects) ⇒ Object



165
166
167
168
169
170
# File 'lib/dag/client/api/storage.rb', line 165

# Imports a file whose name ends in '.gz'.
#
# If the file cannot be opened as gzip, it is imported as plain text instead.
# Only the open call is guarded: a Zlib error raised later, while the stream
# is being read and uploaded, propagates to the caller rather than silently
# restarting the import from the same suffix with the raw compressed bytes.
#
# @param file_path [String] path to the input file
# @param suffix [Integer] passed through to the import helper
# @param upload_objects [Array] passed through to the import helper
# @return [Object] the value returned by import_stream or import_text_file
# @raise [Zlib::Error] if decompression fails after the file was opened
def import_gz_file(file_path, suffix, upload_objects)
  begin
    reader = Zlib::GzipReader.open(file_path)
  rescue Zlib::Error
    # Not gzip despite the extension.
    return import_text_file(file_path, suffix, upload_objects)
  end

  import_stream(reader, suffix, upload_objects)
end

#import_stream(ifp, suffix, upload_objects) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/dag/client/api/storage.rb', line 176

# Splits a stream into line-aligned chunks and uploads them on @jobs threads.
#
# The calling thread reads up to @splitsz bytes, then extends the chunk to
# the end of the current line with #gets, and pushes it on a bounded queue.
# Each worker uploads a chunk with execute_storage_detail under
# suffix + chunk index and appends the object label to upload_objects.
# Workers run concurrently, so the order of upload_objects is not defined.
#
# The stream is closed on every exit path, including when reading fails or
# a worker's exception is re-raised by join.
#
# NOTE(review): if workers die while the producer is still pushing, the
# producer can block on the full queue -- not addressed here.
#
# @param ifp [IO] readable stream responding to #read, #gets and #close
# @param suffix [Integer] base suffix; chunk i is uploaded as suffix + i
# @param upload_objects [Array] receives one object label per uploaded chunk
# @return [Integer] number of chunks read from the stream
def import_stream(ifp, suffix, upload_objects)
  queue = SizedQueue.new(@jobs)
  workers = Array.new(@jobs) do
    Thread.new do
      # A nil job is the end-of-input marker.
      while (job = queue.pop)
        chunk_index, chunk, part_no = job
        STDERR.puts "> starting upload part #{part_no}, #{chunk.length}"
        execute_storage_detail(chunk, suffix + chunk_index)
        STDERR.puts "< finished upload part #{part_no}, #{chunk.length}"
        upload_objects << ImportParameter.instance.object_label(suffix + chunk_index)
      end
      # Hand the marker on so the next worker also stops.
      queue.push nil
    end
  end

  file_index = 0
  import_index = ImportParameter.instance.index
  while (buffer = ifp.read(@splitsz))
    buffer.force_encoding("ASCII-8BIT")
    # Finish the current line so no record is split across two chunks.
    rest_of_line = ifp.gets
    buffer.concat(rest_of_line.force_encoding("ASCII-8BIT")) if rest_of_line
    queue.push [file_index, buffer, import_index]
    file_index += 1
    import_index += 1
  end
  queue.push nil

  workers.each(&:join)
  file_index
ensure
  # Skip streams that are already closed so cleanup cannot mask the real error.
  ifp.close unless ifp.respond_to?(:closed?) && ifp.closed?
end

#import_text_file(file_path, suffix, upload_objects) ⇒ Object



172
173
174
# File 'lib/dag/client/api/storage.rb', line 172

# Opens a plain file and hands the open handle to import_stream.
#
# @param file_path [String] path of the file to import
# @param suffix [Integer] passed through to import_stream
# @param upload_objects [Array] passed through to import_stream
# @return [Object] whatever import_stream returns
def import_text_file(file_path, suffix, upload_objects)
  source = File.open(file_path)
  import_stream(source, suffix, upload_objects)
end