Class: Mongo::MongoReplicaSetClient

Inherits:
MongoClient
Defined in:
lib/mongo/mongo_replica_set_client.rb

Overview

Instantiates and manages connections to a MongoDB replica set.

Direct Known Subclasses

MongoShardedClient, ReplSetConnection

Constant Summary

REPL_SET_OPTS =
[
  :read,
  :refresh_mode,
  :refresh_interval,
  :read_secondary,
  :rs_name,
  :name,
  :tag_sets,
  :secondary_acceptable_latency_ms
]

Constants inherited from MongoClient

Mongo::MongoClient::CLIENT_ONLY_OPTS, Mongo::MongoClient::ConditionVariable, Mongo::MongoClient::DEFAULT_DB_NAME, Mongo::MongoClient::DEFAULT_HOST, Mongo::MongoClient::DEFAULT_PORT, Mongo::MongoClient::GENERIC_OPTS, Mongo::MongoClient::Mutex, Mongo::MongoClient::POOL_OPTS, Mongo::MongoClient::TCPSocket, Mongo::MongoClient::TIMEOUT_OPTS, Mongo::MongoClient::WRITE_CONCERN_OPTS

Constants included from Networking

Networking::RESPONSE_HEADER_SIZE, Networking::STANDARD_HEADER_SIZE

Instance Attribute Summary

Attributes inherited from MongoClient

#auths, #connect_timeout, #host_to_try, #logger, #op_timeout, #pool_size, #pool_timeout, #size, #socket_class, #write_concern

Attributes included from WriteConcern

#legacy_write_concern

Instance Method Summary

Methods inherited from MongoClient

#[], #active?, #add_auth, #apply_saved_authentication, #clear_auths, #copy_database, #database_info, #database_names, #db, #drop_database, from_uri, #lock!, #locked?, multi, #ping, #remove_auth, #server_info, #server_version, #unlock!

Methods included from WriteConcern

#get_write_concern, gle?, #write_concern_from_legacy

Methods included from Networking

#receive_message, #send_message, #send_message_with_gle

Methods included from Logging

#instrument, instrumenter, instrumenter=, #log, #write_logging_startup_message

Constructor Details

#initialize(seeds = ENV["MONGODB_URI"], opts = {}) ⇒ MongoReplicaSetClient

Create a connection to a MongoDB replica set.

If no arguments are provided, the constructor falls back to ENV["MONGODB_URI"].

Once connected to a replica set, you can find out which nodes are primary, secondary, and arbiters with the corresponding accessors: MongoReplicaSetClient#primary, MongoReplicaSetClient#secondaries, and MongoReplicaSetClient#arbiters. This is useful if your application needs to connect manually to nodes other than the primary.

Examples:

Connect to a replica set and provide two seed nodes.

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'])

Connect to a replica set providing two seed nodes and ensuring a connection to the replica set named ‘prod’:

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :name => 'prod')

Connect to a replica set providing two seed nodes and allowing reads from a secondary node:

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :read => :secondary)
Note:

The number of seed nodes does not have to equal the number of replica set members. Seed nodes exist so that the driver can find at least one replica set member even if some members are down.

Parameters:

  • seeds (Array<String>, Array<Array(String, Integer)>) (defaults to: ENV["MONGODB_URI"])

Options Hash (opts):

  • :w (1), :j (false), :wtimeout (false), :fsync (false)

    Write concern options, with their defaults in parentheses. They set the default write concern propagated to DB objects instantiated off of this MongoReplicaSetClient. This default can be overridden upon instantiation of any DB by explicitly setting write concern values on initialization.

  • :read (:primary, :primary_preferred, :secondary, :secondary_preferred, :nearest) — default: :primary

    A “read preference” determines the candidate replica set members to which a query or command can be sent.

    :primary
    • Read from primary only.

    • Cannot be combined with tags.

    :primary_preferred
    • Read from primary if available, otherwise read from a secondary.

    :secondary
    • Read from secondary if available.

    :secondary_preferred
    • Read from a secondary if available, otherwise read from the primary.

    :nearest
    • Read from any member.

  • :tag_sets (Array<Hash{ String, Symbol => Tag Value }>) — default: []

    Read from replica-set members with these tags.

  • :secondary_acceptable_latency_ms (Integer) — default: 15

    The maximum additional latency, in milliseconds, relative to the nearest available member, within which a member is still considered “near”.

  • :logger (Logger) — default: nil

    Logger instance to receive driver operation log.

  • :pool_size (Integer) — default: 1

    The maximum number of socket connections allowed per connection pool. Note: this setting is relevant only for multi-threaded applications.

  • :pool_timeout (Float) — default: 5.0

    When all of the connections in a pool are checked out, this is the number of seconds to wait for a connection to be released before raising an exception. Note: this setting is relevant only for multi-threaded applications.

  • :op_timeout (Float) — default: nil

    The number of seconds to wait for a read operation to time out.

  • :connect_timeout (Float) — default: 30

    The number of seconds to wait before timing out a connection attempt.

  • :ssl (Boolean) — default: false

    If true, create the connection to the server using SSL.

  • :refresh_mode (false, :sync) — default: false

    Set this to :sync to periodically update the state of the connection every :refresh_interval seconds. Replica set connection failures will always trigger a complete refresh. This option is useful when you want to add new nodes or remove replica set nodes not currently in use by the driver.

  • :refresh_interval (Integer) — default: 90

    If :refresh_mode is enabled, this is the number of seconds between calls to check the replica set’s state.
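
The read preference modes and the latency window described above can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: Member, candidates_for and near_members are hypothetical names that do not exist in the driver, and the driver's real selection logic (including tag sets) is not reproduced here.

```ruby
# Simplified, hypothetical model of read-preference selection.
# Member, candidates_for and near_members are illustrative names only;
# they are not part of the driver's API.
Member = Struct.new(:host, :role, :ping_ms)

# Return the members a read may be sent to for a given mode.
def candidates_for(mode, members)
  primary     = members.select { |m| m.role == :primary }
  secondaries = members.select { |m| m.role == :secondary }

  case mode
  when :primary             then primary
  when :primary_preferred   then primary.empty? ? secondaries : primary
  when :secondary           then secondaries
  when :secondary_preferred then secondaries.empty? ? primary : secondaries
  when :nearest             then primary + secondaries
  else raise ArgumentError, "unknown read preference: #{mode.inspect}"
  end
end

# Keep only members whose ping time is within acceptable_latency_ms of the
# fastest candidate (the assumed meaning of :secondary_acceptable_latency_ms).
def near_members(members, acceptable_latency_ms = 15)
  return [] if members.empty?
  fastest = members.map(&:ping_ms).min
  members.select { |m| m.ping_ms - fastest <= acceptable_latency_ms }
end

members = [
  Member.new('localhost:30000', :primary,   2),
  Member.new('localhost:30001', :secondary, 5),
  Member.new('localhost:30002', :secondary, 40)
]

pool = near_members(candidates_for(:secondary, members))
puts pool.map(&:host).inspect
```

With the sample data, the secondary at 40 ms falls outside the 15 ms window around the fastest secondary, so only localhost:30001 remains a candidate.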

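Raises:

  • (MongoArgumentError)

    Raised when no seed node is supplied, or when no arguments are given and ENV["MONGODB_URI"] implies a direct connection.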



# File 'lib/mongo/mongo_replica_set_client.rb', line 110

def initialize(*args)
  opts = args.last.is_a?(Hash) ? args.pop : {}
  nodes = args
  nodes = nodes.flatten(1) if nodes.first.is_a?(Array) && nodes.first.first.is_a?(Array)

  if nodes.empty? and ENV.has_key?('MONGODB_URI')
    parser = URIParser.new ENV['MONGODB_URI']
    if parser.direct?
      raise MongoArgumentError, "Mongo::MongoReplicaSetClient.new called with no arguments, but ENV['MONGODB_URI'] implies a direct connection."
    end
    opts = parser.connection_options.merge! opts
    nodes = [parser.nodes]
  end

  unless nodes.length > 0
    raise MongoArgumentError, "A MongoReplicaSetClient requires at least one seed node."
  end

  # This is temporary until support for the old format is dropped
  if nodes.first.last.is_a?(Integer)
    warn "Initiating a MongoReplicaSetClient with seeds passed as individual [host, port] array arguments is deprecated."
    warn "Please specify hosts as an array of 'host:port' strings; the old format will be removed in v2.0"
    @seeds = nodes
  else
    @seeds = nodes.first.map do |host_port|
      host, port = host_port.split(":")
      [ host, port.to_i ]
    end
  end

  @seeds.freeze

  # Refresh
  @last_refresh = Time.now
  @refresh_version = 0

  # No connection manager by default.
  @manager = nil
  @old_managers = []

  # Lock for request ids.
  @id_lock = Mutex.new

  @pool_mutex = Mutex.new
  @connected = false

  @safe_mutex_lock = Mutex.new
  @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}

  @connect_mutex = Mutex.new
  @refresh_mutex = Mutex.new

  check_opts(opts)
  setup(opts)
end
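
The argument handling at the top of the constructor can be tried in isolation. The sketch below mirrors it in a standalone helper; normalize_seeds is a hypothetical name, ArgumentError stands in for MongoArgumentError, and the ENV["MONGODB_URI"] fallback is left out.

```ruby
# Hypothetical helper mirroring the constructor's seed handling.
# It is not part of the driver; it only illustrates the two accepted formats.
def normalize_seeds(*args)
  # A trailing Hash is treated as the options hash.
  opts  = args.last.is_a?(Hash) ? args.pop : {}
  nodes = args
  raise ArgumentError, 'at least one seed node is required' if nodes.empty?

  seeds =
    if nodes.first.last.is_a?(Integer)
      # Deprecated form: separate ['host', port] arguments.
      nodes
    else
      # Preferred form: one array of 'host:port' strings.
      nodes.first.map do |host_port|
        host, port = host_port.split(':')
        [host, port.to_i]
      end
    end

  [seeds.freeze, opts]
end

seeds, opts = normalize_seeds(['localhost:30000', 'localhost:30001'], :name => 'prod')
p seeds  # [["localhost", 30000], ["localhost", 30001]]
```

In this sketch a seed string without a ":port" suffix ends up with port 0, because nil.to_i is 0, so it is safest to always spell out the port.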

Instance Attribute Details

#acceptable_latency ⇒ Object (readonly)

Returns the value of attribute acceptable_latency.



# File 'lib/mongo/mongo_replica_set_client.rb', line 35

def acceptable_latency
  @acceptable_latency
end

#manager ⇒ Object (readonly)

Returns the value of attribute manager.



# File 'lib/mongo/mongo_replica_set_client.rb', line 35

def manager
  @manager
end

#refresh_interval ⇒ Object (readonly)

Returns the value of attribute refresh_interval.



# File 'lib/mongo/mongo_replica_set_client.rb', line 35

def refresh_interval
  @refresh_interval
end

#refresh_mode ⇒ Object (readonly)

Returns the value of attribute refresh_mode.



# File 'lib/mongo/mongo_replica_set_client.rb', line 35

def refresh_mode
  @refresh_mode
end

#refresh_version ⇒ Object (readonly)

Returns the value of attribute refresh_version.



# File 'lib/mongo/mongo_replica_set_client.rb', line 35

def refresh_version
  @refresh_version
end

#replica_set_name ⇒ Object (readonly)

Returns the value of attribute replica_set_name.



# File 'lib/mongo/mongo_replica_set_client.rb', line 35

def replica_set_name
  @replica_set_name
end

#seeds ⇒ Object (readonly)

Returns the value of attribute seeds.



# File 'lib/mongo/mongo_replica_set_client.rb', line 35

def seeds
  @seeds
end

#tag_sets ⇒ Object (readonly)

Returns the value of attribute tag_sets.



# File 'lib/mongo/mongo_replica_set_client.rb', line 35

def tag_sets
  @tag_sets
end

Instance Method Details

#arbiters ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 404

def arbiters
  local_manager.arbiters.nil? ? [] : local_manager.arbiters
end

#authenticate_pools ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 323

def authenticate_pools
  @manager.pools.each { |pool| pool.authenticate_existing }
end

#checkin(socket) ⇒ Object

Check in a socket used for reading.



# File 'lib/mongo/mongo_replica_set_client.rb', line 369

def checkin(socket)
  if socket && socket.pool
    socket.pool.checkin(socket)
  end
  sync_refresh
end
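
The sync_refresh call at the end of checkin is not shown on this page. As a rough illustration of the interval-based check described under :refresh_mode and :refresh_interval, here is a hypothetical model; RefreshClock and its methods are invented for this sketch and are not the driver's implementation.

```ruby
# Hypothetical model of an interval-based refresh check. The driver's real
# sync_refresh method is not shown on this page and may differ.
class RefreshClock
  attr_reader :last_refresh

  def initialize(refresh_mode, refresh_interval, now = Time.now)
    @refresh_mode     = refresh_mode
    @refresh_interval = refresh_interval
    @last_refresh     = now
  end

  # True when :sync mode is enabled and more than refresh_interval seconds
  # have elapsed since the last recorded refresh.
  def due?(now = Time.now)
    @refresh_mode == :sync && (now - @last_refresh) > @refresh_interval
  end

  # Record that a refresh has just happened.
  def mark!(now = Time.now)
    @last_refresh = now
  end
end

start = Time.at(0)
clock = RefreshClock.new(:sync, 90, start)
puts clock.due?(start + 30)  # false
puts clock.due?(start + 91)  # true
```

Passing the current time in as an argument keeps the check deterministic, which makes this kind of logic easy to exercise without sleeping.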

#checkout ⇒ Object

Generic socket checkout. Takes a block that returns a socket from a pool.



# File 'lib/mongo/mongo_replica_set_client.rb', line 333

def checkout
  ensure_manager

  connected? ? sync_refresh : connect

  begin
    socket = yield
  rescue => ex
    checkin(socket) if socket
    raise ex
  end

  if socket
    socket
  else
    @connected = false
    raise ConnectionFailure.new("Could not checkout a socket.")
  end
  socket
end
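
The block-based contract of checkout (the block returns a socket, and a nil result is treated as a failure) can be illustrated without a running replica set. In this sketch FakePool, checkout_with and the local ConnectionFailure class are stand-ins invented for the example; the connection and refresh steps of the real method are omitted.

```ruby
# Stand-ins for the driver's classes; all names here are illustrative only.
class ConnectionFailure < StandardError; end

class FakePool
  def initialize(sockets)
    @sockets = sockets
  end

  # Hand out the next socket, or nil when none are left.
  def checkout
    @sockets.shift
  end
end

# Yield to the caller's block and insist that it produces a socket.
def checkout_with
  socket = yield
  raise ConnectionFailure, 'Could not checkout a socket.' unless socket
  socket
end

pool  = FakePool.new([:socket_a])
first = checkout_with { pool.checkout }
puts first.inspect  # :socket_a

begin
  # The pool is now empty, so the block returns nil and the call raises.
  checkout_with { pool.checkout }
rescue ConnectionFailure => e
  puts e.message  # Could not checkout a socket.
end
```

Letting the caller supply the block is what allows checkout_reader and checkout_writer to share one code path while choosing different pools.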

#checkout_reader(mode = @read, tag_sets = @tag_sets, acceptable_latency = @acceptable_latency) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 354

def checkout_reader(mode=@read, tag_sets=@tag_sets, acceptable_latency=@acceptable_latency)
  checkout do
    pool = read_pool(mode, tag_sets, acceptable_latency)
    get_socket_from_pool(pool)
  end
end

#checkout_writer ⇒ Object

Checkout a socket for writing (i.e., a primary node).



# File 'lib/mongo/mongo_replica_set_client.rb', line 362

def checkout_writer
  checkout do
    get_socket_from_pool(primary_pool)
  end
end

#close(opts = {}) ⇒ Object

Close the connection to the database.



# File 'lib/mongo/mongo_replica_set_client.rb', line 289

def close(opts={})
  if opts[:soft]
    @manager.close(:soft => true) if @manager
  else
    @manager.close if @manager
  end

  # Clear the reference to this object.
  if Thread.current[:managers]
    Thread.current[:managers].delete(self)
  end

  @connected = false
end

#connect ⇒ Object

Initiate a connection to the replica set.



# File 'lib/mongo/mongo_replica_set_client.rb', line 176

def connect
  log(:info, "Connecting...")
  @connect_mutex.synchronize do
    return if @connected

    seeds = @manager.nil? ? @seeds : @manager.seeds
    @manager = PoolManager.new(self, seeds)

    Thread.current[:managers] ||= Hash.new
    Thread.current[:managers][self] = @manager

    @manager.connect
    @refresh_version += 1

    if @manager.pools.empty?
      close
      raise ConnectionFailure, "Failed to connect to any node."
    else
      @connected = true
    end
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/mongo/mongo_replica_set_client.rb', line 245

def connected?
  @connected && !@manager.pools.empty?
end

#connecting? ⇒ Boolean

Deprecated.

Returns:

  • (Boolean)


# File 'lib/mongo/mongo_replica_set_client.rb', line 250

def connecting?
  warn "MongoReplicaSetClient#connecting? is deprecated and will be removed in v2.0."
  false
end

#ensure_manager ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 376

def ensure_manager
  Thread.current[:managers] ||= Hash.new

  if Thread.current[:managers][self] != @manager
    Thread.current[:managers][self] = @manager
  end
end

#get_socket_from_pool(pool) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 392

def get_socket_from_pool(pool)
  begin
    pool.checkout if pool
  rescue ConnectionFailure
    nil
  end
end

#hard_refresh! ⇒ Boolean

Force a hard refresh of this connection’s view of the replica set.

Returns:

  • (Boolean)

    true if hard refresh occurred. false is returned when unable to get the refresh lock.



# File 'lib/mongo/mongo_replica_set_client.rb', line 229

def hard_refresh!
  log(:info, "Initiating hard refresh...")
  discovered_seeds = @manager.seeds
  new_manager = PoolManager.new(self, discovered_seeds | @seeds)
  new_manager.connect

  Thread.current[:managers][self] = new_manager

  # TODO: make sure that connect has succeeded
  @old_managers << @manager
  @manager = new_manager

  @refresh_version += 1
  return true
end
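
The expression discovered_seeds | @seeds in hard_refresh! is a plain Array union, so seeds found through discovery are merged with the originally configured ones and duplicates are dropped. A minimal illustration with made-up host data:

```ruby
# Made-up seed lists in the [host, port] form used by @seeds.
configured = [['localhost', 30000], ['localhost', 30001]]
discovered = [['localhost', 30001], ['localhost', 30002]]

# Array#| keeps the order of the left operand, then appends any elements
# of the right operand that are not already present.
merged = discovered | configured
p merged  # [["localhost", 30001], ["localhost", 30002], ["localhost", 30000]]
```

Because the union keeps the configured seeds, a node that was temporarily invisible to discovery still gets retried on the next refresh.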

#host ⇒ String

The replica set primary’s host name.

Returns:

  • (String)



# File 'lib/mongo/mongo_replica_set_client.rb', line 258

def host
  @manager.primary_pool.host
end

#hosts ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 417

def hosts
  local_manager ? local_manager.hosts : []
end

#inspect ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 170

def inspect
  "<Mongo::MongoReplicaSetClient:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
    "@connected=#{@connected}>"
end

#local_manager ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 400

def local_manager
  Thread.current[:managers][self] if Thread.current[:managers]
end
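
ensure_manager and local_manager together keep a per-thread view of the pool manager in Thread.current[:managers], keyed by the client object. The following sketch models only that registry; TinyClient is a hypothetical class and a symbol stands in for the PoolManager.

```ruby
# Minimal model of a per-thread registry keyed by client object.
# TinyClient is a hypothetical stand-in, not a driver class.
class TinyClient
  def initialize(manager)
    @manager = manager
  end

  # Copy the client's current manager into the calling thread's registry.
  def ensure_manager
    Thread.current[:managers] ||= {}
    Thread.current[:managers][self] = @manager
  end

  # The manager as seen by the calling thread, or nil if none is registered.
  def local_manager
    Thread.current[:managers][self] if Thread.current[:managers]
  end
end

client = TinyClient.new(:manager_v1)
client.ensure_manager

main_view  = client.local_manager
other_view = Thread.new { client.local_manager }.value

p main_view   # :manager_v1
p other_view  # nil
```

A thread that has not yet registered a manager sees nil, which is why accessors such as primary and secondaries on this page guard against a missing local_manager.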

#logout_pools(db) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 327

def logout_pools(db)
  @manager.pools.each { |pool| pool.logout_existing(db) }
end

#max_bson_size ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 441

def max_bson_size
  if local_manager && local_manager.max_bson_size
    local_manager.max_bson_size
  else
    Mongo::DEFAULT_MAX_BSON_SIZE
  end
end

#nodes ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 269

def nodes
  warn "MongoReplicaSetClient#nodes is DEPRECATED and will be removed in v2.0. " +
    "Please use MongoReplicaSetClient#seeds instead."
  @seeds
end

#pin_pool(pool) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 384

def pin_pool(pool)
  @manager.pinned_pools[Thread.current] = pool if @manager
end

#port ⇒ Integer

The replica set primary’s port.

Returns:

  • (Integer)


# File 'lib/mongo/mongo_replica_set_client.rb', line 265

def port
  @manager.primary_pool.port
end

#primary ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 408

def primary
  local_manager ? local_manager.primary : nil
end

#primary_pool ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 421

def primary_pool
  local_manager ? local_manager.primary_pool : nil
end

#read_pool(mode = @read, tags = @tag_sets, acceptable_latency = @acceptable_latency) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 425

def read_pool(mode=@read, tags=@tag_sets, acceptable_latency=@acceptable_latency)
  local_manager ? local_manager.read_pool(mode, tags, acceptable_latency) : nil
end

#read_preference ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 284

def read_preference
  @read
end

#read_primary? ⇒ Boolean Also known as: primary?

Determine whether we’re reading from a primary node. If false, this connection connects to a secondary node and @read_secondaries is true.

Returns:

  • (Boolean)


# File 'lib/mongo/mongo_replica_set_client.rb', line 279

def read_primary?
  @manager.read_pool == @manager.primary_pool
end

#refresh(opts = {}) ⇒ Boolean

Determine whether a replica set refresh is required. If so, run a hard refresh. To force a hard refresh, call MongoReplicaSetClient#hard_refresh! directly.

Returns:

  • (Boolean)

    true unless a hard refresh is run and the refresh lock can’t be acquired.



# File 'lib/mongo/mongo_replica_set_client.rb', line 206

def refresh(opts={})
  if !connected?
    log(:info, "Trying to check replica set health but not " +
      "connected...")
    return hard_refresh!
  end

  log(:debug, "Checking replica set connection health...")
  @manager.check_connection_health

  if @manager.refresh_required?
    return hard_refresh!
  end

  return true
end

#reset_connection ⇒ Object

Deprecated.

If a ConnectionFailure is raised, this method will be called to close the connection and reset connection values.



# File 'lib/mongo/mongo_replica_set_client.rb', line 307

def reset_connection
  close
  warn "MongoReplicaSetClient#reset_connection is now deprecated and will be removed in v2.0. " +
    "Use MongoReplicaSetClient#close instead."
end

#secondaries ⇒ Object

Note: might want to freeze these after connecting.



# File 'lib/mongo/mongo_replica_set_client.rb', line 413

def secondaries
  local_manager ? local_manager.secondaries : []
end

#secondary_pool ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 429

def secondary_pool
  local_manager ? local_manager.secondary_pool : nil
end

#secondary_pools ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 433

def secondary_pools
  local_manager ? local_manager.secondary_pools : []
end

#slave_ok? ⇒ Boolean

Returns true if it’s okay to read from a secondary node.

This method exists primarily so that Cursor objects will generate query messages with a slaveOkay value of true.

Returns:

  • (Boolean)

    true if the read preference is anything other than :primary.



# File 'lib/mongo/mongo_replica_set_client.rb', line 319

def slave_ok?
  @read != :primary
end

#tag_map ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 437

def tag_map
  local_manager ? local_manager.tag_map : {}
end

#unpin_pool(pool) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 388

def unpin_pool(pool)
  @manager.pinned_pools[Thread.current] = nil if @manager
end

#valid_opts ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 166

def valid_opts
  super + REPL_SET_OPTS - CLIENT_ONLY_OPTS
end
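
valid_opts builds the list of accepted options with Array arithmetic: the parent's list, plus the replica-set options, minus the client-only ones. The sketch below shows the same pattern with made-up option lists; the *_EXAMPLE constants and both classes are hypothetical, and the real constants hold different values.

```ruby
# Hypothetical option lists; the driver's real constants differ.
GENERIC_OPTS_EXAMPLE     = [:ssl, :logger, :default_db]
CLIENT_ONLY_OPTS_EXAMPLE = [:default_db]
REPL_SET_OPTS_EXAMPLE    = [:read, :refresh_mode, :name]

class BaseClient
  # Options accepted by the parent class.
  def valid_opts
    GENERIC_OPTS_EXAMPLE
  end
end

class ReplicaClient < BaseClient
  # Parent options, plus replica-set options, minus parent-only options.
  # Evaluated left to right as (super + REPL_SET) - CLIENT_ONLY.
  def valid_opts
    super + REPL_SET_OPTS_EXAMPLE - CLIENT_ONLY_OPTS_EXAMPLE
  end
end

p ReplicaClient.new.valid_opts  # [:ssl, :logger, :read, :refresh_mode, :name]
```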