Class: MongoMapperParallel

Inherits:
Object
Defined in:
lib/mongo_mapper_parallel.rb

Defined Under Namespace

Classes: Key

Constructor Details

#initialize(opts = {}) ⇒ MongoMapperParallel

Instantiates the parallel operation object with the target class, JavaScript function, and split field, then computes the split keys.

Parameters:

  • opts (Hash) (defaults to: {})

    the options to initialize the parallel script.

Options Hash (opts):

  • :class (Class)

    the Mongo collection’s Ruby Class to execute operations on.

  • :javascript (String)

    the JavaScript function, as a String.

  • :args (Array, Hash)

    the arguments to pass to the JavaScript function.

  • :split (String, Symbol)

    the field to split the computation on – typically an indexed unique property of the resources in the collection.

  • :maxChunkSizeBytes (Fixnum)

    the size of the chunks to parallelize, in bytes. Defaults to `32*1024*1024 = 33554432`.

  • :debug (Boolean)

    whether to show messages during the process. Defaults to true.



# File 'lib/mongo_mapper_parallel.rb', line 112

def initialize(opts={})
  @command_class = opts[:class]
  @javascript    = opts[:javascript]
  @args          = opts[:args]
  @split         = opts[:split] # name, title, etc...
  @splitSize     = opts[:maxChunkSizeBytes] || 32*1024*1024
  @debug         = opts[:debug].nil? ? true : opts[:debug]
  get_split_keys()
  self
end
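
The defaulting rules in the constructor can be tried in isolation. The following is a minimal stand-alone sketch: `resolve_options` and `DEFAULT_CHUNK_BYTES` are hypothetical names, not part of the library, and the helper mirrors only the `:maxChunkSizeBytes` and `:debug` handling shown above.

```ruby
# Hypothetical helper (not part of MongoMapperParallel): mirrors how the
# constructor resolves :maxChunkSizeBytes and :debug.
DEFAULT_CHUNK_BYTES = 32 * 1024 * 1024 # 33554432

def resolve_options(opts = {})
  {
    # `||` falls back to the default when the option is missing or nil.
    split_size: opts[:maxChunkSizeBytes] || DEFAULT_CHUNK_BYTES,
    # An explicit `false` must survive, so only nil falls back to true.
    debug: opts[:debug].nil? ? true : opts[:debug]
  }
end

p resolve_options
p resolve_options(debug: false, maxChunkSizeBytes: 1024)
```

The `nil?` check is what lets a caller turn messages off with `:debug => false`; a plain `opts[:debug] || true` would always yield true.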

Instance Attribute Details

#args ⇒ Object

Returns the value of attribute args.



# File 'lib/mongo_mapper_parallel.rb', line 13

def args
  @args
end

#command_class ⇒ Object

Returns the value of attribute command_class.



# File 'lib/mongo_mapper_parallel.rb', line 11

def command_class
  @command_class
end

#debug ⇒ Object

Returns the value of attribute debug.



# File 'lib/mongo_mapper_parallel.rb', line 14

def debug
  @debug
end

#javascript ⇒ Object

Returns the value of attribute javascript.



# File 'lib/mongo_mapper_parallel.rb', line 12

def javascript
  @javascript
end

#split_keys ⇒ Object (readonly)

Returns the split keys (MongoMapperParallel::Key objects) that partition the Mongo collection for distributed computation.



# File 'lib/mongo_mapper_parallel.rb', line 10

def split_keys
  @split_keys
end

Instance Method Details

#advance(percentage) ⇒ MongoMapperParallel

In case of stalled progress you can skip ahead by a percentage and mark the keys as completed.

Parameters:

  • percentage (Float)

    how far along you want to advance, a value between 0.0 and 1.0

Returns:

  • (MongoMapperParallel)

    the receiver (`self`).



# File 'lib/mongo_mapper_parallel.rb', line 138

def advance percentage
  if percentage.class != Float
    raise TypeError.new "Can only advance by a Float value."
  elsif percentage > 1.0 or percentage < 0.0
    raise RangeError.new "Can only advance by a Float between 0.0 and 1.0."
  end
  @split_keys[0..(@split_keys.length*percentage).to_i].each {|i| i.completed = true}
  self
end
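
How many keys a given percentage marks can be checked without a database. The sketch below applies the same slicing expression as `#advance` to a plain array; `FakeKey` and `mark_completed` are hypothetical stand-ins introduced for illustration, not library names.

```ruby
# Stand-in for MongoMapperParallel::Key; only the `completed` flag matters here.
FakeKey = Struct.new(:position, :completed)

# Same checks and slicing expression as #advance, applied to a plain array.
def mark_completed(keys, percentage)
  if percentage.class != Float
    raise TypeError, "Can only advance by a Float value."
  elsif percentage > 1.0 || percentage < 0.0
    raise RangeError, "Can only advance by a Float between 0.0 and 1.0."
  end
  keys[0..(keys.length * percentage).to_i].each { |key| key.completed = true }
  keys
end

keys = Array.new(10) { |i| FakeKey.new(i, false) }
mark_completed(keys, 0.5)
puts keys.count(&:completed) # 6: the range 0..5 includes both ends
```

Because the range is inclusive, the key at the computed index is marked as well, so `0.0` still marks the first key. An Integer such as `1` is rejected with a TypeError; pass `1.0` instead.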

#get_extreme_split_keys ⇒ Array<MongoMapperParallel::Key>

Fallback for when splitVector returns no split keys: adds a single key that starts at the first element of the collection, ordered by the split field.

Returns:

  • (Array<MongoMapperParallel::Key>)



# File 'lib/mongo_mapper_parallel.rb', line 96

def get_extreme_split_keys
  first_split_key = get_first_split_key
  @split_keys << MongoMapperParallel::Key.new(:position => 0, :compiler => self, :key => first_split_key, :future_key => nil, :debug => @debug)
end

#get_first_split_key ⇒ Object

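Obtains the first key, i.e. the value of the split field on the first document when the collection is ordered by that field.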

Returns:

  • (Object)

    the first split key.



# File 'lib/mongo_mapper_parallel.rb', line 77

def get_first_split_key
  @command_class.where().order(@split.to_sym).fields(@split.to_sym).first.send(@split.to_sym)
end

#get_split_keys ⇒ Array<MongoMapperParallel::Key>

Obtains the split keys that delimit the chunks to parallelize, using the MongoDB splitVector command.

Returns:

  • (Array<MongoMapperParallel::Key>)



# File 'lib/mongo_mapper_parallel.rb', line 84

def get_split_keys
  @split_keys, splits = [], @command_class.database.command({splitVector: "#{@command_class.database.name}.#{@command_class.collection.name}", keyPattern: {@split.to_sym => 1}, maxChunkSizeBytes: @splitSize })["splitKeys"]
  splits.unshift({@split.to_s => get_first_split_key})
  splits.each_with_index do |split_key,k|
    @split_keys << MongoMapperParallel::Key.new(:position => k, :compiler => self, :key => split_key[@split.to_s], :future_key => (splits[k+1] ? splits[k+1][@split.to_s] : nil),:debug => @debug)
  end
  if @split_keys.length == 0 and @command_class.count > 0 then get_extreme_split_keys end
end
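
The pairing of each split key with its successor, done in the loop above, can be sketched on made-up data. `build_ranges` is a hypothetical helper and the sample values are invented; no database or splitVector call is involved, and plain hashes stand in for `MongoMapperParallel::Key` objects.

```ruby
# Pairs each split key with the next one, as the loop in #get_split_keys does.
# `splits` imitates the "splitKeys" array returned by splitVector; `first_key`
# imitates the value returned by #get_first_split_key.
def build_ranges(splits, field, first_key)
  all = [{ field => first_key }] + splits
  all.each_with_index.map do |split_key, k|
    upper = all[k + 1] ? all[k + 1][field] : nil # nil for the last range
    { position: k, key: split_key[field], future_key: upper }
  end
end

ranges = build_ranges([{ "name" => "h" }, { "name" => "q" }], "name", "a")
ranges.each { |r| p r }
```

Each entry starts where the previous one ends, and the last entry has no `future_key`. How `Key` interprets a nil `future_key` is not shown in this file.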

#run ⇒ Object

Starts the parallel processing using Parallel, skipping keys already marked as completed.



# File 'lib/mongo_mapper_parallel.rb', line 125

def run
  total = @split_keys.length
  Parallel.each_with_index(@split_keys) do |section,k|
    if !section.completed then section.compile end
    JRProgressBar.show(k,total) if @debug
  end
  puts "Success".green if @debug
end
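
The skip-if-completed loop in `#run` can be illustrated without the Parallel gem. The sketch below swaps in plain Ruby threads for `Parallel.each_with_index` and omits the progress bar; `FakeSection` and `run_sections` are hypothetical stand-ins, so this shows the control flow only, not the library's actual concurrency behaviour.

```ruby
# Stand-in for MongoMapperParallel::Key with a trivial `compile`.
class FakeSection
  attr_accessor :completed
  attr_reader :compiled

  def initialize(completed)
    @completed = completed
    @compiled = false
  end

  def compile
    @compiled = true
  end
end

# Compiles every section that is not yet completed, one thread per section.
# Plain threads replace Parallel.each_with_index in this sketch.
def run_sections(sections)
  threads = sections.map do |section|
    Thread.new { section.compile unless section.completed }
  end
  threads.each(&:join)
  sections
end

sections = [FakeSection.new(true), FakeSection.new(false), FakeSection.new(false)]
run_sections(sections)
p sections.map(&:compiled) # [false, true, true]
```

Sections flagged as completed beforehand, for example through `#advance`, are left untouched.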