Class: RocketJob::Sliced::Slices

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable, SemanticLogger::Loggable
Defined in:
lib/rocket_job/sliced/slices.rb

Direct Known Subclasses

Input, Output

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100) ⇒ Slices

Parameters

collection_name: [String]
  Name of the collection to create
slice_size: [Integer]
  Number of records to store in each slice
  Default: 100
slice_class: [class]
  Slice class to use to hold records.
  Default: RocketJob::Sliced::Slice


20
21
22
23
24
25
26
27
# File 'lib/rocket_job/sliced/slices.rb', line 20

# Builds a slices wrapper around a single collection.
#
# Parameters
#   collection_name: [String]
#     Name of the collection holding the slices.
#   slice_class: [class]
#     Class used when building new slices. Default: Sliced::Slice
#   slice_size: [Integer]
#     Number of records per slice. Default: 100
def initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100)
  @collection_name = collection_name
  @slice_size      = slice_size
  @slice_class     = slice_class

  # The query scope is always built from `Sliced::Slice`, not `slice_class`:
  # this avoids having to add `_type` as an index when all slices are the same type anyway.
  @all = Sliced::Slice.with_collection(collection_name)
end

Instance Attribute Details

#all ⇒ Object (readonly)

Returns the value of attribute all.



9
10
11
# File 'lib/rocket_job/sliced/slices.rb', line 9

# Read-only accessor for @all, which the constructor sets to
# `Sliced::Slice.with_collection(collection_name)`.
# The other methods in this class issue their queries through this object.
def all
  @all
end

#collection_name ⇒ Object

Returns the value of attribute collection_name.



8
9
10
# File 'lib/rocket_job/sliced/slices.rb', line 8

# Returns the current value of @collection_name.
# The constructor initialises it from its `collection_name:` keyword.
def collection_name
  @collection_name
end

#slice_class ⇒ Object

Returns the value of attribute slice_class.



8
9
10
# File 'lib/rocket_job/sliced/slices.rb', line 8

# Returns the current value of @slice_class, the class that `new` instantiates.
# The constructor initialises it from its `slice_class:` keyword (default: Sliced::Slice).
def slice_class
  @slice_class
end

#slice_size ⇒ Object

Returns the value of attribute slice_size.



8
9
10
# File 'lib/rocket_job/sliced/slices.rb', line 8

# Returns the current value of @slice_size.
# The constructor initialises it from its `slice_size:` keyword (default: 100).
def slice_size
  @slice_size
end

Instance Method Details

#append(slice, input_slice) ⇒ Object

Append the records to the existing slice that has the input slice's id, if one is present; otherwise insert them as a new slice.



99
100
101
102
103
104
105
106
107
# File 'lib/rocket_job/sliced/slices.rb', line 99

# Adds records to the slice whose id matches `input_slice.id`.
#
# When such a slice is already stored, the supplied records are appended to
# its records, it is saved, and it is returned.
# When no such slice is stored, the call is handed over to `insert`.
#
# Parameters
#   slice [Slice | Array]
#     Records to add; when a Slice is supplied its `records` are used.
#   input_slice
#     Supplies the id used to look up the stored slice.
def append(slice, input_slice)
  target = all.where(id: input_slice.id).first

  if target
    additions      = slice.is_a?(Slice) ? slice.records : slice
    target.records = target.records + additions
    target.save!
    target
  else
    insert(slice, input_slice)
  end
end

#completed ⇒ Object

Defined explicitly as a delegator to `all`, because Forwardable generates invalid warnings on these methods.



131
132
133
# File 'lib/rocket_job/sliced/slices.rb', line 131

# Delegates to `all.completed` and returns its result.
def completed
  scope = all
  scope.completed
end

#create(params = {}) ⇒ Object



33
34
35
36
37
# File 'lib/rocket_job/sliced/slices.rb', line 33

# Builds a slice via `new(params)`, calls `save` on it, and returns the slice.
# The return value of `save` is discarded; the slice itself is always returned.
def create(params = {})
  new(params).tap(&:save)
end

#create!(params = {}) ⇒ Object



39
40
41
42
43
# File 'lib/rocket_job/sliced/slices.rb', line 39

# Builds a slice via `new(params)`, calls `save!` on it, and returns the slice.
# Anything raised by `save!` propagates to the caller.
def create!(params = {})
  new(params).tap(&:save!)
end

#create_indexes ⇒ Object

Index for find_and_modify only if it is not already present



112
113
114
115
116
117
118
119
120
# File 'lib/rocket_job/sliced/slices.rb', line 112

# Creates the unique `{state: 1, _id: 1}` index unless an index named
# "state_1__id_1" is already listed for the collection.
#
# If listing the indexes raises Mongo::Error::OperationFailure the index is
# treated as missing and is created.
#
# Returns nil when the index already exists, otherwise the result of `create_one`.
def create_indexes
  begin
    return if all.collection.indexes.any? { |index| index["name"] == "state_1__id_1" }
  rescue Mongo::Error::OperationFailure
    # Could not list the indexes: fall through and create the index.
  end

  all.collection.indexes.create_one({state: 1, _id: 1}, unique: true)
end

#drop ⇒ Object

Drop this collection when it is no longer needed



126
127
128
# File 'lib/rocket_job/sliced/slices.rb', line 126

# Drops the collection behind `all` and returns the result of that call.
def drop
  collection = all.collection
  collection.drop
end

#each(&block) ⇒ Object

Returns output slices in the order of their id, which is usually the order in which they were written.



47
48
49
# File 'lib/rocket_job/sliced/slices.rb', line 47

# Iterates over the slices sorted by ascending id, passing each to the block.
# Returns whatever the sorted scope's `each` returns.
def each(&block)
  ordered = all.sort(id: 1)
  ordered.each(&block)
end

#failed ⇒ Object



135
136
137
# File 'lib/rocket_job/sliced/slices.rb', line 135

# Delegates to `all.failed` and returns its result.
def failed
  scope = all
  scope.failed
end

#first ⇒ Object

Mongoid does not apply ordering, so an explicit sort is added (which is why the source disables rubocop's Style/RedundantSort here).



149
150
151
# File 'lib/rocket_job/sliced/slices.rb', line 149

# Returns the first slice when sorted by ascending `_id`.
# The sort is explicit because ordering is not applied otherwise.
def first
  ascending = all.sort("_id" => 1)
  ascending.first
end

#group_exceptions ⇒ Object

Returns [Array<Struct>] grouped exceptions by class name, and unique exception messages by exception class.

Each struct consists of:

class_name: [String]
  Exception class name.

count: [Integer]
  Number of exceptions with this class.

messages: [Array<String>]
  Unique list of error messages.


171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/rocket_job/sliced/slices.rb', line 171

# Summarises the failed slices by exception class using an aggregation.
#
# Returns an Array with one struct per exception class:
#   class_name: the `exception.class_name` value the group was built on
#   count:      number of failed slices in that group
#   messages:   set of distinct `exception.message` values in that group
def group_exceptions
  pipeline = [
    # Only slices in the failed state are considered.
    {"$match" => {state: "failed"}},
    {
      "$group" => {
        _id:      {error_class: "$exception.class_name"},
        messages: {"$addToSet" => "$exception.message"},
        count:    {"$sum" => 1}
      }
    }
  ]
  summary = Struct.new(:class_name, :count, :messages)

  all.collection.aggregate(pipeline).map do |group|
    summary.new(group["_id"]["error_class"], group["count"], group["messages"])
  end
end

#insert(slice, input_slice = nil) ⇒ Object Also known as: <<

Insert a new slice into the collection

Returns [RocketJob::Sliced::Slice] the slice that was inserted, or that was skipped because it had already been processed

Parameters

slice [RocketJob::Sliced::Slice | Array]
  The slice to write to the slices collection
  If slice is an Array, it will be converted to a Slice before inserting
  into the slices collection

input_slice [RocketJob::Sliced::Slice]
  The input slice to which this slice corresponds
  The id of the input slice is copied across
  If the insert results in a duplicate record it is ignored, to support
  restarting of jobs that failed in the middle of processing.
  A warning is logged that the slice has already been processed.

Note:

`slice_size` is not enforced.
However many records are present in the slice will be written as a
single slice to the slices collection


73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/rocket_job/sliced/slices.rb', line 73

# Saves a slice into the slices collection and returns it.
#
# Parameters
#   slice [Slice | Array]
#     The slice to save. Anything that is not a Slice is wrapped with
#     `new(records: slice)` first.
#   input_slice
#     Optional. When supplied, its `id` and `first_record_number` are copied
#     onto the slice before it is saved.
#
# A Mongo::Error::OperationFailure whose message contains "E11000" is treated
# as a duplicate: a warning is logged and the unsaved slice is still returned.
# Any other Mongo::Error::OperationFailure is re-raised.
def insert(slice, input_slice = nil)
  output = slice.is_a?(Slice) ? slice : new(records: slice)

  # Carry the input slice's identity across to the output slice.
  if input_slice
    output.id                  = input_slice.id
    output.first_record_number = input_slice.first_record_number
  end

  begin
    output.save!
  rescue Mongo::Error::OperationFailure => e
    if e.message.include?("E11000")
      # A duplicate means the job was restarted, so the slice is skipped.
      logger.warn "Skipped already processed slice# #{output.id}"
    else
      raise
    end
  end

  output
end

#insert_many(slices) ⇒ Object



93
94
95
96
# File 'lib/rocket_job/sliced/slices.rb', line 93

# Converts every slice to its document form with `as_document` and writes
# them to the collection with a single `insert_many` call.
#
# Returns nil without touching the collection when there is nothing to
# insert, otherwise the result of the collection's `insert_many`.
def insert_many(slices)
  docs = slices.map(&:as_document)
  return unless docs.present?

  all.collection.insert_many(docs)
end

#last ⇒ Object



153
154
155
# File 'lib/rocket_job/sliced/slices.rb', line 153

# Returns the last slice: the first one when sorted by descending `_id`.
def last
  descending = all.sort("_id" => -1)
  descending.first
end

#new(params = {}) ⇒ Object



29
30
31
# File 'lib/rocket_job/sliced/slices.rb', line 29

# Instantiates (without saving) a `slice_class` object for this collection.
# `collection_name` is merged into a copy of `params`, overriding any
# `:collection_name` entry the caller supplied; `params` itself is not modified.
def new(params = {})
  attributes = params.merge(collection_name: collection_name)
  slice_class.new(attributes)
end

#queued ⇒ Object



139
140
141
# File 'lib/rocket_job/sliced/slices.rb', line 139

# Delegates to `all.queued` and returns its result.
def queued
  scope = all
  scope.queued
end

#running ⇒ Object



143
144
145
# File 'lib/rocket_job/sliced/slices.rb', line 143

# Delegates to `all.running` and returns its result.
def running
  scope = all
  scope.running
end