Module: RocketJob::Batch::IO

Extended by:
ActiveSupport::Concern
Included in:
RocketJob::Batch
Defined in:
lib/rocket_job/batch/io.rb

Overview

IO methods for sliced jobs


Instance Method Details

#download(stream = nil, category: :main, header_line: nil, **args, &block) ⇒ Object

Download the output data into the supplied file, io, IOStreams::Path, or IOStreams::Stream. Returns [Integer] the number of records / lines downloaded.

Parameters

stream [String | IO | IOStreams::Path | IOStreams::Stream]
  Full path and file name to download the output into,
  Or, an IO stream that responds to: :write
  Or, an IOStreams path such as IOStreams::Paths::File, or IOStreams::Paths::S3

Example: Zip

# Since csv is not known to RocketJob it is ignored
job.download('myfile.csv.zip')

Example: Encrypted Zip

job.download('myfile.csv.zip.enc')

Example: Explicitly set the streams

path = IOStreams.path('myfile.ze').stream(:zip).stream(:enc)
job.download(path)

Example: Supply custom options

path = IOStreams.path('myfile.csv.enc').option(:enc, compress: false)
job.download(path)

Example: Supply custom options. Set the file name within the zip file.

path = IOStreams.path('myfile.csv.zip').option(:zip, zip_file_name: 'myfile.csv')
job.download(path)

Example: Download into a tempfile, or stream, using the original file name to determine the streams to apply:

tempfile = Tempfile.new('my_project')
stream = IOStreams.stream(tempfile).file_name('myfile.gz.enc')
job.download(stream)

Example: Add a header and/or trailer record to the downloaded file:

IOStreams.path('/tmp/file.txt.gz').writer do |writer|
  writer << "Header\n"
  job.download do |line|
    writer << line + "\n"
  end
  writer << "Trailer\n"
end

Example: Add a header and/or trailer record to the downloaded file, letting the line writer add the line breaks:

IOStreams.path('/tmp/file.txt.gz').writer(:line) do |writer|
  writer << "Header"
  job.download do |line|
    writer << line
  end
  writer << "Trailer"
end
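The header/trailer pattern above can be sketched without IOStreams. In this sketch, `fake_download` is a hypothetical stand-in for `job.download` that yields one record per call, and `StringIO` stands in for the gzip line writer:

```ruby
require "stringio"

# Stand-in for job.download: yields each output record to the block and
# returns the number of records yielded. Names and data are hypothetical.
def fake_download(records, &block)
  records.each(&block)
  records.size
end

io = StringIO.new
io << "Header\n"
count = fake_download(%w[alpha beta gamma]) { |line| io << line << "\n" }
io << "Trailer\n"

count     # => 3
io.string # => "Header\nalpha\nbeta\ngamma\nTrailer\n"
```

As in the real `job.download`, the return value is the record count, so the caller can log or verify how many lines landed between the header and trailer.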

Notes:

  • The records are returned in `_id` order. Usually this is the order in which the records were originally loaded.

Raises:

  • (ArgumentError)


# File 'lib/rocket_job/batch/io.rb', line 443

def download(stream = nil, category: :main, header_line: nil, **args, &block)
  raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?

  category           = output_category(category) unless category.is_a?(Category::Output)
  output_collection  = output(category)

  # Store the output file name in the category
  category.file_name = stream if !block && (stream.is_a?(String) || stream.is_a?(IOStreams::Path))

  header_line ||= category.render_header

  return output_collection.download(header_line: header_line, &block) if block

  raise(ArgumentError, "Missing mandatory `stream` or `category.file_name`") unless stream || category.file_name

  if output_collection.slice_class.binary_format
    binary_header_line = output_collection.slice_class.to_binary(header_line) if header_line

    # Don't overwrite supplied stream options if any
    stream = stream&.is_a?(IOStreams::Stream) ? stream.dup : IOStreams.new(category.file_name)
    stream.remove_from_pipeline(output_collection.slice_class.binary_format)
    stream.writer(**args) do |io|
      # TODO: Binary formats should return the record count, instead of the slice count.
      output_collection.download(header_line: binary_header_line) { |record| io.write(record) }
    end
  else
    # TODO: Add category to named tags to aid problem determination
    # And RJ Download metric with duration
    IOStreams.new(stream || category.file_name).writer(:line, **args) do |io|
      output_collection.download(header_line: header_line) { |record| io << record }
    end
  end
end

#input(category = :main) ⇒ Object

Returns [RocketJob::Sliced::Input] input collection for holding input slices

Parameters:

category [Symbol | RocketJob::Category::Input]
  The category, or the name of the category, to access or upload data into.
  Default: :main (uses the single default input collection for this job)
  Validates: This value must be one of those listed in #input_categories


# File 'lib/rocket_job/batch/io.rb', line 16

def input(category = :main)
  category = input_category(category)

  (@inputs ||= {})[category.name] ||= category.data_store(self)
end

#output(category = :main) ⇒ Object

Returns [RocketJob::Sliced::Output] output collection for holding output slices.
Returns nil if no output is being collected.

Parameters:

category [Symbol | RocketJob::Category::Output]
  The category, or the name of the category, to access or download data from.
  Default: :main (uses the single default output collection for this job)
  Validates: This value must be one of those listed in #output_categories


# File 'lib/rocket_job/batch/io.rb', line 30

def output(category = :main)
  category = output_category(category)

  (@outputs ||= {})[category.name] ||= category.data_store(self)
end
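Both #input and #output memoize one collection per category name via the `(@hash ||= {})[key] ||= ...` idiom. A minimal standalone sketch of that pattern, where `JobSketch` and `build_store` are hypothetical stand-ins for the job and for `category.data_store(self)`:

```ruby
# Minimal sketch of the per-category memoization used by #input and
# #output: one store object per category name, built lazily on first access.
class JobSketch
  def output(category_name = :main)
    (@outputs ||= {})[category_name] ||= build_store(category_name)
  end

  private

  # Stand-in for category.data_store(self).
  def build_store(name)
    {name: name, records: []}
  end
end

job = JobSketch.new
job.output(:main).equal?(job.output(:main)) # => true: same object each call
```

Repeated calls with the same category name return the identical store object, so slices appended through one reference are visible through all of them.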

#upload(object = nil, category: :main, file_name: nil, stream_mode: nil, on_first: nil, columns: nil, slice_batch_size: nil, **args, &block) ⇒ Object

Upload data into the job's input collection for processing by workers.

Returns [Integer] the number of records uploaded.

The data can be supplied as a file name or stream (String, IO, IOStreams::Path, or IOStreams::Stream), a Range of integer ids, a Mongoid::Criteria, an ActiveRecord::Relation, or a block that yields records directly.

When supplied a Range, one range of ids is uploaded per slice so that the response can return multiple records for each slice processed. Supplying a descending Range uploads the highest id ranges first. This is useful when the highest id values should be processed before the lower ones, for example when processing every record in a database table based on its id column.

Example

job.input_category.slice_size = 100
job.upload(421..200)

# Equivalent to calling:
job.input.insert([400,421])
job.input.insert([300,399])
job.input.insert([200,299])

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The job's record_count is incremented by the number of records uploaded.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.
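The range slicing described above can be sketched standalone. `integer_range_slices` below is a hypothetical helper, not part of RocketJob; it builds the full id list in memory for clarity, whereas a real implementation would compute the slice boundaries arithmetically:

```ruby
# Split the inclusive range first..last into [start_id, end_id] pairs of
# at most slice_size ids each, optionally highest range first.
def integer_range_slices(first, last, slice_size: 100, reverse: false)
  slices = (first..last).each_slice(slice_size).map { |ids| [ids.first, ids.last] }
  reverse ? slices.reverse : slices
end

integer_range_slices(200, 421, slice_size: 100)
# => [[200, 299], [300, 399], [400, 421]]
integer_range_slices(200, 421, slice_size: 100, reverse: true)
# => [[400, 421], [300, 399], [200, 299]]
```

Note how the final slice, [400, 421], is smaller than slice_size because the range does not divide evenly.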



# File 'lib/rocket_job/batch/io.rb', line 272

def upload(object = nil, category: :main, file_name: nil, stream_mode: nil, on_first: nil, columns: nil, slice_batch_size: nil, **args, &block)
  input_collection = input(category)

  if block
    raise(ArgumentError, "Cannot supply both an object to upload, and a block.") if object
    if stream_mode || columns || slice_batch_size || args.size > 0
      raise(ArgumentError, "Unknown keyword arguments when uploading a block. Only accepts :category, :file_name, or :on_first")
    end

    category           = input_category(category)
    category.file_name = file_name if file_name

    # Extract the header line during the upload when applicable.
    extract_header = category.extract_header_callback(on_first)

    count             = input_collection.upload(on_first: extract_header, slice_batch_size: slice_batch_size, &block)
    self.record_count = (record_count || 0) + count
    return count
  end

  count =
    case object
    when Range
      if file_name || stream_mode || on_first || args.size > 0
        raise(ArgumentError, "Unknown keyword arguments when uploading a Range. Only accepts :category, :columns, or :slice_batch_size")
      end

      first = object.first
      last  = object.last
      if first < last
        input_collection.upload_integer_range(first, last, slice_batch_size: slice_batch_size || 1_000)
      else
        input_collection.upload_integer_range_in_reverse_order(last, first, slice_batch_size: slice_batch_size || 1_000)
      end
    when Mongoid::Criteria
      if file_name || stream_mode || on_first || args.size > 0
        raise(ArgumentError, "Unknown keyword arguments when uploading a Mongoid::Criteria. Only accepts :category, :columns, or :slice_batch_size")
      end

      input_collection.upload_mongo_query(object, columns: columns, slice_batch_size: slice_batch_size, &block)
    when defined?(ActiveRecord::Relation) ? ActiveRecord::Relation : false
      if file_name || stream_mode || on_first || args.size > 0
        raise(ArgumentError, "Unknown keyword arguments when uploading an ActiveRecord::Relation. Only accepts :category, :columns, or :slice_batch_size")
      end

      input_collection.upload_arel(object, columns: columns, slice_batch_size: slice_batch_size, &block)

    else
      raise(ArgumentError, "Unknown keyword argument :columns when uploading a file") if columns

      category = input_category(category)

      # Extract the header line during the upload when applicable.
      extract_header = category.extract_header_callback(on_first)
      path = category.upload_path(object, original_file_name: file_name)

      input_collection.upload(on_first: extract_header, slice_batch_size: slice_batch_size) do |io|
        path.each(stream_mode || :line, **args) { |line| io << line }
      end

    end

  self.record_count = (record_count || 0) + count
  count
end

#upload_arel(arel, *column_names, category: :main, &block) ⇒ Object

Deprecated.


# File 'lib/rocket_job/batch/io.rb', line 339

def upload_arel(arel, *column_names, category: :main, &block)
  count             = input(category).upload_arel(arel, columns: column_names, &block)
  self.record_count = (record_count || 0) + count
  count
end

#upload_integer_range(start_id, last_id, category: :main, slice_batch_size: 1_000) ⇒ Object

Deprecated.


# File 'lib/rocket_job/batch/io.rb', line 353

def upload_integer_range(start_id, last_id, category: :main, slice_batch_size: 1_000)
  count             = input(category).upload_integer_range(start_id, last_id, slice_batch_size: slice_batch_size)
  self.record_count = (record_count || 0) + count
  count
end

#upload_integer_range_in_reverse_order(start_id, last_id, category: :main, slice_batch_size: 1_000) ⇒ Object

Deprecated.


# File 'lib/rocket_job/batch/io.rb', line 360

def upload_integer_range_in_reverse_order(start_id, last_id, category: :main, slice_batch_size: 1_000)
  count             = input(category).upload_integer_range_in_reverse_order(start_id, last_id, slice_batch_size: slice_batch_size)
  self.record_count = (record_count || 0) + count
  count
end

#upload_mongo_query(criteria, *column_names, category: :main, &block) ⇒ Object

Deprecated.


# File 'lib/rocket_job/batch/io.rb', line 346

def upload_mongo_query(criteria, *column_names, category: :main, &block)
  count             = input(category).upload_mongo_query(criteria, columns: column_names, &block)
  self.record_count = (record_count || 0) + count
  count
end

#upload_slice(slice, category: :main) ⇒ Object

Upload the supplied slice for processing by workers

Updates the record_count after adding the records

Returns [Integer] the number of records uploaded

Parameters

`slice` [ Array<Hash | Array | String | Integer | Float | Symbol | Regexp | Time> ]
  All elements in `slice` must be serializable to BSON.
  For example, the following types are not supported: Date

Note:

The caller is responsible for honoring `:slice_size`, since the entire slice is saved as-is.

Note:

Not thread-safe. Only call from one thread at a time


# File 'lib/rocket_job/batch/io.rb', line 382

def upload_slice(slice, category: :main)
  input(category).insert(slice)
  count             = slice.size
  self.record_count = (record_count || 0) + count
  count
end
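Every upload variant above accumulates the job's record_count with the same `(record_count || 0) + count` idiom, which also works on the first upload when record_count is still nil. A minimal sketch, where `CounterSketch` is a hypothetical stand-in for the job:

```ruby
# Sketch of the record_count accumulation idiom shared by the upload
# methods: treat a nil record_count as zero, add the slice size, and
# return the size of the slice just uploaded.
class CounterSketch
  attr_accessor :record_count

  def upload_slice(slice)
    count = slice.size
    self.record_count = (record_count || 0) + count
    count
  end
end

job = CounterSketch.new
job.upload_slice([1, 2, 3]) # => 3
job.upload_slice(%w[a b])   # => 2
job.record_count            # => 5
```

Because each call both returns its own count and folds it into the running total, callers can mix `upload`, `upload_slice`, and the deprecated helpers and still end with an accurate record_count.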