Class: RocketJob::Sliced::Slices
- Inherits:
  - Object
    - RocketJob::Sliced::Slices
- Extended by:
- Forwardable
- Includes:
- Enumerable, SemanticLogger::Loggable
- Defined in:
- lib/rocket_job/sliced/slices.rb
Instance Attribute Summary
- #all ⇒ Object (readonly)
  Returns the value of attribute all.
- #collection_name ⇒ Object
  Returns the value of attribute collection_name.
- #slice_class ⇒ Object
  Returns the value of attribute slice_class.
- #slice_size ⇒ Object
  Returns the value of attribute slice_size.
Instance Method Summary
- #append(slice, input_slice) ⇒ Object
  Appends records to the existing slice with the same id as input_slice, or inserts a new slice if none exists.
- #completed ⇒ Object
  Returns the slices in the completed state. Forwardable generates invalid warnings on the state methods (#completed, #failed, #queued, #running), so they are defined explicitly.
- #create(params = {}) ⇒ Object
- #create!(params = {}) ⇒ Object
- #create_indexes ⇒ Object
  Creates the index used by find_and_modify, only if it is not already present.
- #drop ⇒ Object
  Drop this collection when it is no longer needed.
- #each(&block) ⇒ Object
  Yields the slices in order of their id, which is usually the order in which they were written.
- #failed ⇒ Object
- #first ⇒ Object
  Returns the slice with the lowest id. Mongoid does not apply ordering, so an explicit sort on _id is added.
- #group_exceptions ⇒ Object
  Returns [Array<Struct>] the exceptions of failed slices grouped by exception class name, with the count and unique messages for each class.
- #initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100) ⇒ Slices (constructor)
  Parameters: collection_name: [String] Name of the collection to create; slice_size: [Integer] Number of records to store in each slice (default: 100); slice_class: [class] Slice class to use to hold records.
- #insert(slice, input_slice = nil) ⇒ Object (also: #<<)
  Inserts a new slice into the collection.
- #insert_many(slices) ⇒ Object
- #last ⇒ Object
- #new(params = {}) ⇒ Object
- #queued ⇒ Object
- #running ⇒ Object
Constructor Details
#initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100) ⇒ Slices
Parameters
collection_name: [String]
Name of the collection to create
slice_size: [Integer]
Number of records to store in each slice
Default: 100
slice_class: [class]
Slice class to use to hold records.
Default: RocketJob::Sliced::Slice
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 20
def initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100)
  @slice_class = slice_class
  @slice_size = slice_size
  @collection_name = collection_name

  # Using `Sliced::Slice` avoids having to add `_type` as an index when all slices are the same type anyway.
  @all = Sliced::Slice.with_collection(collection_name)
end
```
Instance Attribute Details
#all ⇒ Object (readonly)
Returns the value of attribute all.
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 9
def all
  @all
end
```
#collection_name ⇒ Object
Returns the value of attribute collection_name.
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 8
def collection_name
  @collection_name
end
```
#slice_class ⇒ Object
Returns the value of attribute slice_class.
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 8
def slice_class
  @slice_class
end
```
#slice_size ⇒ Object
Returns the value of attribute slice_size.
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 8
def slice_size
  @slice_size
end
```
Instance Method Details
#append(slice, input_slice) ⇒ Object
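Appends the records in slice to the existing slice whose id matches input_slice. If there is no such slice, slice is inserted as a new slice via #insert.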
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 99
def append(slice, input_slice)
  existing_slice = all.where(id: input_slice.id).first
  return insert(slice, input_slice) unless existing_slice

  extra_records = slice.is_a?(Slice) ? slice.records : slice
  existing_slice.records = existing_slice.records + extra_records
  existing_slice.save!
  existing_slice
end
```
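The append-or-insert decision in #append can be illustrated without MongoDB. The following is a minimal in-memory sketch, assuming a Hash keyed by slice id in place of the collection. `MemSlice` and `MemSlices` are hypothetical names, and only Array records are handled here (the original also accepts a Slice).

```ruby
# Hypothetical in-memory stand-in for a slice: an id plus an array of records.
MemSlice = Struct.new(:id, :records)

# Hypothetical stand-in for Slices, backed by a Hash keyed by slice id
# instead of a MongoDB collection.
class MemSlices
  attr_reader :store

  def initialize
    @store = {}
  end

  # Stores a new slice that reuses the input slice's id.
  def insert(records, input_slice)
    @store[input_slice.id] = MemSlice.new(input_slice.id, records.dup)
  end

  # Adds records to the slice with the input slice's id, or inserts a new
  # slice when none exists yet (same decision as Slices#append).
  def append(records, input_slice)
    existing = @store[input_slice.id]
    return insert(records, input_slice) unless existing

    existing.records = existing.records + records
    existing
  end
end

slices = MemSlices.new
input = MemSlice.new(1, %w[a b c])
slices.append(%w[A], input)   # no slice with id 1 yet, so it is inserted
slices.append(%w[B C], input) # slice 1 exists, so records are appended
slices.store[1].records       # => ["A", "B", "C"]
```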
#completed ⇒ Object
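Returns the slices in the completed state. Forwardable generates invalid warnings on the state methods (#completed, #failed, #queued, #running), so they are defined explicitly.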
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 131
def completed
  all.completed
end
```
#create(params = {}) ⇒ Object
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 33
def create(params = {})
  slice = new(params)
  slice.save
  slice
end
```
#create!(params = {}) ⇒ Object
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 39
def create!(params = {})
  slice = new(params)
  slice.save!
  slice
end
```
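#create and #create! differ only in calling save versus save!, which follows the usual Mongoid convention: save returns false when the document is not saved, while save! raises. The sketch below imitates that contract with a hypothetical `DemoSlice` whose only validation is that records must be non-empty. It is not the real Slice class or its validations, and `DemoSlices` is a hypothetical stand-in for Slices.

```ruby
# Hypothetical document: save returns false on failure, save! raises.
class DemoSlice
  class Invalid < StandardError; end

  attr_reader :records, :collection_name

  def initialize(params = {})
    @records = params.fetch(:records, [])
    @collection_name = params[:collection_name]
    @persisted = false
  end

  # Assumed validation, for illustration only.
  def valid?
    !records.empty?
  end

  def persisted?
    @persisted
  end

  def save
    return false unless valid?

    @persisted = true
  end

  def save!
    save || raise(Invalid, "records can't be empty")
  end
end

# Mirrors the shape of Slices#new, #create and #create!.
class DemoSlices
  def initialize(collection_name)
    @collection_name = collection_name
  end

  def new(params = {})
    DemoSlice.new(params.merge(collection_name: @collection_name))
  end

  def create(params = {})
    slice = new(params)
    slice.save
    slice
  end

  def create!(params = {})
    slice = new(params)
    slice.save!
    slice
  end
end

slices = DemoSlices.new("demo_slices")
slices.create(records: []).persisted?   # => false (save returned false, nothing raised)
slices.create!(records: [1]).persisted? # => true
```

Both methods return the slice itself, so with #create the caller has to check the result (here via persisted?) to know whether the save succeeded.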
#create_indexes ⇒ Object
Creates the index used by find_and_modify, only if it is not already present.
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 112
def create_indexes
  missing =
    begin
      all.collection.indexes.none? { |i| i["name"] == "state_1__id_1" }
    rescue Mongo::Error::OperationFailure
      true
    end
  all.collection.indexes.create_one({state: 1, _id: 1}, unique: true) if missing
end
```
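The hard-coded name "state_1__id_1" that #create_indexes checks for follows MongoDB's default index naming, which joins each key with its direction using underscores. The sketch below derives that name and repeats the presence check against a hypothetical array of index descriptions. `default_index_name` and `index_missing?` are illustrative helpers, not part of RocketJob or the Mongo driver.

```ruby
# Builds MongoDB's default index name: "<key>_<direction>" pairs joined by "_".
def default_index_name(spec)
  spec.map { |key, direction| "#{key}_#{direction}" }.join("_")
end

# Same check as create_indexes: the index is missing if no description has
# that name. Each description is a Hash with a "name" key.
def index_missing?(index_descriptions, name)
  index_descriptions.none? { |index| index["name"] == name }
end

name = default_index_name({ state: 1, _id: 1 })
name # => "state_1__id_1"

index_missing?([{ "name" => "_id_" }], name)                                 # => true
index_missing?([{ "name" => "_id_" }, { "name" => "state_1__id_1" }], name)  # => false
```

The original also treats a failed index listing (Mongo::Error::OperationFailure) as "missing", presumably so that the index still gets created when the collection does not exist yet.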
#drop ⇒ Object
Drop this collection when it is no longer needed
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 126
def drop
  all.collection.drop
end
```
#each(&block) ⇒ Object
Yields the slices in order of their id, which is usually the order in which they were written.
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 47
def each(&block)
  all.sort(id: 1).each(&block)
end
```
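Because Slices includes Enumerable, defining #each is enough to get map, select, count and the other Enumerable methods, all in id order. Below is a minimal sketch of that pattern, assuming plain Structs sorted in memory rather than a Mongoid query. `SortedSlices` is a hypothetical name.

```ruby
# Hypothetical container that, like Slices, includes Enumerable and only
# defines #each (here sorting in memory by id).
class SortedSlices
  include Enumerable

  Slice = Struct.new(:id, :state)

  def initialize(slices)
    @slices = slices
  end

  # Every Enumerable method is built on this, so all of them see id order.
  def each(&block)
    @slices.sort_by(&:id).each(&block)
  end
end

slices = SortedSlices.new([
  SortedSlices::Slice.new(3, "queued"),
  SortedSlices::Slice.new(1, "completed"),
  SortedSlices::Slice.new(2, "failed")
])
slices.map(&:id)                                    # => [1, 2, 3]
slices.select { |s| s.state == "failed" }.map(&:id) # => [2]
```

The real class also defines its own #first, which takes precedence over Enumerable#first and issues a sorted query instead of iterating.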
#failed ⇒ Object
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 135
def failed
  all.failed
end
```
#first ⇒ Object
Returns the slice with the lowest id. Mongoid does not apply ordering, so an explicit sort on _id is added.
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 149
def first
  all.sort("_id" => 1).first
end
```
#group_exceptions ⇒ Object
Returns [Array<Struct>] the exceptions of failed slices, grouped by exception class name, with the count and the unique messages for each class.
Each struct consists of:
class_name: [String]
Exception class name.
count: [Integer]
Number of exceptions with this class.
messages: [Array<String>]
Unique list of error messages.
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 171
def group_exceptions
  result_struct = Struct.new(:class_name, :count, :messages)
  result = all.collection.aggregate(
    [
      {
        "$match" => {state: "failed"}
      },
      {
        "$group" => {
          _id: {error_class: "$exception.class_name"},
          messages: {"$addToSet" => "$exception.message"},
          count: {"$sum" => 1}
        }
      }
    ]
  )
  result.collect do |errors|
    result_struct.new(errors["_id"]["error_class"], errors["count"], errors["messages"])
  end
end
```
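The aggregation pipeline in #group_exceptions keeps only failed slices ($match), then groups them by exception class, counting each group ($sum) and collecting distinct messages ($addToSet). The plain-Ruby sketch below computes the same shape of result over hypothetical Hash documents. It is for illustration only: `ExceptionGroup` and `group_exceptions` here are stand-ins, and MongoDB does not guarantee the order of the groups or of the $addToSet values, so callers should not rely on ordering.

```ruby
# Same fields as the Struct built inside Slices#group_exceptions.
ExceptionGroup = Struct.new(:class_name, :count, :messages)

# In-memory equivalent of the $match / $group pipeline.
def group_exceptions(slices)
  slices
    .select { |s| s[:state] == "failed" }                 # $match
    .group_by { |s| s.dig(:exception, :class_name) }      # $group _id
    .map do |class_name, group|
      messages = group.map { |s| s.dig(:exception, :message) }.uniq # $addToSet
      ExceptionGroup.new(class_name, group.size, messages)          # $sum => 1
    end
end

slices = [
  { state: "failed", exception: { class_name: "ArgumentError", message: "bad id" } },
  { state: "failed", exception: { class_name: "ArgumentError", message: "bad id" } },
  { state: "failed", exception: { class_name: "KeyError", message: "key not found: :name" } },
  { state: "completed" }
]
group_exceptions(slices).map { |g| [g.class_name, g.count, g.messages] }
# => [["ArgumentError", 2, ["bad id"]], ["KeyError", 1, ["key not found: :name"]]]
```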
#insert(slice, input_slice = nil) ⇒ Object Also known as: <<
Insert a new slice into the collection.
Returns the inserted slice.
Parameters
slice [RocketJob::Sliced::Slice | Array]
The slice to write to the slices collection.
If slice is an Array, it is converted to a Slice before being inserted into the slices collection.
input_slice [RocketJob::Sliced::Slice]
The input slice to which this slice corresponds.
The id and first_record_number of the input slice are copied to the new slice.
If the insert results in a duplicate record, it is ignored to support restarting jobs that failed in the middle of processing.
A warning is logged that the slice has already been processed.
Note:
`slice_size` is not enforced.
However many records are present in the slice are written as a single slice to the slices collection.
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 73
def insert(slice, input_slice = nil)
  slice = new(records: slice) unless slice.is_a?(Slice)

  # Retain input_slice id in the new output slice
  if input_slice
    slice.id = input_slice.id
    slice.first_record_number = input_slice.first_record_number
  end

  begin
    slice.save!
  rescue Mongo::Error::OperationFailure => e
    # Ignore duplicates since it means the job was restarted
    raise(e) unless e.message.include?("E11000")

    logger.warn "Skipped already processed slice# #{slice.id}"
  end
  slice
end
```
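The rescue in #insert makes restarts safe: a second insert with an id that already exists fails with MongoDB's E11000 duplicate key error, which is logged and skipped, while any other failure is re-raised. Below is a minimal sketch of that control flow, assuming a Hash with unique keys in place of the collection. `FakeOperationFailure` and `DedupStore` are hypothetical stand-ins for Mongo::Error::OperationFailure and the slices collection.

```ruby
# Hypothetical stand-in for Mongo::Error::OperationFailure.
class FakeOperationFailure < StandardError; end

# Hypothetical collection that enforces one document per id.
class DedupStore
  attr_reader :docs, :skipped

  def initialize
    @docs = {}
    @skipped = []
  end

  # Fails the way a unique index would when the id already exists.
  def save!(id, records)
    raise FakeOperationFailure, "E11000 duplicate key error dup key: { _id: #{id} }" if @docs.key?(id)

    @docs[id] = records
  end

  # Same control flow as Slices#insert: skip duplicates, re-raise anything else.
  # The original logs a warning where this sketch records the skipped id.
  def insert(id, records)
    save!(id, records)
  rescue FakeOperationFailure => e
    raise(e) unless e.message.include?("E11000")

    @skipped << id
  end
end

store = DedupStore.new
store.insert(1, %w[a b])
store.insert(1, %w[x y]) # duplicate id: recorded as skipped instead of raised
store.docs[1]            # => ["a", "b"]
store.skipped            # => [1]
```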
#insert_many(slices) ⇒ Object
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 93
def insert_many(slices)
  documents = slices.collect(&:as_document)
  all.collection.insert_many(documents) if documents.present?
end
```
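#insert_many converts every slice to a raw document and writes them in a single driver call. It skips the call entirely when there is nothing to write (`present?` is ActiveSupport's inverse of `blank?`). The sketch below reproduces that in plain Ruby with a hypothetical `RecordingCollection` and `DocSlice`. Because the original writes through the driver collection directly, it presumably bypasses Mongoid callbacks, and it does not have the duplicate handling of #insert.

```ruby
# Hypothetical stand-in for the driver collection: records each batch it receives.
class RecordingCollection
  attr_reader :batches

  def initialize
    @batches = []
  end

  def insert_many(documents)
    @batches << documents
  end
end

# Hypothetical slice that can render itself as a raw document.
DocSlice = Struct.new(:id, :records) do
  def as_document
    { "_id" => id, "records" => records }
  end
end

# Mirrors Slices#insert_many; `empty?` stands in for ActiveSupport's `present?`.
def insert_many(collection, slices)
  documents = slices.map(&:as_document)
  collection.insert_many(documents) unless documents.empty?
end

collection = RecordingCollection.new
insert_many(collection, []) # nothing to write, so no driver call
insert_many(collection, [DocSlice.new(1, %w[a]), DocSlice.new(2, %w[b])])
collection.batches.size     # => 1
```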
#last ⇒ Object
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 153
def last
  all.sort("_id" => -1).first
end
```
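#first and #last both sort explicitly on _id because, as the note on #first says, Mongoid does not apply ordering on its own. Sorting ascending and taking the first element gives the lowest id, and sorting descending gives the highest. Here is a plain-Ruby sketch over hypothetical ids:

```ruby
# Hypothetical slice ids in the order an unsorted query might return them.
ids = [42, 7, 19]

# Equivalent of sort("_id" => 1).first: the lowest id.
first_id = ids.sort.first

# Equivalent of sort("_id" => -1).first: the highest id.
last_id = ids.sort { |a, b| b <=> a }.first

first_id # => 7
last_id  # => 42
```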
#new(params = {}) ⇒ Object
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 29
def new(params = {})
  slice_class.new(params.merge(collection_name: collection_name))
end
```
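#new merges collection_name into the caller's params with Hash#merge. Keys from the argument win, so the Slices instance's collection name always overrides any collection_name passed in params. Below is a short illustration with hypothetical values:

```ruby
# Hypothetical params, including a conflicting collection_name.
params = { records: [1, 2], collection_name: "other_collection" }

# Same merge as Slices#new: the argument Hash takes precedence on duplicate keys.
merged = params.merge(collection_name: "my_slices")

merged[:collection_name] # => "my_slices"
merged[:records]         # => [1, 2]
params[:collection_name] # => "other_collection" (merge returns a new Hash)
```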
#queued ⇒ Object
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 139
def queued
  all.queued
end
```
#running ⇒ Object
```ruby
# File 'lib/rocket_job/sliced/slices.rb', line 143
def running
  all.running
end
```
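#queued, #running, #completed and #failed each delegate to a state query on `all`. The comment on #completed explains that they are written out explicitly because Forwardable generates invalid warnings for them. The sketch below shows the equivalent filtering in memory, assuming the four state names match the method names. `StateSlice` and `StateScopes` are hypothetical.

```ruby
# Hypothetical slice with only the fields needed here.
StateSlice = Struct.new(:id, :state)

# Hypothetical in-memory equivalent of the state queries behind Slices#queued,
# #running, #completed and #failed.
class StateScopes
  STATES = %w[queued running completed failed].freeze

  def initialize(slices)
    @slices = slices
  end

  # Defines one filtering method per state name.
  STATES.each do |state|
    define_method(state) { @slices.select { |slice| slice.state == state } }
  end
end

scopes = StateScopes.new([
  StateSlice.new(1, "completed"),
  StateSlice.new(2, "running"),
  StateSlice.new(3, "failed"),
  StateSlice.new(4, "completed")
])
scopes.completed.map(&:id) # => [1, 4]
scopes.queued              # => []
```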