Class: Bricolage::MySQLTask::S3Export

Inherits:
Action
  • Object
show all
Defined in:
lib/bricolage/mysqldatasource.rb

Constant Summary collapse

# Translation table from keys of the data source's `mysql_options` to the
# property names emitted by #connection_property ("<property>=<value>" pairs
# joined with '&'). Options whose key is not listed here are not emitted.
# NOTE(review): the values look like MySQL Connector/J connection property
# names -- confirm against the mys3dump documentation.
# Frozen so the shared table cannot be mutated at runtime.
OPTION_MAP =
{
  encoding: 'useUnicode=true&characterEncoding',
  read_timeout: 'netTimeoutForStreamingResults',
  connect_timeout: 'connectTimeout',
  reconnect: 'autoReconnect',
  collation: 'connectionCollation'
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(table, stmt, s3ds, prefix, gzip: true, format: "json", partition_column: nil, partition_number: 4, write_concurrency: 4, rotation_size: nil, delete_objects: false, object_key_delimiter: nil, src_zone_offset: nil, dst_zone_offset: nil) ⇒ S3Export

Returns a new instance of S3Export.



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/bricolage/mysqldatasource.rb', line 192

# Stores the export settings; nothing is executed until #run is called.
#
# @param table [Object] table name, passed to the dump command as `-t`
# @param stmt [Object, nil] statement object; when present its
#   `stripped_source` is used as the query and #bind is forwarded to it
# @param s3ds [Object] S3 data source; its `prefix`, `bucket` and
#   credentials are read by this class
# @param prefix [Object] job-level key prefix, combined with `s3ds.prefix`
#   by #build_prefix
# @param gzip [Boolean] adds the bare `-C` flag when truthy
# @param format [String] passed as `-f`
# @param partition_column [Object, nil] passed as `-c` when set
# @param partition_number [Object, nil] passed as `-n` when set
# @param write_concurrency [Object, nil] passed as `-w` when set
# @param rotation_size [Object, nil] passed as `-r` when set
# @param delete_objects [Boolean] adds the bare `-d` flag when truthy
# @param object_key_delimiter [Object, nil] passed as `-k` when set
# @param src_zone_offset [Object, nil] passed as `-S`; falls back to
#   `ds.mysql_options[:src_zone_offset]` in #command_parameters
# @param dst_zone_offset [Object, nil] passed as `-T`; falls back to
#   `ds.mysql_options[:dst_zone_offset]` in #command_parameters
def initialize(table, stmt, s3ds, prefix, gzip: true,
   format: "json",
   partition_column: nil,
   partition_number: 4,
   write_concurrency: 4,
   rotation_size: nil,
   delete_objects: false,
   object_key_delimiter: nil,
   src_zone_offset: nil,
   dst_zone_offset: nil)
  # What to export.
  @table, @statement = table, stmt

  # Where to export it to.
  @s3ds = s3ds
  @prefix = build_prefix(s3ds.prefix, prefix)

  # How the output is written.
  @format, @gzip = format, gzip
  @partition_column, @partition_number = partition_column, partition_number
  @write_concurrency, @rotation_size = write_concurrency, rotation_size
  @delete_objects, @object_key_delimiter = delete_objects, object_key_delimiter

  # Time zone offsets (may be overridden by data source options later).
  @src_zone_offset, @dst_zone_offset = src_zone_offset, dst_zone_offset
end

Instance Method Details

#bind(*args) ⇒ Object



223
224
225
# File 'lib/bricolage/mysqldatasource.rb', line 223

# Forwards all arguments to the statement's own `bind`.
# Returns nil without doing anything when no statement was given.
def bind(*args)
  return unless @statement

  @statement.bind(*args)
end

#build_cmd(options) ⇒ Object



302
303
304
# File 'lib/bricolage/mysqldatasource.rb', line 302

# Builds the shell command line that launches `java` with the given options.
#
# Every key becomes a "-<key>" switch. A truthy value follows the switch as
# a separate argument (converted with `to_s`); a nil/false value turns the
# key into a bare flag.
#
# Each argument is shell-escaped rather than merely wrapped in double quotes,
# so values containing quotes, backticks, `$` or backslashes (an SQL query,
# for example) reach the program literally instead of being interpreted or
# truncated by the shell.
#
# @param options [Hash] switch name => value (or nil for a bare flag)
# @return [String] a command line safe to hand to a POSIX shell
def build_cmd(options)
  # Standard library; required here so the method works regardless of what
  # the file requires at the top. Repeated calls are no-ops.
  require 'shellwords'

  switches = options.flat_map do |key, value|
    value ? ["-#{key}", value.to_s] : ["-#{key}"]
  end
  Shellwords.join(['java'] + switches)
end

#build_prefix(ds_prefix, pm_prefix) ⇒ Object



294
295
296
# File 'lib/bricolage/mysqldatasource.rb', line 294

# Joins the data source prefix and the job prefix into a single key prefix.
#
# The two parts are joined with "/", every run of consecutive slashes is
# collapsed to one, and a leading slash is removed. Collapsing happens
# before stripping, so inputs such as ("/data/", "/tbl/") or (nil, "tbl")
# never yield "data//tbl/" or "/tbl". A trailing slash is kept.
#
# @param ds_prefix [String, nil] prefix configured on the data source
# @param pm_prefix [#to_s, nil] prefix given for this export
# @return [String] normalized prefix without a leading slash
def build_prefix(ds_prefix, pm_prefix)
  # Interpolation turns nil into "", covering both missing parts.
  "#{ds_prefix}/#{pm_prefix}".squeeze('/').sub(%r<\A/>, '')
end

#command_parameters ⇒ Object



251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/bricolage/mysqldatasource.rb', line 251

# Assembles the option hash that #build_cmd turns into the command line.
#
# Keys are the switch names; a nil value marks a bare flag. Insertion order
# is the order in which the switches appear on the command line.
# The password is not part of the arguments (the `p:` entry is commented
# out); #environment_variables exports it as MYS3DUMP_PASSWORD instead.
#
# @return [Hash{Symbol => Object}]
def command_parameters
  args = {
    jar: mys3dump_path.to_s,
    h: ds.host,
    P: ds.port.to_s,
    D: ds.database,
    u: ds.username,
    #p: ds.password,
    o: connection_property,
    t: @table,
    b: @s3ds.bucket.name,
    x: @prefix
  }

  args[:q] = @statement.stripped_source.chomp(';') if @statement
  args[:f] = @format if @format
  args[:C] = nil if @gzip

  # Value-carrying switches that are emitted only when configured.
  {
    c: @partition_column,
    n: @partition_number,
    w: @write_concurrency,
    r: @rotation_size
  }.each do |switch, value|
    args[switch] = value if value
  end

  args[:d] = nil if @delete_objects
  args[:k] = @object_key_delimiter if @object_key_delimiter

  # Zone offsets: the per-export setting wins over the data source option.
  source_offset = @src_zone_offset || ds.mysql_options[:src_zone_offset]
  args[:S] = source_offset if source_offset

  target_offset = @dst_zone_offset || ds.mysql_options[:dst_zone_offset]
  args[:T] = target_offset if target_offset

  args
end

#connection_property ⇒ Object



290
291
292
# File 'lib/bricolage/mysqldatasource.rb', line 290

# Renders the data source's MySQL options as a single "name=value&name=value"
# string. Option keys are translated through OPTION_MAP; keys without an
# entry there are skipped.
#
# @return [String] possibly empty
def connection_property
  pairs = []
  ds.mysql_options.each do |name, value|
    property = OPTION_MAP[name]
    pairs << "#{property}=#{value}" if property
  end
  pairs.join('&')
end

#environment_variables ⇒ Object



243
244
245
246
247
248
249
# File 'lib/bricolage/mysqldatasource.rb', line 243

# Environment handed to the dump process: the S3 credentials taken from the
# S3 data source and the database password taken from `ds`.
#
# @return [Hash{String => Object}]
def environment_variables
  env = {}
  env['AWS_ACCESS_KEY_ID'] = @s3ds.access_key
  env['AWS_SECRET_ACCESS_KEY'] = @s3ds.secret_key
  env['MYS3DUMP_PASSWORD'] = ds.password
  env
end

#extract_exception_message(out) ⇒ Object



306
307
308
309
310
311
312
# File 'lib/bricolage/mysqldatasource.rb', line 306

# Picks the message of the first "...Exception: <message>" line out of the
# command output.
#
# Uses `each_line`; passing a block to `String#lines` is deprecated.
# When no line matches, the whole output is returned (this is what the
# block form of `lines` returned as well), so the caller's failure message
# still carries the diagnostics.
#
# @param out [String] combined stdout/stderr of the command
# @return [String] the extracted message, or `out` itself if none is found
def extract_exception_message(out)
  out.each_line do |line|
    # The leading `.*` is greedy: for nested "AException: BException: text"
    # the text after the last "Exception: " on the line is taken.
    found = /^.*Exception: (?<msg>.*)$/.match(line)
    return found[:msg] if found
  end
  out
end

#mys3dump_path ⇒ Object



298
299
300
# File 'lib/bricolage/mysqldatasource.rb', line 298

# Location of the bundled mys3dump jar: two directories above the one
# containing this file, then "libexec/mys3dump.jar".
#
# @return [Pathname]
def mys3dump_path
  here = Pathname(__dir__)
  two_levels_up = here.parent.parent
  two_levels_up + "libexec/mys3dump.jar"
end

#run ⇒ Object



218
219
220
221
# File 'lib/bricolage/mysqldatasource.rb', line 218

# Runs the export and reports success.
#
# Any failure surfaces as an exception raised from #s3export (it raises
# JobFailure when the command exits unsuccessfully), so reaching the last
# line means the export command succeeded.
#
# @return [Object] the value of JobResult.success
def run
  s3export
  JobResult.success
end

#s3export ⇒ Object



232
233
234
235
236
237
238
239
240
241
# File 'lib/bricolage/mysqldatasource.rb', line 232

# Runs the dump command and logs both the command line and its combined
# stdout/stderr through the data source logger.
#
# @return [nil] when the command exits successfully
# @raise [JobFailure] when the command exits unsuccessfully; the message
#   carries the raw status integer and the text picked by
#   #extract_exception_message
def s3export
  command = build_cmd(command_parameters)
  ds.logger.info "[CMD] #{command}"

  output, status = Open3.capture2e(environment_variables, command)
  ds.logger.info "[CMDOUT] #{output}"
  return if status.success?

  reason = extract_exception_message(output)
  raise JobFailure, "mys3dump failed (status: #{status.to_i}): #{reason}"
end

#source ⇒ Object



227
228
229
230
# File 'lib/bricolage/mysqldatasource.rb', line 227

# Human-readable description of the job: a "-- myexport" header naming the
# table and the destination, followed on a new line by the statement's
# source when a statement was given.
#
# @return [String]
def source
  header = "-- myexport #{@table} -> #{@s3ds.bucket_name}/#{@prefix}"
  return header unless @statement

  "#{header}\n#{@statement.stripped_source}"
end