Class: Moped::Cluster

Inherits:
Object show all
Defined in:
lib/moped/cluster.rb

Overview

The cluster represents a cluster of MongoDB server nodes, either a single node, a replica set, or a mongos server.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(hosts, options) ⇒ Cluster

Initialize the new cluster.

Examples:

Initialize the cluster.

Cluster.new([ "localhost:27017" ], down_interval: 20)

Parameters:

  • options (Hash)

    The cluster options.

Options Hash (options):

  • :down_interval (Object)

    number of seconds to wait before attempting to reconnect to a down node. (30)

  • :refresh_interval (Object)

    number of seconds to cache information about a node. (300)

  • :timeout (Integer)

    The time in seconds to wait for an operation to timeout. (5)

Since:

  • 1.0.0



100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/moped/cluster.rb', line 100

def initialize(hosts, options)
  @seeds = hosts
  @nodes = hosts.map { |host| Node.new(host, options) }
  @peers = []

  @options = {
    down_interval: 30,
    max_retries: 20,
    refresh_interval: 300,
    retry_interval: 0.25
  }.merge(options)
end

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



9
10
11
# File 'lib/moped/cluster.rb', line 9

def options
  @options
end

#options The cluster options.(Theclusteroptions.) ⇒ Object (readonly)



9
# File 'lib/moped/cluster.rb', line 9

attr_reader :options, :seeds

#seedsObject (readonly)

Returns the value of attribute seeds.



9
10
11
# File 'lib/moped/cluster.rb', line 9

def seeds
  @seeds
end

#seeds The seeds the cluster was initialized with.(Theseedstheclusterwasinitializedwith.) ⇒ Object (readonly)



9
# File 'lib/moped/cluster.rb', line 9

attr_reader :options, :seeds

Instance Method Details

#authHash

Get the credentials for the cluster.

Examples:

Get the authentication details.

cluster.auth

Returns:

  • (Hash)

    the cached authentication credentials for this cluster.

Since:

  • 1.0.0



19
20
21
# File 'lib/moped/cluster.rb', line 19

def auth
  @auth ||= {}
end

#disconnecttrue

Disconnects all nodes in the cluster. This should only be used in cases where you know you’re not going to use the cluster on the thread anymore and need to force the connections to close.

Returns:

  • (true)

    True if the disconnect succeeded.

Since:

  • 1.2.0



30
31
32
# File 'lib/moped/cluster.rb', line 30

def disconnect
  nodes(include_arbiters: true).each { |node| node.disconnect } and true
end

#down_intervalInteger

Get the interval at which a node should be flagged as down before retrying.

Examples:

Get the down interval, in seconds.

cluster.down_interval

Returns:

Since:

  • 1.2.7



43
44
45
# File 'lib/moped/cluster.rb', line 43

def down_interval
  options[:down_interval]
end

#inspectObject



274
275
276
# File 'lib/moped/cluster.rb', line 274

def inspect
  "<#{self.class.name} nodes=#{@nodes.inspect}>"
end

#max_retriesInteger

Get the number of times an operation should be retried before raising an error.

Examples:

Get the maximum retries.

cluster.max_retries

Returns:

Since:

  • 1.2.7



56
57
58
# File 'lib/moped/cluster.rb', line 56

def max_retries
  options[:max_retries]
end

#nodes(opts = {}) ⇒ Array<Node>

Returns the list of available nodes, refreshing 1) any nodes which were down and ready to be checked again and 2) any nodes whose information is out of date. Arbiter nodes are not returned.

Examples:

Get the available nodes.

cluster.nodes

Returns:

  • (Array<Node>)

    the list of available nodes.

Since:

  • 1.0.0



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/moped/cluster.rb', line 123

def nodes(opts = {})
  current_time = Time.new
  down_boundary = current_time - down_interval
  refresh_boundary = current_time - refresh_interval

  # Find the nodes that were down but are ready to be refreshed, or those
  # with stale connection information.
  needs_refresh, available = @nodes.partition do |node|
    node.down? ? (node.down_at < down_boundary) : node.needs_refresh?(refresh_boundary)
  end

  # Refresh those nodes.
  available.concat refresh(needs_refresh)

  # Now return all the nodes that are available and participating in the
  # replica set.
  available.reject do |node|
    node.down? || !member?(node) || (!opts[:include_arbiters] && node.arbiter?)
  end
end

#refresh(nodes_to_refresh = @nodes) ⇒ Array<Node>

Refreshes information for each of the nodes provided. The node list defaults to the list of all known nodes.

If a node is successfully refreshed, any newly discovered peers will also be refreshed.

Examples:

Refresh the nodes.

cluster.refresh

Parameters:

  • nodes_to_refresh (Array<Node>) (defaults to: @nodes)

    The nodes to refresh.

Returns:

Since:

  • 1.0.0



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/moped/cluster.rb', line 158

def refresh(nodes_to_refresh = @nodes)
  refreshed_nodes = []
  seen = {}

  # Set up a recursive lambda function for refreshing a node and it's peers.
  refresh_node = ->(node) do
    unless seen[node]
      seen[node] = true

      # Add the node to the global list of known nodes.
      @nodes << node unless @nodes.include?(node)

      begin
        node.refresh

        # This node is good, so add it to the list of nodes to return.
        refreshed_nodes << node unless refreshed_nodes.include?(node)

        # Now refresh any newly discovered peer nodes - this will also
        # remove nodes that are not included in the peer list.
        refresh_peers(node, &refresh_node)
      rescue Errors::ConnectionFailure
        # We couldn't connect to the node.
      end
    end
  end

  nodes_to_refresh.each(&refresh_node)
  refreshed_nodes.to_a
end

#refresh_intervalInteger

Get the interval in which the node list should be refreshed.

Examples:

Get the refresh interval, in seconds.

cluster.refresh_interval

Returns:

  • (Integer)

    The refresh interval.

Since:

  • 1.2.7



68
69
70
# File 'lib/moped/cluster.rb', line 68

def refresh_interval
  options[:refresh_interval]
end

#retry_intervalInteger

Get the operation retry interval - the time to wait before retrying a single operation.

Examples:

Get the retry interval, in seconds.

cluster.retry_interval

Returns:

  • (Integer)

    The retry interval.

Since:

  • 1.2.7



81
82
83
# File 'lib/moped/cluster.rb', line 81

def retry_interval
  options[:retry_interval]
end

#with_primary(retries = max_retries, &block) ⇒ Object

Yields the replica set’s primary node to the provided block. This method will retry the block in case of connection errors or replica set reconfiguration.

Examples:

Yield the primary to the block.

cluster.with_primary do |node|
  # ...
end

Parameters:

  • retries (Integer) (defaults to: max_retries)

    The number of times to retry.

Returns:

  • (Object)

    The result of the yield.

Since:

  • 1.0.0



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/moped/cluster.rb', line 205

def with_primary(retries = max_retries, &block)
  if node = nodes.find(&:primary?)
    begin
      node.ensure_primary do
        return yield node.apply_auth(auth)
      end
    rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured
      # Fall through to the code below if our connection was dropped or the
      # node is no longer the primary.
    end
  end

  if retries > 0
    # We couldn't find a primary node, so refresh the list and try again.
    warning("  MOPED: Retrying connection to primary for replica set #{inspect}")
    sleep(retry_interval)
    refresh
    with_primary(retries - 1, &block)
  else
    raise(
      Errors::ConnectionFailure,
      "Could not connect to a primary node for replica set #{inspect}"
    )
  end
end

#with_secondary(retries = max_retries, &block) ⇒ Object

Yields a secondary node if available, otherwise the primary node. This method will retry the block in case of connection errors.

Examples:

Yield the secondary to the block.

cluster.with_secondary do |node|
  # ...
end

Parameters:

  • retries (Integer) (defaults to: max_retries)

    The number of times to retry.

Returns:

  • (Object)

    The result of the yield.

Since:

  • 1.0.0



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/moped/cluster.rb', line 246

def with_secondary(retries = max_retries, &block)
  available_nodes = nodes.shuffle!.partition(&:secondary?).flatten

  while node = available_nodes.shift
    begin
      return yield node.apply_auth(auth)
    rescue Errors::ConnectionFailure
      warning("  MOPED: Connection failed to secondary node #{node.inspect}, trying next node.")
      # That node's no good, so let's try the next one.
      next
    end
  end

  if retries > 0
    # We couldn't find a secondary or primary node, so refresh the list and
    # try again.
    warning("  MOPED: Could not connect to any node in replica set #{inspect}, refreshing list.")
    sleep(retry_interval)
    refresh
    with_secondary(retries - 1, &block)
  else
    raise(
      Errors::ConnectionFailure,
      "Could not connect to any secondary or primary nodes for replica set #{inspect}"
    )
  end
end