Module: RocketJob::Batch::IO
Extended by: ActiveSupport::Concern
Included in: RocketJob::Batch
Defined in: lib/rocket_job/batch/io.rb
Overview
IO methods for sliced jobs
Instance Method Summary
- #download(stream = nil, category: :main, header_line: nil, **args, &block) ⇒ Object
  Download the output data into the supplied file, io, IOStreams::Path, or IOStreams::Stream.
- #input(category = :main) ⇒ Object
  Returns [RocketJob::Sliced::Input] input collection for holding input slices.
- #output(category = :main) ⇒ Object
  Returns [RocketJob::Sliced::Output] output collection for holding output slices. Returns nil if no output is being collected.
- #upload(object = nil, category: :main, file_name: nil, stream_mode: nil, on_first: nil, columns: nil, slice_batch_size: nil, **args, &block) ⇒ Object
  Upload a sliced range of integer requests as arrays of start and end ids, starting with the last range first.
- #upload_arel(arel, *column_names, category: :main, &block) ⇒ Object (deprecated)
- #upload_integer_range(start_id, last_id, category: :main, slice_batch_size: 1_000) ⇒ Object (deprecated)
- #upload_integer_range_in_reverse_order(start_id, last_id, category: :main, slice_batch_size: 1_000) ⇒ Object (deprecated)
- #upload_mongo_query(criteria, *column_names, category: :main, &block) ⇒ Object (deprecated)
- #upload_slice(slice, category: :main) ⇒ Object
  Upload the supplied slice for processing by workers.
Instance Method Details
#download(stream = nil, category: :main, header_line: nil, **args, &block) ⇒ Object
Download the output data into the supplied file, io, IOStreams::Path, or IOStreams::Stream. Returns [Integer] the number of records / lines downloaded.
Parameters
stream [String | IO | IOStreams::Path | IOStreams::Stream]
  Full path and file name to stream the output into,
  Or, an IO stream that responds to: :write
  Or, an IOStreams path such as IOStreams::Paths::File, or IOStreams::Paths::S3
Example: Zip
# Since csv is not known to RocketJob it is ignored
job.download('myfile.csv.zip')
Example: Encrypted Zip
job.download('myfile.csv.zip.enc')
Example: Explicitly set the streams
path = IOStreams.path('myfile.ze').stream(:zip).stream(:enc)
job.download(path)
Example: Supply custom options
path = IOStreams.path('myfile.csv.enc').option(:enc, compress: false)
job.download(path)
Example: Supply custom options. Set the file name within the zip file.
path = IOStreams.path('myfile.csv.zip').option(:zip, zip_file_name: 'myfile.csv')
job.download(path)
Example: Download into a tempfile, or stream, using the original file name to determine the streams to apply:
tempfile = Tempfile.new('my_project')
stream = IOStreams.stream(tempfile).file_name('myfile.gz.enc')
job.download(stream)
Example: Add a header and/or trailer record to the downloaded file:
IOStreams.path('/tmp/file.txt.gz').writer do |writer|
writer << "Header\n"
job.download do |line|
writer << line + "\n"
end
writer << "Trailer\n"
end
Example: Add a header and/or trailer record to the downloaded file, letting the line writer add the line breaks:
IOStreams.path('/tmp/file.txt.gz').writer(:line) do |writer|
writer << "Header"
job.download do |line|
writer << line
end
writer << "Trailer"
end
Notes:
- The records are returned in '_id' order. Usually this is the order in which the records were originally loaded.
# File 'lib/rocket_job/batch/io.rb', line 443

def download(stream = nil, category: :main, header_line: nil, **args, &block)
  raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?

  category          = output_category(category) unless category.is_a?(Category::Output)
  output_collection = output(category)

  # Store the output file name in the category
  category.file_name = stream if !block && (stream.is_a?(String) || stream.is_a?(IOStreams::Path))

  header_line ||= category.render_header

  return output_collection.download(header_line: header_line, &block) if block

  raise(ArgumentError, "Missing mandatory `stream` or `category.file_name`") unless stream || category.file_name

  if output_collection.slice_class.binary_format
    binary_header_line = output_collection.slice_class.to_binary(header_line) if header_line

    # Don't overwrite supplied stream options if any
    stream = stream&.is_a?(IOStreams::Stream) ? stream.dup : IOStreams.new(category.file_name)
    stream.remove_from_pipeline(output_collection.slice_class.binary_format)
    stream.writer(**args) do |io|
      # TODO: Binary formats should return the record count, instead of the slice count.
      output_collection.download(header_line: binary_header_line) { |record| io.write(record) }
    end
  else
    # TODO: Add category to named tags to aid problem determination
    #       And RJ Download metric with duration
    IOStreams.new(stream || category.file_name).writer(:line, **args) do |io|
      output_collection.download(header_line: header_line) { |record| io << record }
    end
  end
end
#input(category = :main) ⇒ Object
Returns [RocketJob::Sliced::Input] input collection for holding input slices
Parameters:
category [Symbol | RocketJob::Category::Input]
  The category, or the name of the category, to access or upload data into.
  Default: None (uses the single default input collection for this job)
  Validates: This value must be one of those listed in #input_categories
# File 'lib/rocket_job/batch/io.rb', line 16

def input(category = :main)
  category = input_category(category)

  (@inputs ||= {})[category.name] ||= category.data_store(self)
end
#output(category = :main) ⇒ Object
Returns [RocketJob::Sliced::Output] output collection for holding output slices. Returns nil if no output is being collected.
Parameters:
category [Symbol | RocketJob::Category::Output]
  The category, or the name of the category, to access or download data from.
  Default: None (uses the single default output collection for this job)
  Validates: This value must be one of those listed in #output_categories
# File 'lib/rocket_job/batch/io.rb', line 30

def output(category = :main)
  category = output_category(category)

  (@outputs ||= {})[category.name] ||= category.data_store(self)
end
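Both #input and #output lazily build one collection per category name and cache it, as the source listings above show. A standalone sketch of that per-key memoization pattern (plain Ruby for illustration; CollectionCache is not a RocketJob API):

```ruby
# Standalone illustration of the lazy, per-category memoization used by
# #input and #output: build the collection on first access, cache it after.
class CollectionCache
  def initialize
    @cache = {}
  end

  # Returns the cached value for +name+, building it via the block only once.
  def fetch(name)
    @cache[name] ||= yield(name)
  end
end

cache  = CollectionCache.new
builds = 0
2.times { cache.fetch(:main) { builds += 1; "collection-main" } }
# builds is 1: the second fetch hits the cache instead of rebuilding.
```

Because the cache is keyed on the category name, accessing two different categories builds two independent collections, while repeated access to the same category reuses the first one.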
#upload(object = nil, category: :main, file_name: nil, stream_mode: nil, on_first: nil, columns: nil, slice_batch_size: nil, **args, &block) ⇒ Object
Upload a sliced range of integer requests as arrays of start and end ids, starting with the last range first.
Returns [Integer] the number of slices uploaded.
Uploads one range per slice so that the response can return multiple records for each slice processed. Useful when the highest integer values should be processed before the lower integer ranges, for example when processing every record in a database based on the id column.
Example
job.input_category.slice_size = 100
job.upload_integer_range_in_reverse_order(200, 421)
# Equivalent to calling:
job.input.insert([400,421])
job.input.insert([300,399])
job.input.insert([200,299])
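The equivalence above can be checked with a small standalone helper that mimics the slicing arithmetic (an illustrative sketch, not RocketJob's actual implementation):

```ruby
# Illustrative sketch: cut an integer range into [first, last] pairs of at
# most slice_size ids each, returned highest range first, matching the
# example above. Not RocketJob's actual implementation.
def reverse_order_ranges(start_id, last_id, slice_size)
  ranges = []
  first  = start_id
  while first <= last_id
    ranges << [first, [first + slice_size - 1, last_id].min]
    first += slice_size
  end
  ranges.reverse
end

reverse_order_ranges(200, 421, 100)
# => [[400, 421], [300, 399], [200, 299]]
```

Note that the ranges are aligned from the bottom of the range, so only the last (highest) slice can be smaller than slice_size.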
Notes:
- Only call from one thread at a time against a single instance of this job.
- The record_count for the job is set to: last_id - start_id + 1.
- If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.
# File 'lib/rocket_job/batch/io.rb', line 272

def upload(object = nil, category: :main, file_name: nil, stream_mode: nil, on_first: nil, columns: nil, slice_batch_size: nil, **args, &block)
  input_collection = input(category)

  if block
    raise(ArgumentError, "Cannot supply both an object to upload, and a block.") if object
    if stream_mode || columns || slice_batch_size || args.size > 0
      raise(ArgumentError, "Unknown keyword arguments when uploading a block. Only accepts :category, :file_name, or :on_first")
    end

    category           = input_category(category)
    category.file_name = file_name if file_name

    # Extract the header line during the upload when applicable.
    extract_header = category.extract_header_callback(on_first)

    count             = input_collection.upload(on_first: extract_header, slice_batch_size: slice_batch_size, &block)
    self.record_count = (record_count || 0) + count
    return count
  end

  count =
    case object
    when Range
      if file_name || stream_mode || on_first || args.size > 0
        raise(ArgumentError, "Unknown keyword arguments when uploading a Range. Only accepts :category, :columns, or :slice_batch_size")
      end

      first = object.first
      last  = object.last
      if first < last
        input_collection.upload_integer_range(first, last, slice_batch_size: slice_batch_size || 1_000)
      else
        input_collection.upload_integer_range_in_reverse_order(last, first, slice_batch_size: slice_batch_size || 1_000)
      end
    when Mongoid::Criteria
      if file_name || stream_mode || on_first || args.size > 0
        raise(ArgumentError, "Unknown keyword arguments when uploading a Mongoid::Criteria. Only accepts :category, :columns, or :slice_batch_size")
      end

      input_collection.upload_mongo_query(object, columns: columns, slice_batch_size: slice_batch_size, &block)
    when defined?(ActiveRecord::Relation) ? ActiveRecord::Relation : false
      if file_name || stream_mode || on_first || args.size > 0
        raise(ArgumentError, "Unknown keyword arguments when uploading an ActiveRecord::Relation. Only accepts :category, :columns, or :slice_batch_size")
      end

      input_collection.upload_arel(object, columns: columns, slice_batch_size: slice_batch_size, &block)
    else
      raise(ArgumentError, "Unknown keyword argument :columns when uploading a file") if columns

      category = input_category(category)

      # Extract the header line during the upload when applicable.
      extract_header = category.extract_header_callback(on_first)

      path = category.upload_path(object, original_file_name: file_name)

      input_collection.upload(on_first: extract_header, slice_batch_size: slice_batch_size) do |io|
        path.each(stream_mode || :line, **args) { |line| io << line }
      end
    end

  self.record_count = (record_count || 0) + count
  count
end
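As the source shows, #upload dispatches on the type of the supplied object. A standalone sketch of just that dispatch decision (upload_kind is an illustrative name, not a RocketJob method):

```ruby
# Illustrative dispatch sketch: a Range uploads integer ids, ascending when
# first < last, otherwise highest range first; any other object is treated
# as a file / stream to read. Not a RocketJob method.
def upload_kind(object)
  case object
  when Range
    object.first < object.last ? :integer_range : :integer_range_in_reverse_order
  else
    :file_or_stream
  end
end

upload_kind(1..100)   # => :integer_range
upload_kind(100..1)   # => :integer_range_in_reverse_order
upload_kind("my.csv") # => :file_or_stream
```

This is why `job.upload(421..200)` processes the highest ids first: a descending Range selects the reverse-order path.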
#upload_arel(arel, *column_names, category: :main, &block) ⇒ Object
# File 'lib/rocket_job/batch/io.rb', line 339

def upload_arel(arel, *column_names, category: :main, &block)
  count             = input(category).upload_arel(arel, columns: column_names, &block)
  self.record_count = (record_count || 0) + count
  count
end
#upload_integer_range(start_id, last_id, category: :main, slice_batch_size: 1_000) ⇒ Object
# File 'lib/rocket_job/batch/io.rb', line 353

def upload_integer_range(start_id, last_id, category: :main, slice_batch_size: 1_000)
  count             = input(category).upload_integer_range(start_id, last_id, slice_batch_size: slice_batch_size)
  self.record_count = (record_count || 0) + count
  count
end
#upload_integer_range_in_reverse_order(start_id, last_id, category: :main, slice_batch_size: 1_000) ⇒ Object
# File 'lib/rocket_job/batch/io.rb', line 360

def upload_integer_range_in_reverse_order(start_id, last_id, category: :main, slice_batch_size: 1_000)
  count             = input(category).upload_integer_range_in_reverse_order(start_id, last_id, slice_batch_size: slice_batch_size)
  self.record_count = (record_count || 0) + count
  count
end
#upload_mongo_query(criteria, *column_names, category: :main, &block) ⇒ Object
# File 'lib/rocket_job/batch/io.rb', line 346

def upload_mongo_query(criteria, *column_names, category: :main, &block)
  count             = input(category).upload_mongo_query(criteria, columns: column_names, &block)
  self.record_count = (record_count || 0) + count
  count
end
#upload_slice(slice, category: :main) ⇒ Object
Upload the supplied slice for processing by workers
Updates the record_count after adding the records
Returns [Integer] the number of records uploaded
Parameters
`slice` [ Array<Hash | Array | String | Integer | Float | Symbol | Regexp | Time> ]
  All elements in `slice` must be serializable to BSON.
  For example, the following types are not supported: Date
Notes:
- The caller should honor `:slice_size`, since the entire slice is saved as-is.
- Not thread-safe. Only call from one thread at a time.
# File 'lib/rocket_job/batch/io.rb', line 382

def upload_slice(slice, category: :main)
  input(category).insert(slice)
  count             = slice.size
  self.record_count = (record_count || 0) + count
  count
end