Class: Adhd::NodeManager

Inherits:
Object show all
Defined in:
lib/adhd/node_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ NodeManager

Returns a new instance of NodeManager.



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/adhd/node_manager.rb', line 15

# Boots this node: connects to the local CouchDB server, opens the
# "<node_name>_node_db" database, optionally replicates from a buddy node,
# registers our own node record and then builds and syncs the admin, shard
# and content databases.
#
# @param config [Object] settings object; this method reads node_url,
#   couchdb_server_port, node_name, buddy_server_url and
#   buddy_server_db_name from it.
def initialize(config)
  @config = config

  server_url = "http://#{config.node_url}:#{config.couchdb_server_port}"
  @couch_server = CouchRest.new(server_url)

  # NOTE(review): database! presumably creates the database when it is
  # missing -- confirm against the CouchRest version in use.
  node_db_name = "#{config.node_name}_node_db"
  @couch_db = @couch_server.database!(node_db_name)

  # Replicate from a buddy only when both its URL and database name are set.
  if config.buddy_server_url && config.buddy_server_db_name
    sync_with_buddy_node
  end

  # The order below matters: later steps read state set by earlier ones
  # (@our_node, @ndb/@srdb/@conn_manager, @contentdbs).
  @our_node = initialize_node
  build_node_admin_databases
  set_as_management_node_if_necessary
  build_shards(10, 2) # number_of_shards, number_of_replicators
  build_node_content_databases
  sync_databases
end

Instance Attribute Details

#ndbObject

Returns the value of attribute ndb.



13
14
15
# File 'lib/adhd/node_manager.rb', line 13

# Reader for @ndb, which build_node_admin_databases sets to
# NodeDB.new(@our_node). Returns nil until that method has run.
def ndb
  @ndb
end

#our_nodeObject

Returns the value of attribute our_node.



13
14
15
# File 'lib/adhd/node_manager.rb', line 13

# Reader for @our_node, which the constructor sets to the record returned
# by initialize_node.
def our_node
  @our_node
end

#srdbObject

Returns the value of attribute srdb.



13
14
15
# File 'lib/adhd/node_manager.rb', line 13

# Reader for @srdb, which build_node_admin_databases sets to
# ShardRangeDB.new(@ndb). Returns nil until that method has run.
def srdb
  @srdb
end

Instance Method Details

#build_node_admin_databasesObject



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/adhd/node_manager.rb', line 79

# Creates the connection bank plus the node and shard-range admin databases,
# and registers an update listener for each of the two databases.
#
# Sets @conn_manager, @ndb and @srdb. Reads @config and @our_node, so both
# must be assigned before this is called.
def build_node_admin_databases
  @conn_manager = ConnectionBank.new

  # Builds a notifier for the database named "<our node name><db_suffix>" on
  # the local CouchDB server and hands it to the connection bank.
  register_listener = lambda do |db_suffix, on_change|
    listener = UpdateNotifierConnection.new(@config.node_url,
                                            @config.couchdb_server_port,
                                            @our_node.name + db_suffix,
                                            on_change)
    @conn_manager.add_connection(listener)
  end

  # Node database: every notification is routed to handle_node_update.
  @ndb = NodeDB.new(@our_node)
  register_listener.call("_node_db", Proc.new { |data| handle_node_update data })

  # Shard-range database, built on top of the node database. Changes to the
  # shards should be infrequent, so rebuilding the content databases on each
  # notification is tolerable.
  @srdb = ShardRangeDB.new(@ndb)
  register_listener.call("_shard_db", Proc.new { |data| build_node_content_databases })
end

#build_node_content_databasesObject



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/adhd/node_manager.rb', line 121

# Reconciles @contentdbs with the content shards currently reported by
# @srdb.get_content_shards.
#
# * Every shard key not yet tracked gets an UpdateNotifierConnection on the
#   "<node name>_<shard db name>_content_db" database whose callback syncs
#   that shard; the pair [shard_db, connection] is stored under the key.
# * Every tracked key that is no longer reported is passed to
#   remove_content_shard and then dropped from @contentdbs.
#
# NOTE: this has to be re-run whenever we are re-assigned shards.
#
# @return [Hash] @contentdbs, mapping shard key => [shard_db, connection]
def build_node_content_databases
  @contentdbs ||= {}
  current_shards = @srdb.get_content_shards

  # Add the shards we do not know about yet.
  current_shards.each do |cs, shard_db|
    next if @contentdbs.key?(cs)

    conn = UpdateNotifierConnection.new(@config.node_url,
                                        @config.couchdb_server_port,
                                        @our_node.name + "_" + shard_db.this_shard.shard_db_name + "_content_db",
                                        Proc.new { |_data| shard_db.sync })
    @conn_manager.add_connection(conn)

    # Store both the shard object and the update notifier.
    @contentdbs[cs] = [shard_db, conn]
  end

  # Work from a snapshot of the stale keys so the hash is never modified
  # while it is being iterated.
  stale_keys = @contentdbs.keys.reject { |cs| current_shards.key?(cs) }
  stale_keys.each do |cs|
    shard_db, conn = @contentdbs[cs]
    remove_content_shard(shard_db, conn)
    @contentdbs.delete(cs)
  end

  @contentdbs
end

#build_shards(number_of_shards, number_of_replicators) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/adhd/node_manager.rb', line 169

# Creates the shard ranges (when none exist yet) and assigns random nodes to
# every shard range that has no nodes. Does nothing unless our node is a
# management node.
#
# @param number_of_shards [Integer] number of ranges passed to
#   @srdb.build_shards when ShardRange.by_range_start is empty
# @param number_of_replicators [Integer] how many node names each unassigned
#   shard receives; values below 1 yield an empty node list
# @return [Object, nil] the collection returned by ShardRange.by_range_start,
#   or nil when our node is not a management node
def build_shards(number_of_shards, number_of_replicators)
  return unless @our_node.is_management

  if ShardRange.by_range_start.length == 0
    puts "Creating new ranges"
    @srdb.build_shards(number_of_shards)
  end

  node_names = Node.by_name.map { |anode| anode.name }

  # Array#first(n) is used instead of a 0..(n - 1) range: with n == 0 that
  # range becomes 0..-1 and would select every node instead of none.
  replica_count = [number_of_replicators, 0].max

  # Populate the unassigned shards with some nodes at random.
  ShardRange.by_range_start.each do |s|
    next if s.node_list && s.node_list.length != 0

    shuffled = node_names.shuffle
    s.node_list = shuffled.first(replica_count)
    s.master_node = shuffled.first
    s.save
  end
end

#event_handler(ev) ⇒ Object



29
30
31
# File 'lib/adhd/node_manager.rb', line 29

# Writes the given event to standard output.
#
# @param ev [Object] the event to print
# @return [nil]
def event_handler(ev)
  $stdout.puts(ev)
end

#handle_node_update(update) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/adhd/node_manager.rb', line 104

# Callback for changes in the node database (a node was added, removed or
# changed status). Recomputes the node-to-shard allocation from the current
# node and shard lists.
#
# Only the head management node acts: when @ndb reports a head management
# node whose name differs from ours, the update is ignored. When no head
# management node is reported, the allocation is still performed.
#
# @param update [Object] the notification payload (not inspected here)
# @return [Object, nil] the result of assign_nodes_to_shards, or nil when
#   nothing was done
def handle_node_update(update)
  if @ndb.head_management_node
    return unless @ndb.head_management_node.name == @our_node.name
  end

  nodes = Node.by_name
  shards = ShardRange.by_range_start

  # The trailing 2 is passed through to assign_nodes_to_shards unchanged.
  # NOTE(review): it presumably is the replica count, matching
  # build_shards(10, 2) in the constructor -- confirm.
  assign_nodes_to_shards(nodes, shards, 2) if nodes && shards
end

#initialize_nodeObject

Retrieve our own node record from CouchDB by our name.

If other records exist with the same name, they are destroyed.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/adhd/node_manager.rb', line 49

# Loads (or creates) the Node record for this node and marks it as running.
#
# Node is pointed at @couch_db, then looked up by the configured node name.
# The last matching record is kept, or a new Node is built when there is
# none; every other matching record is destroyed. The kept record gets its
# name, url and status refreshed and is saved.
#
# @return [Node] the saved record for this node
def initialize_node
  puts "Initialize node #{@config.node_name}"
  Node.use_database @couch_db
  puts "DB #{@couch_db}, node #{@config.node_name}"

  matches = Node.by_name(:key => @config.node_name)
  own_record = matches.pop || Node.new

  # Whatever is left in the result set is a duplicate of ourselves.
  matches.each { |duplicate| duplicate.destroy }

  # Refresh our own record with the current settings.
  own_record.name = @config.node_name
  own_record.url = "http://#{@config.node_url}:#{@config.couchdb_server_port}"
  own_record.status = "RUNNING"
  own_record.save

  own_record
end

#remove_content_shard(content_shard, connection) ⇒ Object



153
154
155
156
157
158
159
160
161
162
# File 'lib/adhd/node_manager.rb', line 153

# Stops listening to a content shard we no longer hold and syncs it one
# last time. The shard's database is NOT deleted here.
#
# @param content_shard [Object] shard object; must respond to sync
# @param connection [Object] update listener for the shard; must respond
#   to kill
# @return [Object] whatever content_shard.sync returns
def remove_content_shard(content_shard, connection)
  # Stop the update listener first, then sync.
  connection.kill
  content_shard.sync
  # TODO: verify that the sync actually happened.
  # TODO: run a sync with the current master to ensure that any changes
  #       have been pushed, then DELETE the database to save space
  #       (content_shard.this_shard_db.delete!).
end

#runObject



164
165
166
167
# File 'lib/adhd/node_manager.rb', line 164

# Starts all connections registered with @conn_manager by calling run_all
# on it. @conn_manager is created in build_node_admin_databases, so that
# method must have run first.
# NOTE(review): per the comment below this enters the event machine loop,
# so the call presumably blocks -- confirm against ConnectionBank#run_all.
def run
  # Enters the event machine loop
  @conn_manager.run_all
end

#set_as_management_node_if_necessaryObject

Checks whether we are the first node; if so, we set ourselves up as the management node.



71
72
73
74
75
76
77
# File 'lib/adhd/node_manager.rb', line 71

# Promotes our node to management node when it is the only node known,
# i.e. when Node.by_name returns exactly one record.
#
# @return [Object, nil] the result of saving our node, or nil when other
#   nodes exist (or none at all) and nothing was changed
def set_as_management_node_if_necessary
  return unless Node.by_name.length == 1

  # NOTE(review): 300 looks like a priority/weight rather than a plain
  # flag -- confirm how is_management values are compared elsewhere.
  @our_node.is_management = 300
  @our_node.save
end

#sync_adminObject



204
205
206
207
# File 'lib/adhd/node_manager.rb', line 204

# Syncs the two admin databases: the node database first, then the
# shard-range database. Content databases are not touched.
#
# @return [Object] whatever @srdb.sync returns
def sync_admin
  @ndb.sync
  @srdb.sync
end

#sync_databasesObject



195
196
197
198
199
200
201
202
# File 'lib/adhd/node_manager.rb', line 195

# Syncs every database this node manages: the node database, the
# shard-range database, and then each content shard held in @contentdbs.
#
# @return [Hash] @contentdbs
def sync_databases
  # Admin databases first.
  @ndb.sync
  @srdb.sync

  # Each entry is a [shard_db, connection] pair; only the shard is synced.
  @contentdbs.each_value { |shard_db, _conn| shard_db.sync }
end

#sync_with_buddy_nodeObject

Sync the db with our buddy



35
36
37
38
39
40
41
42
43
# File 'lib/adhd/node_manager.rb', line 35

# Replicates the buddy node's "<buddy_server_db_name>_node_db" database
# into our own node database (@couch_db).
#
# This is best-effort: any StandardError is caught and reported on standard
# output together with its class and message, so a failed replication can
# be diagnosed; the error is not re-raised.
#
# @return [Object, nil] the result of @couch_db.replicate_from, or nil when
#   the replication failed
def sync_with_buddy_node
  buddy_server = CouchRest.new(@config.buddy_server_url.to_s)
  buddy_db = buddy_server.database!(@config.buddy_server_db_name + "_node_db")
  @couch_db.replicate_from(buddy_db)
rescue StandardError => e
  puts "Could not buddy up with node #{@config.buddy_server_db_name}: #{e.class}: #{e.message}"
end