Class: Adhd::NodeManager

Inherits:
Object show all
Defined in:
lib/adhd/node_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ NodeManager

Returns a new instance of NodeManager.



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/adhd/node_manager.rb', line 15

# Boots the node manager for the node described by +config+.
#
# Connects to the local CouchDB server, opens this node's "<name>_node_db"
# database, optionally replicates from a buddy node, registers our own node
# record, and then builds and syncs the admin, shard and content databases.
# The steps below depend on each other and must run in this order.
#
# @param config [Object] configuration exposing +node_url+,
#   +couchdb_server_port+, +node_name+, +buddy_server_url+ and
#   +buddy_server_node_name+
def initialize(config)
  @config = config

  server_url = "http://#{config.node_url}:#{config.couchdb_server_port}"
  @couch_server = CouchRest.new(server_url)

  node_db_name = "#{config.node_name}_node_db"
  @couch_db = @couch_server.database!(node_db_name)

  # Only buddy up when both the buddy's URL and its node name are configured.
  has_buddy = config.buddy_server_url && config.buddy_server_node_name
  sync_with_buddy_node if has_buddy

  @our_node = initialize_node
  build_node_admin_databases
  set_as_management_node_if_necessary
  build_shards(10, 2) # 10 shards, 2 replicators per shard
  build_node_content_databases
  sync_databases
end

Instance Attribute Details

#ndb ⇒ Object

Returns the value of attribute ndb.



13
14
15
# File 'lib/adhd/node_manager.rb', line 13

# Reader for the @ndb instance variable.
#
# @ndb is assigned a NodeDB built from our node record in
# build_node_admin_databases; before that call it is nil.
#
# @return [Object, nil] the current value of @ndb
def ndb
  @ndb
end

#our_node ⇒ Object

Returns the value of attribute our_node.



13
14
15
# File 'lib/adhd/node_manager.rb', line 13

# Reader for the @our_node instance variable.
#
# @our_node is assigned the result of initialize_node in the constructor,
# i.e. the saved node record carrying this node's name, url and status.
#
# @return [Object, nil] the current value of @our_node
def our_node
  @our_node
end

#srdb ⇒ Object

Returns the value of attribute srdb.



13
14
15
# File 'lib/adhd/node_manager.rb', line 13

# Reader for the @srdb instance variable.
#
# @srdb is assigned a ShardRangeDB built on top of @ndb in
# build_node_admin_databases; before that call it is nil.
#
# @return [Object, nil] the current value of @srdb
def srdb
  @srdb
end

Instance Method Details

#build_node_admin_databases ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/adhd/node_manager.rb', line 79

# Creates the two admin databases (node DB and shard-range DB) and registers
# one change listener per database with a fresh ConnectionBank.
#
# Sets @conn_manager, @ndb and @srdb. Requires @config and @our_node.
#
# @return [Object] whatever the last +add_connection+ call returns
def build_node_admin_databases
  @conn_manager = ConnectionBank.new

  # Node database: every change is forwarded to handle_node_update.
  @ndb = NodeDB.new(@our_node)
  node_listener = UpdateNotifierConnection.new(
    @config.node_url,
    @config.couchdb_server_port,
    @our_node.name + "_node_db", # database name is derived from our node name
    Proc.new { |data| handle_node_update(data) }
  )
  @conn_manager.add_connection(node_listener)

  # Shard-range database, built on top of the node database.
  @srdb = ShardRangeDB.new(@ndb)

  # Changes to the shard ranges should be infrequent, so it is tolerable to
  # rebuild the content databases each time one arrives.
  shard_listener = UpdateNotifierConnection.new(
    @config.node_url,
    @config.couchdb_server_port,
    @our_node.name + "_shard_db", # same naming scheme as the node database
    Proc.new { |_data| build_node_content_databases }
  )
  @conn_manager.add_connection(shard_listener)
end

#build_node_content_databases ⇒ Object

Build content shard databases.



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/adhd/node_manager.rb', line 127

# Reconciles @contentdbs with the content shards currently reported by the
# shard-range database (@srdb.get_content_shards).
#
# @contentdbs maps a shard key to a two-element array
# [shard_db, update_notifier_connection]. For every reported shard:
# * unknown key  -> a new UpdateNotifierConnection is created, registered with
#   @conn_manager and stored together with the shard object;
# * known key    -> the existing connection is kept and only the shard object
#   is refreshed.
# Keys that are no longer reported are handed to remove_content_shard and then
# dropped from @contentdbs.
#
# NOTE: this has to be re-run whenever we are re-assigned shards.
#
# @return [Hash] the updated @contentdbs
def build_node_content_databases
  @contentdbs ||= {}

  # Fresh view of the shard DB; used to add, remove and *update* shards.
  current_shards = @srdb.get_content_shards

  current_shards.each do |cs, shard_db|
    if @contentdbs.key?(cs)
      # Keep the same connection, but update the shard information.
      @contentdbs[cs] = [shard_db, @contentdbs[cs][1]]
    else
      on_change = Proc.new do |data|
        puts "SYNC #{shard_db.this_shard.shard_db_name} #{data}"

        # Looked up lazily so that we sync the latest shard object stored
        # under this key, not the one captured when the listener was built.
        @contentdbs[cs][0].sync
      end
      conn = UpdateNotifierConnection.new(
        @config.node_url,
        @config.couchdb_server_port,
        @our_node.name + "_" + shard_db.this_shard.shard_db_name + "_content_db",
        on_change
      )
      @conn_manager.add_connection(conn)

      # Store both the shard object and its update notifier.
      @contentdbs[cs] = [shard_db, conn]
    end
  end

  # Work from a snapshot of the stale keys so that @contentdbs is never
  # modified while it is being iterated.
  stale_keys = @contentdbs.keys.reject { |cs| current_shards.key?(cs) }
  stale_keys.each do |cs|
    shard_db, conn = @contentdbs[cs]
    remove_content_shard(shard_db, conn)
    @contentdbs.delete(cs)
  end

  @contentdbs
end

#build_shards(number_of_shards, number_of_replicators) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/adhd/node_manager.rb', line 188

# Creates the shard ranges (if none exist yet) and assigns nodes to every
# range that has no node list. Does nothing unless our node is a management
# node (@our_node.is_management is truthy).
#
# For each unassigned range a random selection of known node names is stored
# as +node_list+, its first entry becomes +master_node+, and the range is
# saved. Ranges that already have a non-empty node list are left untouched.
#
# @param number_of_shards [Integer] how many ranges to ask @srdb to build when
#   no range exists yet
# @param number_of_replicators [Integer] how many nodes to assign to each
#   unassigned range; values below 1 assign no nodes, values larger than the
#   number of known nodes assign all of them
# @return [Object, nil] nil when we are not a management node, otherwise the
#   collection of shard ranges that was iterated
def build_shards(number_of_shards, number_of_replicators)
  return unless @our_node.is_management

  if ShardRange.by_range_start.empty?
    puts "Creating new ranges"
    @srdb.build_shards(number_of_shards)
  end

  node_names = Node.by_name.map(&:name)

  # `first(n)` rather than `[0..(n - 1)]`: with n == 0 the range form turns
  # into `[0..-1]` and would select *every* node instead of none.
  replica_count = [number_of_replicators, 0].max

  ShardRange.by_range_start.each do |range|
    assigned = range.node_list
    next unless !assigned || assigned.empty?

    # Shuffle a copy so one range's draw does not reorder the shared list.
    replicas = node_names.shuffle.first(replica_count)
    range.node_list = replicas
    range.master_node = replicas.first
    range.save
  end
end

#event_handler(ev) ⇒ Object



29
30
31
# File 'lib/adhd/node_manager.rb', line 29

# Writes the given event to standard output.
#
# @param ev [Object] the event to print
# @return [nil]
def event_handler(ev)
  $stdout.puts(ev)
end

#handle_node_update(update) ⇒ Object

A node changed status (became available, or became unavailable).

If we are the admin, when a node joins we should allocate some shards to it.

Only the head management node deals with node changes.

XXXX doc question: what does it mean to be 'the admin'?


113
114
115
116
117
118
119
120
121
122
# File 'lib/adhd/node_manager.rb', line 113

# Reacts to a change in the node database by re-allocating nodes to shards.
#
# Only acts when no head management node is known, or when the head
# management node is us (compared by name); otherwise returns immediately.
#
# @param update [Object] the change notification; currently not inspected
# @return [Object, nil] the result of assign_nodes_to_shards, or nil when
#   nothing was done
def handle_node_update(update)
  # Query the head management node once: asking twice costs an extra lookup
  # and could raise if the second answer came back nil.
  head = @ndb.head_management_node
  return if head && head.name != @our_node.name

  # Given the shard DB and the node DB, work out a new allocation.
  nodes = Node.by_name
  shard_ranges = ShardRange.by_range_start
  return unless nodes && shard_ranges

  assign_nodes_to_shards(nodes, shard_ranges, 2)
end

#initialize_node ⇒ Object

Retrieve our own node record from CouchDB by our name.

If there are other node records with the same name, destroy them!



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/adhd/node_manager.rb', line 49

# Finds or creates the node record carrying our configured name and marks it
# as running.
#
# Points Node at @couch_db, looks up all records whose name matches
# @config.node_name, keeps the last one returned (or a fresh Node when none
# exists) and destroys every other match. The kept record gets our name, url
# and the "RUNNING" status, and is saved.
#
# @return [Node] the saved record for this node
def initialize_node
  puts "Initialize node #{@config.node_name}"
  Node.use_database @couch_db
  puts "DB #{@couch_db}, node #{@config.node_name}"

  matches = Node.by_name(:key => @config.node_name)
  own_record = matches.pop || Node.new

  # Whatever is still in the list is a duplicate of us.
  matches.each(&:destroy)

  own_record.name = @config.node_name
  own_record.url = "http://#{@config.node_url}:#{@config.couchdb_server_port}"
  own_record.status = "RUNNING"
  own_record.save
  own_record
end

#remove_content_shard(content_shard, connection) ⇒ Object

Kills the connection listening for updates on this shard

TODO: test if the sync happened, then delete the shard database (content_shard.this_shard_db.delete!).

TODO: run a sync with the current master to ensure that any changes have been pushed. Then DELETE the database to save space.


177
178
179
180
# File 'lib/adhd/node_manager.rb', line 177

# Retires a content shard we are no longer responsible for: first stops its
# update listener, then runs one more sync on the shard.
#
# The shard's database itself is not deleted here.
#
# @param content_shard [Object] shard object responding to +sync+
# @param connection [Object] update listener responding to +kill+
# @return [Object] whatever +content_shard.sync+ returns
def remove_content_shard(content_shard, connection)
  connection.kill
  content_shard.sync
end

#run ⇒ Object

Enters the eventmachine loop



184
185
186
# File 'lib/adhd/node_manager.rb', line 184

# Delegates to +run_all+ on the connection manager (@conn_manager), which
# holds the update listeners registered while building the databases.
#
# NOTE(review): the existing docs describe this as entering the EventMachine
# loop, so the call presumably blocks — confirm against ConnectionBank#run_all.
#
# @return [Object] whatever +run_all+ returns
def run
  @conn_manager.run_all
end

#set_as_management_node_if_necessary ⇒ Object

We check whether we are the first node. If we are, we set ourselves up as the management node.



71
72
73
74
75
76
77
# File 'lib/adhd/node_manager.rb', line 71

# Promotes our node to management node when it is the only node on record.
#
# When Node.by_name returns exactly one record, @our_node.is_management is set
# to 300 and the record is saved; otherwise nothing changes.
# NOTE(review): the meaning of the value 300 is not visible here — confirm
# against the Node model.
#
# @return [Object, nil] the result of +save+, or nil when no promotion happened
def set_as_management_node_if_necessary
  return unless Node.by_name.length == 1

  @our_node.is_management = 300
  @our_node.save
end

#sync_admin ⇒ Object



223
224
225
226
# File 'lib/adhd/node_manager.rb', line 223

# Syncs the two admin databases: the node DB first, then the shard-range DB.
#
# @return [Object] whatever the shard-range DB's +sync+ returns
def sync_admin
  admin_dbs = [@ndb, @srdb]
  # `last` keeps the return value equal to the second sync's result.
  admin_dbs.map(&:sync).last
end

#sync_databases ⇒ Object



214
215
216
217
218
219
220
221
# File 'lib/adhd/node_manager.rb', line 214

# Syncs every database this node knows about: the node DB, the shard-range
# DB, and then each content shard stored in @contentdbs.
#
# @return [Hash] @contentdbs
def sync_databases
  @ndb.sync
  @srdb.sync

  # Each entry is [shard_db, connection]; only the shard object is synced.
  @contentdbs.each_value { |shard_db, _conn| shard_db.sync }
end

#sync_with_buddy_node ⇒ Object

Sync the db with our buddy



35
36
37
38
39
40
41
42
43
# File 'lib/adhd/node_manager.rb', line 35

# Replicates the buddy node's "<buddy name>_node_db" database into our own
# node database (@couch_db).
#
# This is best-effort: any StandardError raised while connecting or
# replicating is reported on standard output, including its class and
# message, and is not re-raised.
#
# @return [Object, nil] the result of +replicate_from+, or nil on failure
def sync_with_buddy_node
  buddy_server = CouchRest.new(@config.buddy_server_url.to_s)
  buddy_db = buddy_server.database!(@config.buddy_server_node_name + "_node_db")
  @couch_db.replicate_from(buddy_db)
rescue StandardError => e
  # Keep going without a buddy, but say why instead of hiding the cause.
  puts "Could not buddy up with node #{@config.buddy_server_node_name} (#{e.class}: #{e.message})"
end