Class: Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/connection/connection.rb

Direct Known Subclasses

TCPConnection

Constant Summary collapse

STATUS_OK =
"ok"
PROTOCOL =
"pb0"
DEFAULT_REQUEST_LIMIT_PER_NODE =
500

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(db_name, hosts, request_limit_per_node = DEFAULT_REQUEST_LIMIT_PER_NODE) ⇒ Connection

Returns a new instance of Connection.



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/connection/connection.rb', line 20

# Builds a connection for the store +db_name+, seeded with bootstrap hosts.
#
# Every entry of +hosts+ is split on ":" and turned into a VoldemortNode, so
# a node list exists before #bootstrap replaces it with the nodes parsed from
# the cluster metadata. The port is stored exactly as it appears in the string
# (no numeric conversion is done here).
#
# @param db_name [String] name of the store this connection operates on
# @param hosts [Array<String>] bootstrap hosts, each written as "host:port"
# @param request_limit_per_node [Integer] request count at which
#   #rebalance_connection? starts answering true
def initialize(db_name, hosts, request_limit_per_node = DEFAULT_REQUEST_LIMIT_PER_NODE)
  self.db_name = db_name
  self.hosts   = hosts

  seed_nodes = hosts.map do |address|
    host, port = address.split(":")
    VoldemortNode.new.tap do |seed|
      seed.host = host
      seed.port = port
    end
  end
  self.nodes = seed_nodes

  self.request_count = 0
  self.request_limit_per_node = request_limit_per_node
end

Instance Attribute Details

#connected_node ⇒ Object

The VoldemortNode we are connected to.



8
9
10
# File 'lib/connection/connection.rb', line 8

# The VoldemortNode this connection is currently attached to.
#
# #connect_to_random_node assigns it once a node accepts the connection;
# #initialize leaves it unset, so it reads as nil until something assigns it.
#
# @return [VoldemortNode, nil]
def connected_node
  @connected_node
end

#db_name ⇒ Object

The DB store name.



7
8
9
# File 'lib/connection/connection.rb', line 7

# The name of the store this connection operates on.
#
# Set from the +db_name+ argument of #initialize and passed as the store name
# by #get, #get_all, #put and #delete.
#
# @return [String]
def db_name
  @db_name
end

#hosts ⇒ Object

The hosts from where we bootstrapped.



5
6
7
# File 'lib/connection/connection.rb', line 5

# The bootstrap hosts this connection was created with.
#
# @return [Array<String>] the +hosts+ argument given to #initialize, where
#   each entry is split on ":" into a host and a port
def hosts
  @hosts
end

#key_serializer_schemas ⇒ Object

Returns the value of attribute key_serializer_schemas.



11
12
13
# File 'lib/connection/connection.rb', line 11

# The key-serializer schemas of the store, as parsed from stores.xml.
#
# #bootstrap assigns the result of parse_schema_from(doc, 'key-serializer');
# it is unset (nil) until then.
#
# @return [Hash{String => String}, nil] schema text keyed by schema version
def key_serializer_schemas
  @key_serializer_schemas
end

#key_serializer_type ⇒ Object

Returns the value of attribute key_serializer_type.



13
14
15
# File 'lib/connection/connection.rb', line 13

# The key-serializer type of the store, as parsed from stores.xml.
#
# #bootstrap assigns the result of parse_schema_type(doc, 'key-serializer');
# it is unset (nil) until then.
#
# @return [String, nil]
def key_serializer_type
  @key_serializer_type
end

#nodes ⇒ Object

The array of VoldemortNodes available.



6
7
8
# File 'lib/connection/connection.rb', line 6

# The VoldemortNodes this connection may connect to.
#
# #initialize fills it from the bootstrap hosts; #bootstrap replaces it with
# the nodes parsed from the cluster metadata.
#
# @return [Array<VoldemortNode>]
def nodes
  @nodes
end

#request_count ⇒ Object

Used to track the number of requests a node receives.



9
10
11
# File 'lib/connection/connection.rb', line 9

# Number of requests counted against the currently connected node.
#
# Starts at 0 in #initialize, is reset to 0 by #connect_to_random_node when a
# node accepts the connection, and is incremented by
# #rebalance_connection_if_needed.
#
# @return [Integer]
def request_count
  @request_count
end

#request_limit_per_node ⇒ Object

Limit on the number of requests per node.



10
11
12
# File 'lib/connection/connection.rb', line 10

# Request count at which the connection is considered due for rebalancing.
#
# Set from the +request_limit_per_node+ argument of #initialize (which
# defaults to DEFAULT_REQUEST_LIMIT_PER_NODE) and compared against
# #request_count by #rebalance_connection?.
#
# @return [Integer]
def request_limit_per_node
  @request_limit_per_node
end

#value_serializer_schemas ⇒ Object

Returns the value of attribute value_serializer_schemas.



12
13
14
# File 'lib/connection/connection.rb', line 12

# The value-serializer schemas of the store, as parsed from stores.xml.
#
# #bootstrap assigns the result of parse_schema_from(doc, 'value-serializer');
# it is unset (nil) until then.
#
# @return [Hash{String => String}, nil] schema text keyed by schema version
def value_serializer_schemas
  @value_serializer_schemas
end

#value_serializer_type ⇒ Object

Returns the value of attribute value_serializer_type.



14
15
16
# File 'lib/connection/connection.rb', line 14

# The value-serializer type of the store, as parsed from stores.xml.
#
# #bootstrap assigns the result of parse_schema_type(doc, 'value-serializer');
# it is unset (nil) until then.
#
# @return [String, nil]
def value_serializer_type
  @value_serializer_type
end

Instance Method Details

#bootstrap ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/connection/connection.rb', line 34

# Loads cluster and store metadata, then connects to a random node.
#
# Steps, in order:
# 1. fetch "cluster.xml" from the "metadata" store and replace #nodes with the
#    nodes parsed from it;
# 2. fetch "stores.xml" from the "metadata" store and record the key/value
#    serializer types and schemas declared for #db_name;
# 3. call #connect_to_random_node.
#
# @return [Object] whatever #connect_to_random_node returns
# @raise [RuntimeError] wrapping the message of any StandardError raised
#   along the way
def bootstrap
  # The XML text is read from position [1][0][1] of the get_from response.
  # NOTE(review): the response layout is defined by get_from, which is not
  # visible here -- confirm before changing these indexes.
  cluster_payload = get_from("metadata", "cluster.xml", false)[1][0][1]
  self.nodes = parse_nodes_from(Nokogiri::XML(cluster_payload))

  stores_payload = get_from("metadata", "stores.xml", false)[1][0][1]
  stores_doc = Nokogiri::XML(stores_payload)

  self.key_serializer_type      = parse_schema_type(stores_doc, 'key-serializer')
  self.value_serializer_type    = parse_schema_type(stores_doc, 'value-serializer')
  self.key_serializer_schemas   = parse_schema_from(stores_doc, 'key-serializer')
  self.value_serializer_schemas = parse_schema_from(stores_doc, 'value-serializer')

  connect_to_random_node
rescue StandardError => error
  raise("There was an error trying to bootstrap from the specified servers: #{error}")
end

#connect ⇒ Object



103
104
105
# File 'lib/connection/connection.rb', line 103

# Opens the connection by delegating to #connect!.
#
# NOTE(review): #connect! is not defined in the visible part of this class;
# presumably subclasses such as TCPConnection provide it -- confirm.
#
# @return [Object] whatever #connect! returns
def connect
  connect!
end

#connect_to_random_node ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/connection/connection.rb', line 56

# Tries the known nodes in random order and keeps the first one that accepts
# a connection.
#
# On success the node is stored in #connected_node and #request_count is reset
# to 0. When no node accepts the connection (or #nodes is empty) neither
# attribute is touched and nil is returned, so callers can tell failure apart
# from success. Previously the shuffled node array leaked out as the return
# value in that case.
#
# @return [VoldemortNode, nil] the node now connected to, or nil if none
#   accepted the connection
def connect_to_random_node
  chosen = self.nodes.shuffle.find do |candidate|
    self.connect_to(candidate.host, candidate.port)
  end
  return nil unless chosen

  self.connected_node = chosen
  self.request_count = 0
  chosen
end

#delete(key) ⇒ Object



144
145
146
# File 'lib/connection/connection.rb', line 144

# Deletes +key+ from the store named by #db_name.
#
# Like #get, #get_all and #put, the request is first counted through
# #rebalance_connection_if_needed, so deletes also count towards the per-node
# request limit and can trigger a reconnect.
#
# @param key [Object] key to delete; passed to #delete_from unchanged
# @return [Object] whatever #delete_from returns
def delete(key)
  self.rebalance_connection_if_needed
  self.delete_from(self.db_name, key)
end

#disconnect ⇒ Object



111
112
113
# File 'lib/connection/connection.rb', line 111

# Closes the connection by delegating to #disconnect!.
#
# NOTE(review): #disconnect! is not defined in the visible part of this
# class; presumably subclasses such as TCPConnection provide it -- confirm.
#
# @return [Object] whatever #disconnect! returns
def disconnect
  disconnect!
end

#get(key) ⇒ Object



129
130
131
132
# File 'lib/connection/connection.rb', line 129

# Reads +key+ from the store named by #db_name.
#
# The request is counted (and the connection rebalanced if due) before the
# read is issued. The third argument to #get_from is always false.
#
# @param key [Object] key to look up; passed to #get_from unchanged
# @return [Object] whatever #get_from returns
def get(key)
  rebalance_connection_if_needed
  get_from(db_name, key, false)
end

#get_all(keys) ⇒ Object



134
135
136
137
# File 'lib/connection/connection.rb', line 134

# Reads several keys from the store named by #db_name.
#
# The request is counted once (and the connection rebalanced if due) before
# the read is issued. The third argument to #get_all_from is always false.
#
# @param keys [Object] keys to look up; passed to #get_all_from unchanged
# @return [Object] whatever #get_all_from returns
def get_all(keys)
  rebalance_connection_if_needed
  get_all_from(db_name, keys, false)
end

#parse_nodes_from(doc) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/connection/connection.rb', line 84

# Builds one VoldemortNode per <server> element of a parsed cluster.xml.
#
# Every attribute is stored as the text of the matching child element, so all
# values (ports and the partition list included) are Strings. A child element
# that is absent yields an empty string.
#
# The HTTP port is read from <http-port>, hyphenated like <socket-port> and
# <admin-port>. The previous "http_port" query did not match that element
# name, which left node.http_port empty.
#
# @param doc [#xpath] parsed cluster.xml document (Nokogiri::XML in #bootstrap)
# @return [Array<VoldemortNode>] nodes in document order; empty if the
#   document has no /cluster/server elements
def parse_nodes_from(doc)
  doc.xpath("/cluster/server").map do |server|
    node = VoldemortNode.new
    node.id         = server.xpath("id").text
    node.host       = server.xpath("host").text
    node.port       = server.xpath("socket-port").text
    node.http_port  = server.xpath("http-port").text
    node.admin_port = server.xpath("admin-port").text
    node.partitions = server.xpath("partitions").text
    node
  end
end

#parse_schema_from(doc, serializer = 'value-serializer') ⇒ Object



76
77
78
79
80
81
82
# File 'lib/connection/connection.rb', line 76

# Collects the schemas declared for one serializer of the #db_name store.
#
# Looks up every <schema-info> under the store's +serializer+ element and maps
# its version to its text content.
#
# A <schema-info> without a "version" attribute is recorded under "0" instead
# of raising NoMethodError on the missing attribute.
# NOTE(review): "0" follows Voldemort's own default for unversioned schemas;
# confirm the code consuming this hash looks schemas up the same way.
#
# @param doc [#xpath] parsed stores.xml document
# @param serializer [String] serializer element name, e.g. 'key-serializer'
# @return [Hash{String => String}] schema text keyed by version; empty when
#   nothing matches
def parse_schema_from(doc, serializer = 'value-serializer')
  schema_query = "//stores/store[name = \"#{self.db_name}\"]/#{serializer}/schema-info"

  parsed_schemas = {}
  doc.xpath(schema_query).each do |schema_info|
    version_attr = schema_info.attributes['version']
    version = version_attr ? version_attr.text : "0"
    parsed_schemas[version] = schema_info.text
  end
  parsed_schemas
end

#parse_schema_type(doc, serializer = 'value-serializer') ⇒ Object



67
68
69
70
71
72
73
74
# File 'lib/connection/connection.rb', line 67

# Reads the serializer type declared for the #db_name store.
#
# The xpath lookup returns a node set rather than nil when nothing matches, so
# the old `!= nil` check was always true and a missing <type> produced "".
# An empty result is now treated as "not declared" and yields nil, which is
# what the original else-branch intended.
#
# @param doc [#xpath] parsed stores.xml document
# @param serializer [String] serializer element name, e.g. 'key-serializer'
# @return [String, nil] text of the <type> element, or nil when the store
#   declares none for this serializer
def parse_schema_type(doc, serializer = 'value-serializer')
  type_nodes = doc.xpath("//stores/store[name = \"#{self.db_name}\"]/#{serializer}/type")
  return nil if type_nodes.nil? || type_nodes.empty?

  type_nodes.text
end

#protocol_version ⇒ Object



99
100
101
# File 'lib/connection/connection.rb', line 99

# Returns the protocol identifier held in the PROTOCOL constant.
#
# NOTE(review): presumably this is the string sent to a node when negotiating
# the wire protocol -- confirm against the code that calls it.
#
# @return [String] the value of PROTOCOL
def protocol_version
  PROTOCOL
end

#put(key, value, version = nil, route = false) ⇒ Object



139
140
141
142
# File 'lib/connection/connection.rb', line 139

# Writes +value+ under +key+ in the store named by #db_name.
#
# The request is counted (and the connection rebalanced if due) before the
# write is issued.
#
# @param key [Object] key to write; passed to #put_from unchanged
# @param value [Object] value to store; passed to #put_from unchanged
# @param version [Object, nil] version passed to #put_from unchanged
# @param route [Boolean] routing flag passed to #put_from unchanged
# @return [Object] whatever #put_from returns
def put(key, value, version = nil, route = false)
  rebalance_connection_if_needed
  put_from(db_name, key, value, version, route)
end

#rebalance_connection? ⇒ Boolean

Returns:

  • (Boolean)


120
121
122
# File 'lib/connection/connection.rb', line 120

# Tells whether the current node has reached its request limit.
#
# @return [Boolean] true once #request_count is greater than or equal to
#   #request_limit_per_node
def rebalance_connection?
  served  = request_count
  allowed = request_limit_per_node
  served >= allowed
end

#rebalance_connection_if_needed ⇒ Object



124
125
126
127
# File 'lib/connection/connection.rb', line 124

# Counts one request and reconnects first when the limit has been reached.
#
# The limit check happens before the increment, so the request that triggers
# a reconnect is counted after #reconnect has run.
#
# @return [Integer] the updated #request_count
def rebalance_connection_if_needed
  if rebalance_connection?
    reconnect
  end
  self.request_count = request_count + 1
end

#reconnect ⇒ Object



107
108
109
# File 'lib/connection/connection.rb', line 107

# Re-establishes the connection by delegating to #reconnect!.
#
# NOTE(review): #reconnect! is not defined in the visible part of this class;
# presumably subclasses such as TCPConnection provide it -- confirm.
#
# @return [Object] whatever #reconnect! returns
def reconnect
  reconnect!
end

#reconnect_when_errors_in(response = nil) ⇒ Object



115
116
117
118
# File 'lib/connection/connection.rb', line 115

# Reconnects when +response+ carries an error.
#
# Does nothing when +response+ is nil/false or when its #error is falsy.
#
# @param response [#error, nil] a response object, or nil
# @return [Object, nil] the result of #reconnect! when it was called,
#   otherwise nil
def reconnect_when_errors_in(response = nil)
  reconnect! if response && response.error
end