Class: S3::Client::API::Storage::Import

Inherits:
Object
  • Object
show all
Defined in:
lib/s3/client/api/storage.rb

Defined Under Namespace

Classes: ImportParameter

Instance Method Summary collapse

Constructor Details

#initialize(db_name, tbl_name, file_paths, options = {}, &block) ⇒ Import

Returns a new instance of Import.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/s3/client/api/storage.rb', line 123

# Prepares an import of +file_paths+ into +db_name+ / +tbl_name+ and records
# db/table/label on the shared ImportParameter singleton.
#
# @param db_name [String] used as the bucket when uploading parts
# @param tbl_name [String] table name recorded on ImportParameter
# @param file_paths [String, Array<String>] files to import; "-" means stdin
# @param options [Hash] :jobs (upload threads, default 1),
#   :label (object label, default 'label'),
#   :splitsz (bytes read per part, default 100 MiB).
#   The hash is only read; the caller's hash is not modified.
# @yieldreturn the API client later used for listing (#objects) and
#   uploading (#execute_storage) objects
# @raise [ArgumentError] if no block is given
# @raise [S3::Client::ParameterInvalid] if the label starts with '_' or '.',
#   or if :jobs / :splitsz is not a positive Integer
def initialize(db_name, tbl_name, file_paths, options = {}, &block)
  raise ArgumentError, 'a block returning the API client is required' unless block

  @db_name = db_name
  @tbl_name = tbl_name
  @file_paths = file_paths
  @jobs = options[:jobs] || 1
  @label = options[:label] || 'label'
  @splitsz = options[:splitsz] || 100 * 1024**2 # 100 MiB
  @api = block.call

  # Validate before touching the shared ImportParameter singleton so a
  # rejected configuration does not leave it partially overwritten.
  if %w[_ .].include?(@label[0])
    raise S3::Client::ParameterInvalid, "label should not start with '_' or '.'"
  end
  unless @jobs.is_a?(Integer) && @jobs > 0
    raise S3::Client::ParameterInvalid, 'jobs should be a positive integer'
  end
  # A non-positive split size would make IO#read return "" forever while
  # splitting the input, so reject it up front.
  unless @splitsz.is_a?(Integer) && @splitsz > 0
    raise S3::Client::ParameterInvalid, 'splitsz should be a positive integer'
  end

  import_parameter = ImportParameter.instance
  import_parameter.db_name = db_name
  import_parameter.tbl_name = tbl_name
  import_parameter.label = @label

  STDERR.puts "Initialize...\njobs: #{@jobs}, splitsz: #{@splitsz}"
end

Instance Method Details

#calc_label_suffix ⇒ Object



144
145
146
147
148
149
150
151
152
153
# File 'lib/s3/client/api/storage.rb', line 144

# Computes the next free numeric suffix for @label.
#
# Lists the objects under ImportParameter's storage prefix in @db_name.
# Each object name is searched for "<label>_<digits>", with the label matched
# literally, not as a regex. The result is one past the largest number found,
# or 0 when no object name matches (including when there are no objects).
#
# @return [Integer] the first suffix not yet used by an existing object
def calc_label_suffix
  prefix = ImportParameter.instance.storage_prefix
  xml_doc = @api.objects(@db_name, prefix: prefix)
  objects = S3::Concerns::ObjectsResult.new(xml_doc).objects

  # Escape the label: it is user-supplied and may contain regex metacharacters.
  pattern = /#{Regexp.escape(@label)}_(\d+)/
  used = Array(objects).flat_map { |name| name.scan(pattern).flatten.map(&:to_i) }

  # Objects may exist without any matching this label; start from 0 then
  # rather than returning nil.
  used.empty? ? 0 : used.max + 1
end

#execute(suffix) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/s3/client/api/storage.rb', line 155

# Imports every configured path, giving each file a contiguous range of
# part suffixes starting at +suffix+.
#
# Paths ending in ".gz" go through import_gz_file, "-" reads $stdin, and
# anything else is read as plain text. Each importer returns how many parts
# it produced; that count advances the suffix for the next path.
#
# @param suffix [Integer] suffix for the first part of the first path
# @return [Array] labels of the uploaded objects, as collected by the importers
def execute(suffix)
  paths =
    case @file_paths
    when String then [@file_paths]
    else @file_paths
    end

  uploaded = []
  next_suffix = suffix
  paths.each do |path|
    parts =
      if path.end_with?('.gz')
        import_gz_file(path, next_suffix, uploaded)
      elsif path == "-"
        import_stream($stdin, next_suffix, uploaded)
      else
        import_text_file(path, next_suffix, uploaded)
      end
    next_suffix += parts
  end

  uploaded
end

#execute_storage_detail(data, suffix) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/s3/client/api/storage.rb', line 225

# Gzip-compresses one part and PUTs it through the API client.
#
# @param data [String] raw (uncompressed) part contents
# @param suffix [Integer] part number; ImportParameter#url turns it into the
#   resource path
# @return whatever @api.execute_storage returns; the compressed bytes are
#   supplied to it via the block
def execute_storage_detail(data, suffix)
  compressed = StringIO.new
  # wrap closes the writer (and the StringIO) after the block, flushing the
  # gzip trailer; StringIO#string stays readable after close.
  Zlib::GzipWriter.wrap(compressed) { |gzip| gzip.write(data) }

  put_options = {
    content_type: 'application/x-gzip',
    bucket: @db_name,
    import: true
  }
  request = RestParameter.new(:put, ImportParameter.instance.url(suffix), put_options)

  @api.execute_storage(request) { compressed.string }
end

#import_gz_file(file_path, suffix, upload_objects) ⇒ Object



174
175
176
177
178
179
# File 'lib/s3/client/api/storage.rb', line 174

# Imports a gzip file, decompressing it while streaming.
#
# If the file does not start with a valid gzip header it is imported as
# plain text instead. Only the header check triggers that fallback. A Zlib
# error raised later while streaming (truncated or corrupt archive)
# propagates: by then some parts may already be uploaded. Falling back would
# re-upload the raw compressed bytes under the same part suffixes.
#
# @param file_path [String] path to the (supposedly) gzip-compressed file
# @param suffix [Integer] suffix for the first part
# @param upload_objects [Array] collects labels of uploaded objects
# @return [Integer] number of parts produced
# @raise [Zlib::Error] if decompression fails after the header was accepted
def import_gz_file(file_path, suffix, upload_objects)
  file = File.open(file_path, 'rb')
  begin
    # GzipReader.new reads and validates the gzip header immediately.
    reader = Zlib::GzipReader.new(file)
  rescue Zlib::Error
    # Not gzip after all: release our handle and import it as text.
    file.close
    return import_text_file(file_path, suffix, upload_objects)
  end

  # import_stream closes the reader, which also closes the underlying file.
  import_stream(reader, suffix, upload_objects)
end

#import_stream(ifp, suffix, upload_objects) ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/s3/client/api/storage.rb', line 185

# Splits +ifp+ into parts of about @splitsz bytes and uploads them with
# @jobs worker threads through execute_storage_detail.
#
# Each part is extended to the end of its current line (via +gets+) so no
# line is split across two parts. Part i (0-based) is uploaded with suffix
# +suffix + i+. The corresponding ImportParameter#object_label is appended to
# +upload_objects+ in completion order, not necessarily part order.
#
# Before returning or raising, the queue is closed, every worker is joined
# and +ifp+ is closed. If an upload fails, the worker's exception is
# re-raised via Thread#join, and the reader is not left blocked on a full
# queue.
#
# @param ifp [IO] readable stream (File, Zlib::GzipReader, $stdin, ...)
# @param suffix [Integer] suffix assigned to the first part
# @param upload_objects [Array] receives one object label per uploaded part
# @return [Integer] number of parts read from +ifp+
def import_stream(ifp, suffix, upload_objects)
  queue = SizedQueue.new(@jobs)
  # upload_objects is appended to from several worker threads.
  results_lock = Mutex.new

  workers = Array.new(@jobs) do
    Thread.new do
      begin
        # pop returns nil once the queue has been closed and drained.
        while (part = queue.pop)
          part_index, buffer, import_index = part
          STDERR.puts "> starting upload part #{import_index}, #{buffer.length}"
          execute_storage_detail(buffer, suffix + part_index)
          STDERR.puts "< finished upload part #{import_index}, #{buffer.length}"
          label = ImportParameter.instance.object_label(suffix + part_index)
          results_lock.synchronize { upload_objects << label }
        end
      rescue StandardError
        # Close the queue so the reader cannot block forever on a full queue
        # with no consumers left; the error surfaces through Thread#join.
        queue.close
        raise
      end
    end
  end

  file_index = 0
  begin
    import_index = ImportParameter.instance.index
    while (buffer = ifp.read(@splitsz))
      buffer.force_encoding(Encoding::ASCII_8BIT)
      # Finish the current line so the next part starts on a line boundary.
      rest_of_line = ifp.gets
      buffer << rest_of_line.force_encoding(Encoding::ASCII_8BIT) if rest_of_line
      queue.push([file_index, buffer, import_index])
      file_index += 1
      import_index += 1
    end
  rescue ClosedQueueError
    # A worker failed and closed the queue; join below re-raises its error.
  ensure
    queue.close
    begin
      workers.each(&:join)
    ensure
      ifp.close
    end
  end

  file_index
end

#import_text_file(file_path, suffix, upload_objects) ⇒ Object



181
182
183
# File 'lib/s3/client/api/storage.rb', line 181

# Imports an uncompressed file by streaming it through import_stream, which
# also closes the file when it is done.
#
# @param file_path [String] path of the file to import
# @param suffix [Integer] suffix for the first part
# @param upload_objects [Array] collects labels of uploaded objects
# @return [Integer] number of parts produced
def import_text_file(file_path, suffix, upload_objects)
  source = File.new(file_path)
  import_stream(source, suffix, upload_objects)
end