Class: NoSE::Backend::MongoBackend

Inherits:
Backend
Includes:
Subtype
Defined in:
lib/nose/backend/mongo.rb

Overview

A backend which communicates with MongoDB

Defined Under Namespace

Classes: IndexLookupStatementStep, InsertStatementStep

Class Method Summary

Instance Method Summary

Methods included from Subtype

included

Methods inherited from Backend

#drop_index, #index_empty?, #index_exists?, #indexes_sample, #prepare, #prepare_query, #prepare_update, #query, #update

Methods included from Supertype

included

Methods included from Listing

included

Constructor Details

#initialize(model, indexes, plans, update_plans, config) ⇒ MongoBackend

Returns a new instance of MongoBackend.



# File 'lib/nose/backend/mongo.rb', line 11

def initialize(model, indexes, plans, update_plans, config)
  super

  @uri = config[:uri]
  @database = config[:database]
  Mongo::Logger.logger.level = ::Logger::FATAL
end
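
The constructor only reads the :uri and :database keys from the configuration hash. A minimal wiring sketch, assuming the model, indexes, and query/update plans have already been produced elsewhere in NoSE (those variable names are placeholders, not defined here):

# Placeholder objects: in a real run, model/indexes/plans/update_plans come
# from a NoSE workload, the selected schema, and the generated plans.
config = {
  uri:      'mongodb://localhost:27017',
  database: 'nose'
}
backend = NoSE::Backend::MongoBackend.new model, indexes, plans,
                                          update_plans, config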

Class Method Details

.field_path(index, field) ⇒ Array<String>

Find the path to a given field

Returns:

  • (Array<String>)

# File 'lib/nose/backend/mongo.rb', line 126

def self.field_path(index, field)
  # Find the path from the hash entity to the given key
  field_path = index.graph.path_between index.hash_fields.first.parent,
                                        field.parent
  field_path = field_path.path_for_field(field)

  # Use _id for any primary keys
  field_path[-1] = '_id' if field.is_a? Fields::IDField

  field_path
end
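
The returned path is later used to walk into the nested documents this backend stores (see rows_from_mongo below). A small illustration of that walk in plain Ruby, with a made-up document and path:

# Made-up nested document and field path, only to show how a path is
# consumed: each path component descends one level into the document.
doc  = { 'users' => { '_id' => 'abc123', 'city' => 'Berlin' } }
path = ['users', 'city']
path.reduce(doc) { |h, p| h[p] }  # => "Berlin"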

.rows_from_mongo(rows, index, fields = nil) ⇒ Array<Hash>

Convert documents returned from MongoDB into the format we understand

Returns:

  • (Array<Hash>)

# File 'lib/nose/backend/mongo.rb', line 113

def self.rows_from_mongo(rows, index, fields = nil)
  fields = index.all_fields if fields.nil?

  rows.map! do |row|
    Hash[fields.map do |field|
      field_path = MongoBackend.field_path(index, field)
      [field.id, field_path.reduce(row) { |h, p| h[p] }]
    end]
  end
end
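
Roughly speaking, each nested document coming back from MongoDB is flattened into a hash keyed by NoSE field identifiers. A sketch of that shape change with made-up field names and values (documents and index here are placeholders for real query results and a real index):

# Input document (as stored by this backend, made-up data):
#   { 'users' => { '_id' => BSON::ObjectId('...'), 'city' => 'Berlin' } }
# Output row in NoSE's flat format (field ids are illustrative):
#   { 'users_id' => BSON::ObjectId('...'), 'users_city' => 'Berlin' }
rows = MongoBackend.rows_from_mongo documents, index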

Instance Method Details

#by_id_graph ⇒ Boolean

MongoDB uses ID graphs for column families

Returns:

  • (Boolean)


# File 'lib/nose/backend/mongo.rb', line 21

def by_id_graph
  true
end

#generate_id ⇒ BSON::ObjectId

Produce a new ObjectId

Returns:

  • (BSON::ObjectId)


# File 'lib/nose/backend/mongo.rb', line 27

def generate_id
  BSON::ObjectId.new
end
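
For reference, these ids come straight from the bson gem; a quick sketch of what they look like:

require 'bson'

id = BSON::ObjectId.new
id.to_s             # => 24-character hex string
id.generation_time  # => Time encoded in the id's leading bytes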

#index_insert_chunk(index, chunk) ⇒ Array<BSON::ObjectId>

Insert a chunk of rows into an index

Returns:

  • (Array<BSON::ObjectId>)


# File 'lib/nose/backend/mongo.rb', line 76

def index_insert_chunk(index, chunk)
  # We only need to insert into indexes which are ID graphs
  fail unless index == index.to_id_graph

  chunk.map! do |row|
    row_hash = Hash.new { |h, k| h[k] = Hash.new(&h.default_proc) }
    index.all_fields.each do |field|
      field_path = self.class.field_path(index, field)
      entity_hash = field_path[0..-2].reduce(row_hash) { |h, k| h[k] }

      if field_path.last == '_id'
        entity_hash[field_path.last] = BSON::ObjectId.new
      else
        entity_hash[field_path.last] = row[field.id]
      end
    end

    row_hash.default_proc = nil
    row_hash
  end

  client[index.key].insert_many(chunk, ordered: false).inserted_ids
end
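
A hedged usage sketch: the rows in a chunk are flat hashes keyed by NoSE field ids, and primary-key fields are replaced with fresh ObjectIds on the way in (backend, index, and the field ids below are placeholders):

# Placeholder data; the field ids depend on the workload's entity names.
chunk = [
  { 'users_username' => 'alice', 'users_city' => 'Berlin' },
  { 'users_username' => 'bob',   'users_city' => 'Boston' }
]
inserted_ids = backend.index_insert_chunk index, chunk
# => one BSON::ObjectId per inserted document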

#index_sample(index, count) ⇒ Object

Sample a number of values from the given index



# File 'lib/nose/backend/mongo.rb', line 101

def index_sample(index, count)
  rows = client[index.to_id_graph.key].aggregate(
    [
      { '$sample' => { 'size' => count } }
    ]
  ).to_a

  MongoBackend.rows_from_mongo rows, index
end
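
The sampling itself is just MongoDB's $sample aggregation stage; a rough standalone equivalent using the mongo driver directly (the connection string and collection name are made up):

require 'mongo'

# Sample ten documents from a hypothetical collection.
client = Mongo::Client.new 'mongodb://localhost:27017/nose'
client[:users_by_city].aggregate(
  [{ '$sample' => { 'size' => 10 } }]
).each { |doc| puts doc.inspect }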

#indexes_ddl(execute = false, skip_existing = false, drop_existing = false) ⇒ Object

Create new MongoDB collections for each index



# File 'lib/nose/backend/mongo.rb', line 32

def indexes_ddl(execute = false, skip_existing = false,
                drop_existing = false)
  ddl = []

  # Create the ID graphs for all indexes
  id_graphs = @indexes.map(&:to_id_graph).uniq
  id_graphs.map do |id_graph|
    ddl << "Create #{id_graph.key}"
    next unless execute

    collection = client.collections.find { |c| c.name == id_graph.key }
    collection.drop if drop_existing && !collection.nil?
    client[id_graph.key].create unless skip_existing
  end

  # Create any necessary indexes on the ID graphs
  index_keys = []
  @indexes.sort_by do |index|
    -(index.hash_fields.to_a + index.order_fields).length
  end.each do |index|
    # Check if we already have a prefix of this index created
    keys = index.hash_fields.to_a + index.order_fields
    next if index_keys.any? { |i| i[0..keys.length - 1] == keys }
    index_keys << keys

    id_graph = index.to_id_graph
    next if id_graph == index

    # Combine the key paths for all fields to create a compound index
    index_spec = Hash[keys.map do |key|
      [self.class.field_path(index, key).join('.'), 1]
    end]

    ddl << "Add index #{index_spec} to #{id_graph.key} (#{index.key})"
    next unless execute

    client[id_graph.key].indexes.create_one index_spec
  end

  ddl
end
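
A usage sketch, assuming a configured backend as in the constructor example above: call with no arguments for a dry run that only returns the DDL descriptions, or pass execute (and optionally drop_existing) to apply the changes:

# 'backend' is the MongoBackend instance from the constructor example.
puts backend.indexes_ddl                # dry run: list collections and indexes
backend.indexes_ddl true, false, true   # create them, dropping any that exist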