Class: Moped::Cluster
Overview
The cluster represents a cluster of MongoDB server nodes, either a single node, a replica set, or a mongos server.
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
- #options The cluster options.(Theclusteroptions.) ⇒ Object readonly
-
#seeds ⇒ Object
readonly
Returns the value of attribute seeds.
- #seeds The seeds the cluster was initialized with.(Theseedstheclusterwasinitializedwith.) ⇒ Object readonly
Instance Method Summary collapse
-
#auth ⇒ Hash
Get the authentication details for the cluster.
-
#disconnect ⇒ true
Disconnects all nodes in the cluster.
-
#down_interval ⇒ Integer
Get the interval at which a node should be flagged as down before retrying.
-
#initialize(hosts, options) ⇒ Cluster
constructor
Initialize the new cluster.
- #inspect ⇒ Object
-
#max_retries ⇒ Integer
Get the number of times an operation should be retried before raising an error.
-
#nodes(opts = {}) ⇒ Array<Node>
Returns the list of available nodes, refreshing 1) any nodes which were down and ready to be checked again and 2) any nodes whose information is out of date.
-
#refresh(nodes_to_refresh = @nodes) ⇒ Array<Node>
Refreshes information for each of the nodes provided.
-
#refresh_interval ⇒ Integer
Get the interval in which the node list should be refreshed.
-
#retry_interval ⇒ Integer
Get the operation retry interval - the time to wait before retrying a single operation.
-
#with_primary(retries = max_retries, &block) ⇒ Object
Yields the replica set’s primary node to the provided block.
-
#with_secondary(retries = max_retries, &block) ⇒ Object
Yields a secondary node if available, otherwise the primary node.
Constructor Details
#initialize(hosts, options) ⇒ Cluster
Initialize the new cluster.
98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/moped/cluster.rb', line 98 def initialize(hosts, ) @options = { down_interval: 30, max_retries: 30, refresh_interval: 300, retry_interval: 1 }.merge() @seeds = hosts @nodes = hosts.map { |host| Node.new(host) } end |
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
9 10 11 |
# File 'lib/moped/cluster.rb', line 9 def @options end |
#options The cluster options.(Theclusteroptions.) ⇒ Object (readonly)
9 |
# File 'lib/moped/cluster.rb', line 9 attr_reader :options, :seeds |
#seeds ⇒ Object (readonly)
Returns the value of attribute seeds.
9 10 11 |
# File 'lib/moped/cluster.rb', line 9 def seeds @seeds end |
#seeds The seeds the cluster was initialized with.(Theseedstheclusterwasinitializedwith.) ⇒ Object (readonly)
9 |
# File 'lib/moped/cluster.rb', line 9 attr_reader :options, :seeds |
Instance Method Details
#auth ⇒ Hash
Get the authentication details for the cluster.
19 20 21 |
# File 'lib/moped/cluster.rb', line 19 def auth @auth ||= {} end |
#disconnect ⇒ true
Disconnects all nodes in the cluster. This should only be used in cases where you know you’re not going to use the cluster on the thread anymore and need to force the connections to close.
30 31 32 |
# File 'lib/moped/cluster.rb', line 30 def disconnect nodes(include_arbiters: true).each { |node| node.disconnect } and true end |
#down_interval ⇒ Integer
Get the interval at which a node should be flagged as down before retrying.
43 44 45 |
# File 'lib/moped/cluster.rb', line 43 def down_interval [:down_interval] end |
#inspect ⇒ Object
265 266 267 |
# File 'lib/moped/cluster.rb', line 265 def inspect "<#{self.class.name} nodes=#{@nodes.inspect}>" end |
#max_retries ⇒ Integer
Get the number of times an operation should be retried before raising an error.
56 57 58 |
# File 'lib/moped/cluster.rb', line 56 def max_retries [:max_retries] end |
#nodes(opts = {}) ⇒ Array<Node>
Returns the list of available nodes, refreshing 1) any nodes which were down and ready to be checked again and 2) any nodes whose information is out of date. Arbiter nodes are not returned.
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/moped/cluster.rb', line 120 def nodes(opts = {}) current_time = Time.new down_boundary = current_time - down_interval refresh_boundary = current_time - refresh_interval # Find the nodes that were down but are ready to be refreshed, or those # with stale connection information. needs_refresh, available = @nodes.partition do |node| (node.down? && node.down_at < down_boundary) || node.needs_refresh?(refresh_boundary) end # Refresh those nodes. available.concat refresh(needs_refresh) # Now return all the nodes that are available and participating in the # replica set. available.reject { |node| node.down? || (!opts[:include_arbiters] && node.arbiter?) } end |
#refresh(nodes_to_refresh = @nodes) ⇒ Array<Node>
Refreshes information for each of the nodes provided. The node list defaults to the list of all known nodes.
If a node is successfully refreshed, any newly discovered peers will also be refreshed.
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/moped/cluster.rb', line 153 def refresh(nodes_to_refresh = @nodes) refreshed_nodes = [] seen = {} # Set up a recursive lambda function for refreshing a node and it's peers. refresh_node = ->(node) do unless seen[node] seen[node] = true # Add the node to the global list of known nodes. @nodes << node unless @nodes.include?(node) begin node.refresh # This node is good, so add it to the list of nodes to return. refreshed_nodes << node unless refreshed_nodes.include?(node) # Now refresh any newly discovered peer nodes. (node.peers - @nodes).each(&refresh_node) rescue Errors::ConnectionFailure # We couldn't connect to the node, so don't do anything with it. end end end nodes_to_refresh.each(&refresh_node) refreshed_nodes.to_a end |
#refresh_interval ⇒ Integer
Get the interval in which the node list should be refreshed.
68 69 70 |
# File 'lib/moped/cluster.rb', line 68 def refresh_interval [:refresh_interval] end |
#retry_interval ⇒ Integer
Get the operation retry interval - the time to wait before retrying a single operation.
81 82 83 |
# File 'lib/moped/cluster.rb', line 81 def retry_interval [:retry_interval] end |
#with_primary(retries = max_retries, &block) ⇒ Object
Yields the replica set’s primary node to the provided block. This method will retry the block in case of connection errors or replica set reconfiguration.
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/moped/cluster.rb', line 199 def with_primary(retries = max_retries, &block) if node = nodes.find(&:primary?) begin node.ensure_primary do return yield node.apply_auth(auth) end rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured # Fall through to the code below if our connection was dropped or the # node is no longer the primary. end end if retries > 0 # We couldn't find a primary node, so refresh the list and try again. sleep(retry_interval) refresh with_primary(retries - 1, &block) else raise( Errors::ConnectionFailure, "Could not connect to a primary node for replica set #{inspect}" ) end end |
#with_secondary(retries = max_retries, &block) ⇒ Object
Yields a secondary node if available, otherwise the primary node. This method will retry the block in case of connection errors.
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/moped/cluster.rb', line 239 def with_secondary(retries = max_retries, &block) available_nodes = nodes.shuffle!.partition(&:secondary?).flatten while node = available_nodes.shift begin return yield node.apply_auth(auth) rescue Errors::ConnectionFailure # That node's no good, so let's try the next one. next end end if retries > 0 # We couldn't find a secondary or primary node, so refresh the list and # try again. sleep(retry_interval) refresh with_secondary(retries - 1, &block) else raise( Errors::ConnectionFailure, "Could not connect to any secondary or primary nodes for replica set #{inspect}" ) end end |