Class: RocketJob::Sliced::Input
- Defined in:
- lib/rocket_job/sliced/input.rb
Instance Attribute Summary
Attributes inherited from Slices
#all, #collection_name, #slice_class, #slice_size
Instance Method Summary collapse
-
#each_failed_record ⇒ Object
Iterate over each failed record, if any. Since each slice can only contain 1 failed record, only the failed record is returned, along with the slice containing the exception details.
-
#next_slice(worker_name) ⇒ Object
Returns the next slice to work on, in id order. Returns nil if there are currently no queued slices.
-
#requeue_failed ⇒ Object
Requeue all failed slices.
-
#requeue_running(worker_name) ⇒ Object
Requeue all running slices for a server or worker that is no longer available.
- #upload(**args, &block) ⇒ Object
- #upload_arel(arel, columns: nil, slice_batch_size: nil, &block) ⇒ Object
- #upload_integer_range(start_id, last_id, slice_batch_size: 1_000) ⇒ Object
- #upload_integer_range_in_reverse_order(start_id, last_id, slice_batch_size: 1_000) ⇒ Object
- #upload_mongo_query(criteria, columns: [], slice_batch_size: nil, &block) ⇒ Object
Methods inherited from Slices
#append, #completed, #create, #create!, #create_indexes, #drop, #each, #failed, #first, #group_exceptions, #initialize, #insert, #insert_many, #last, #new, #queued, #running
Constructor Details
This class inherits a constructor from RocketJob::Sliced::Slices
Instance Method Details
#each_failed_record ⇒ Object
Iterate over each failed record, if any. Since each slice can only contain 1 failed record, only the failed record is returned, along with the slice containing the exception details.
Example:
job.each_failed_record do |record, slice|
ap slice
end
97 98 99 100 101 102 |
# File 'lib/rocket_job/sliced/input.rb', line 97
#
# Walks every failed slice and yields its failed record together with the
# slice itself. Slices whose `failed_record` is nil are skipped.
#
# Yields: record, slice
def each_failed_record
  failed.each do |failed_slice|
    failed_record = failed_slice.failed_record
    next if failed_record.nil?

    yield(failed_record, failed_slice)
  end
end
#next_slice(worker_name) ⇒ Object
Returns the next slice to work on, in id order. Returns nil if there are currently no queued slices.
If a slice is in queued state it will be started and assigned to this worker
124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/rocket_job/sliced/input.rb', line 124
#
# Picks the queued slice with the lowest _id, marks it as running for
# `worker_name`, stamps `started_at` and returns the updated slice.
# Returns the falsy lookup result unchanged when nothing is queued.
def next_slice(worker_name)
  # TODO: Will it perform faster without the id sort?
  # I.e. Just process on a FIFO basis?
  queued_by_id = all.queued.sort("_id" => 1)
  slice = queued_by_id.find_one_and_update(
    {"$set" => {worker_name: worker_name, state: "running", started_at: Time.now}},
    return_document: :after
  )
  return slice unless slice

  # The returned document needs to be told which collection it belongs to.
  slice.collection_name = collection_name
  slice
end
#requeue_failed ⇒ Object
Requeue all failed slices
105 106 107 108 109 110 |
# File 'lib/rocket_job/sliced/input.rb', line 105
#
# Puts every failed slice back into the "queued" state, clearing the worker
# assignment and start time that were recorded for it.
def requeue_failed
  cleared_fields = {worker_name: nil, started_at: nil}
  queued_state   = {state: "queued"}

  failed.update_all("$unset" => cleared_fields, "$set" => queued_state)
end
#requeue_running(worker_name) ⇒ Object
Requeue all running slices for a server or worker that is no longer available
113 114 115 116 117 118 |
# File 'lib/rocket_job/sliced/input.rb', line 113
#
# Requeue all running slices whose `worker_name` starts with the supplied
# name, clearing their worker assignment and start time.
#
# Parameters
#   worker_name
#     Prefix of the worker name(s) to requeue. The value is matched
#     literally: regular expression meta-characters such as "." or "(" in the
#     name are escaped, so they neither widen the match nor raise a
#     RegexpError.
def requeue_running(worker_name)
  # `to_s` keeps a nil name behaving as an empty prefix, as it did before escaping was added.
  name_prefix = /\A#{Regexp.escape(worker_name.to_s)}/

  running.where(worker_name: name_prefix).update_all(
    "$unset" => {worker_name: nil, started_at: nil},
    "$set"   => {state: "queued"}
  )
end
#upload(**args, &block) ⇒ Object
4 5 6 7 8 9 10 11 |
# File 'lib/rocket_job/sliced/input.rb', line 4
#
# Uploads records into this input collection by handing all arguments and
# the block to Writer::Input.collect.
#
# If anything is raised along the way the collection is dropped and the same
# exception is raised again to the caller.
def upload(**args, &block)
  # Create indexes before uploading
  create_indexes
  Writer::Input.collect(self, **args, &block)
rescue Exception => error
  # Exception (not just StandardError) is rescued so the collection is
  # dropped whatever was raised; the error is always re-raised.
  drop
  raise error
end
#upload_arel(arel, columns: nil, slice_batch_size: nil, &block) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/rocket_job/sliced/input.rb', line 43
#
# Uploads one record per model returned by `arel.find_each`.
#
# With a block, each model is passed to the block and its result is uploaded.
# Without a block, the value of the requested column is uploaded, or an Array
# of values when several columns are requested. `columns` defaults to [:id]
# when blank, and the query is narrowed to the requested columns plus :id.
def upload_arel(arel, columns: nil, slice_batch_size: nil, &block)
  relation = arel

  if block.nil?
    column_names = columns.blank? ? [:id] : columns.map(&:to_sym)

    block =
      if column_names.size == 1
        only_column = column_names.first
        ->(model) { model.public_send(only_column) }
      else
        ->(model) { column_names.map { |name| model.public_send(name) } }
      end

    # find_each requires the :id column in the query
    selected = column_names.include?(:id) ? column_names : [*column_names, :id]
    relation = relation.select(selected)
  end

  upload(slice_batch_size: slice_batch_size) do |records|
    relation.find_each { |model| records << block.call(model) }
  end
end
#upload_integer_range(start_id, last_id, slice_batch_size: 1_000) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/rocket_job/sliced/input.rb', line 62
#
# Uploads the inclusive range start_id..last_id as consecutive
# [first_id, last_id] pairs, each spanning at most `slice_size` integers,
# in ascending order.
def upload_integer_range(start_id, last_id, slice_batch_size: 1_000)
  # Every uploaded "record" is itself a range of Integers that forms one slice,
  # hence the upload is performed with a slice size of 1.
  upload(slice_size: 1, slice_batch_size: slice_batch_size) do |records|
    lower = start_id
    until lower > last_id
      upper = [lower + slice_size - 1, last_id].min
      records << [lower, upper]
      lower += slice_size
    end
  end
end
#upload_integer_range_in_reverse_order(start_id, last_id, slice_batch_size: 1_000) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/rocket_job/sliced/input.rb', line 74
#
# Uploads the inclusive range start_id..last_id as [first_id, last_id] pairs,
# each spanning at most `slice_size` integers, starting from `last_id` and
# working down towards `start_id`.
#
# The lower bound of the final pair is clamped to `start_id` only, so ranges
# that start below zero are split into non-overlapping pairs exactly like
# ranges that start at or above zero.
def upload_integer_range_in_reverse_order(start_id, last_id, slice_batch_size: 1_000)
  # Each "record" is actually a range of Integers which makes up each slice
  upload(slice_size: 1, slice_batch_size: slice_batch_size) do |records|
    end_id = last_id
    while end_id >= start_id
      # Never reach below the requested start of the range.
      first_id = [end_id - slice_size + 1, start_id].max
      records << [first_id, end_id]
      end_id -= slice_size
    end
  end
end
#upload_mongo_query(criteria, columns: [], slice_batch_size: nil, &block) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/rocket_job/sliced/input.rb', line 13
#
# Uploads one record per document returned by the supplied criteria.
#
# With a block, each raw document is passed to the block and its result is
# uploaded; a legacy :fields option on the criteria is moved to :projection.
# Without a block, only the requested columns are projected and the value of
# the column is uploaded, or an Array of values when several columns are
# requested. `columns` defaults to ["_id"] when blank.
#
# Note: the hash returned by `criteria.options` is modified in place.
def upload_mongo_query(criteria, columns: [], slice_batch_size: nil, &block)
  options = criteria.options

  # Without a block extract the fields from the supplied criteria
  if block
    # Criteria is returning old school :fields instead of :projections
    options[:projection] = options.delete(:fields) if options.key?(:fields)
  else
    columns = columns.blank? ? ["_id"] : columns.map(&:to_s)
    fields  = options.delete(:fields) || {}
    columns.each { |col| fields[col] = 1 }
    options[:projection] = fields

    block =
      if columns.size == 1
        column = columns.first
        ->(document) { document[column] }
      else
        ->(document) { columns.map { |c| document[c] } }
      end
  end

  upload(slice_batch_size: slice_batch_size) do |records|
    # Drop down to the mongo driver level to avoid constructing a Model for each document returned
    criteria.klass.collection.find(criteria.selector, options).each do |document|
      records << block.call(document)
    end
  end
end