Class: Mongo::MongoShardedClient

Inherits:
MongoReplicaSetClient show all
Defined in:
lib/mongo/mongo_sharded_client.rb

Overview

Instantiates and manages connections to a MongoDB sharded cluster for high availability.

Direct Known Subclasses

ShardedConnection

Constant Summary collapse

# Option keys specific to sharded-cluster clients; valid_opts appends them to
# GENERIC_OPTS. Frozen so the shared constant cannot be mutated in place.
SHARDED_CLUSTER_OPTS =
[:refresh_mode, :refresh_interval].freeze

Constants inherited from MongoReplicaSetClient

Mongo::MongoReplicaSetClient::REPL_SET_OPTS

Constants inherited from MongoClient

Mongo::MongoClient::CLIENT_ONLY_OPTS, Mongo::MongoClient::ConditionVariable, Mongo::MongoClient::DEFAULT_DB_NAME, Mongo::MongoClient::DEFAULT_HOST, Mongo::MongoClient::DEFAULT_PORT, Mongo::MongoClient::GENERIC_OPTS, Mongo::MongoClient::Mutex, Mongo::MongoClient::POOL_OPTS, Mongo::MongoClient::TCPSocket, Mongo::MongoClient::TIMEOUT_OPTS, Mongo::MongoClient::WRITE_CONCERN_OPTS

Constants included from Networking

Networking::RESPONSE_HEADER_SIZE, Networking::STANDARD_HEADER_SIZE

Instance Attribute Summary collapse

Attributes inherited from MongoReplicaSetClient

#acceptable_latency, #replica_set_name, #tag_sets

Attributes inherited from MongoClient

#acceptable_latency, #auths, #connect_timeout, #host_to_try, #logger, #op_timeout, #pool_size, #pool_timeout, #primary, #primary_pool, #size, #socket_class, #tag_sets, #write_concern

Attributes included from WriteConcern

#legacy_write_concern

Instance Method Summary collapse

Methods inherited from MongoReplicaSetClient

#arbiters, #authenticate_pools, #checkin, #checkout_reader, #checkout_writer, #close, #connecting?, #ensure_manager, #get_socket_from_pool, #host, #hosts, #local_manager, #logout_pools, #max_bson_size, #nodes, #pin_pool, #port, #primary, #primary_pool, #read_pool, #read_preference, #read_primary?, #refresh, #reset_connection, #secondaries, #secondary_pool, #secondary_pools, #tag_map, #unpin_pool

Methods inherited from MongoClient

#[], #active?, #add_auth, #apply_saved_authentication, #authenticate_pools, #checkin, #checkout_reader, #checkout_writer, #clear_auths, #close, #copy_database, #database_info, #database_names, #db, #drop_database, from_uri, #host, #lock!, #locked?, #logout_pools, #max_bson_size, multi, #pin_pool, #ping, #port, #read_pool, #read_preference, #read_primary?, #refresh, #remove_auth, #server_info, #server_version, #unlock!, #unpin_pool

Methods included from WriteConcern

#get_write_concern, gle?, #write_concern_from_legacy

Methods included from Networking

#receive_message, #send_message, #send_message_with_gle

Methods included from Logging

#instrument, instrumenter, instrumenter=, #log, #write_logging_startup_message

Constructor Details

#initialize(*args) ⇒ MongoShardedClient

Create a connection to a MongoDB sharded cluster.

If no args are provided, it will check ENV["MONGODB_URI"].

Note that the number of seed nodes does not have to be equal to the number of sharded cluster members. The purpose of seed nodes is to permit the driver to find at least one sharded cluster member even if a member is down.

Examples:

Connect to a sharded cluster and provide two seed nodes.

MongoShardedClient.new(['localhost:30000', 'localhost:30001'])

Parameters:

  • seeds (Array)

    “host:port” strings

  • opts (Hash)

    a customizable set of options

Raises:

  • (MongoArgumentError)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/mongo/mongo_sharded_client.rb', line 66

# Create a connection to a MongoDB sharded cluster.
#
# Seeds are given as "host:port" strings (nested arrays are flattened). A
# trailing Hash, if present, is taken as the options. When no seeds are given
# and ENV['MONGODB_URI'] is set, seeds and options are taken from that URI,
# with explicitly passed options overriding the ones parsed from the URI.
#
# @param args [Array] seed strings, optionally followed by an options Hash
# @raise [MongoArgumentError] if ENV['MONGODB_URI'] implies a direct
#   connection, or if no seed node could be determined
def initialize(*args)
  opts = args.last.is_a?(Hash) ? args.pop : {}

  nodes = args.flatten

  if nodes.empty? && ENV.key?('MONGODB_URI')
    parser = URIParser.new(ENV['MONGODB_URI'])
    if parser.direct?
      raise MongoArgumentError, "Mongo::MongoShardedClient.new called with no arguments, but ENV['MONGODB_URI'] implies a direct connection."
    end
    opts = parser.connection_options.merge!(opts)
    # NOTE(review): parser.nodes is wrapped in a one-element array, so the
    # mapping below calls #split on whatever parser.nodes returns. Confirm
    # against URIParser that this is a "host:port" String and not an Array.
    nodes = [parser.nodes]
  end

  if nodes.empty?
    raise MongoArgumentError, "A MongoShardedClient requires at least one seed node."
  end

  # A seed given without ":port" used to end up as [host, 0] because
  # nil.to_i is 0; fall back to the inherited DEFAULT_PORT instead.
  @seeds = nodes.map do |host_port|
    host, port = host_port.split(":")
    [host, port ? port.to_i : DEFAULT_PORT]
  end

  # TODO: add a method for replacing this list of nodes.
  @seeds.freeze

  # Refresh bookkeeping; connect updates both values.
  @last_refresh = Time.now
  @refresh_version = 0

  # No connection manager until connect creates one.
  @manager = nil
  @old_managers = []

  # Lock for request ids.
  @id_lock = Mutex.new

  @pool_mutex = Mutex.new
  @connected = false

  @safe_mutex_lock = Mutex.new
  # Lazily creates one Mutex per key on first access.
  @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}

  @connect_mutex = Mutex.new
  @refresh_mutex = Mutex.new

  check_opts(opts)
  setup(opts)
end

Instance Attribute Details

#managerObject (readonly)

Returns the value of attribute manager.



26
27
28
# File 'lib/mongo/mongo_sharded_client.rb', line 26

# Reader for @manager.
#
# The constructor sets @manager to nil; connect replaces it with a newly
# created ShardingPoolManager.
#
# @return [Object, nil] the current value of @manager
def manager
  @manager
end

#refresh_intervalObject (readonly)

Returns the value of attribute refresh_interval.



26
27
28
# File 'lib/mongo/mongo_sharded_client.rb', line 26

# Reader for @refresh_interval.
#
# NOTE(review): the constructor body shown here never assigns this variable;
# presumably setup(opts) fills it from the :refresh_interval option — confirm.
#
# @return [Object, nil] the current value of @refresh_interval
def refresh_interval
  @refresh_interval
end

#refresh_modeObject (readonly)

Returns the value of attribute refresh_mode.



26
27
28
# File 'lib/mongo/mongo_sharded_client.rb', line 26

# Reader for @refresh_mode.
#
# NOTE(review): the constructor body shown here never assigns this variable;
# presumably setup(opts) fills it from the :refresh_mode option — confirm.
#
# @return [Object, nil] the current value of @refresh_mode
def refresh_mode
  @refresh_mode
end

#refresh_versionObject (readonly)

Returns the value of attribute refresh_version.



26
27
28
# File 'lib/mongo/mongo_sharded_client.rb', line 26

# Reader for @refresh_version.
#
# The constructor starts the counter at 0 and connect increments it by one
# each time it builds a new manager.
#
# @return [Integer] the current value of @refresh_version
def refresh_version
  @refresh_version
end

#seedsObject (readonly)

Returns the value of attribute seeds.



26
27
28
# File 'lib/mongo/mongo_sharded_client.rb', line 26

# Reader for @seeds.
#
# The constructor builds this as a frozen array of [host, port] pairs parsed
# from the "host:port" seed strings.
#
# @return [Array<Array>] the frozen seed list
def seeds
  @seeds
end

Instance Method Details

#checkout(&block) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/mongo/mongo_sharded_client.rb', line 171

# Obtain a socket from the given block, (re)connecting first if necessary.
#
# Makes at most two attempts. Each attempt runs sync_refresh when the client
# is already connected, or connect otherwise, and then calls the block, which
# is expected to return a socket. A falsy block result marks the client as
# disconnected so that the next attempt reconnects.
#
# Any exception raised by the block propagates to the caller unchanged.
#
# @yieldreturn [Object, nil] a socket, or nil/false when none is available
# @return [Object, nil] the socket returned by the block, or nil when both
#   attempts came back empty
def checkout(&block)
  2.times do
    connected? ? sync_refresh : connect

    # No rescue is needed here: if the block raises, no socket was ever
    # assigned, so there is nothing to check back in.
    socket = block.call
    return socket if socket

    @connected = false
  end

  # Without this the method fell through to the return value of
  # Integer#times (the Integer 2), which callers would treat as a socket.
  nil
end

#connect(force = !@connected) ⇒ Object

Initiate a connection to the sharded cluster.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/mongo/mongo_sharded_client.rb', line 126

# Initiate a connection to the sharded cluster.
#
# Does nothing unless +force+ is truthy; by default it is truthy only while
# the client is not connected. Under @connect_mutex it retires the current
# manager (if any) into @old_managers, builds a ShardingPoolManager from the
# union of that manager's seeds and @seeds, records it for the current
# thread, connects it, and updates the refresh bookkeeping.
#
# @param force [Boolean] whether to (re)connect
# @return [true, nil] true after connecting, nil when skipped
def connect(force = !@connected)
  return unless force

  log(:info, "Connecting...")
  @connect_mutex.synchronize do
    previous = @manager
    known_seeds = previous ? previous.seeds : []
    @old_managers << previous if previous
    @manager = ShardingPoolManager.new(self, known_seeds | @seeds)

    # Per-thread registry of managers, keyed by client.
    (Thread.current[:managers] ||= {})[self] = @manager

    @manager.connect
    @last_refresh = Time.now
    @refresh_version += 1
    @connected = true
  end
end

#connected?Boolean

Returns:

  • (Boolean)


156
157
158
# File 'lib/mongo/mongo_sharded_client.rb', line 156

# Whether the client is connected and its manager has a primary pool.
#
# @return [Object, false, nil] @connected itself when it is falsy, otherwise
#   whatever @manager.primary_pool returns (truthy when a pool exists)
def connected?
  return @connected unless @connected

  @manager.primary_pool
end

#hard_refresh!Boolean

Force a hard refresh of this connection’s view of the sharded cluster.

Returns:

  • (Boolean)

    true if hard refresh occurred. Note that this implementation always returns true; it has no code path that returns false.



150
151
152
153
154
# File 'lib/mongo/mongo_sharded_client.rb', line 150

# Force a hard refresh of this client's view of the sharded cluster by
# logging the attempt and reconnecting unconditionally.
#
# @return [true] always true once connect(true) has returned
def hard_refresh!
  log(:info, "Initiating hard refresh...")
  connect(true)
  true
end

#inspectObject



120
121
122
123
# File 'lib/mongo/mongo_sharded_client.rb', line 120

def inspect
  "<Mongo::MongoShardedClient:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
      "@connected=#{@connected}>"
end

#slave_ok?Boolean

Returns true if it’s okay to read from a secondary node. Since this is a sharded cluster, this must always be false.

This method exists primarily so that Cursor objects will generate query messages with a slaveOkay value of false.

Returns:

  • (Boolean)

    false



167
168
169
# File 'lib/mongo/mongo_sharded_client.rb', line 167

# Whether reads may be sent to a secondary node.
#
# @return [Boolean] always false for this client
def slave_ok?
  false
end

#valid_optsObject



116
117
118
# File 'lib/mongo/mongo_sharded_client.rb', line 116

# Option keys accepted by this client: the generic options followed by the
# sharded-cluster-specific ones.
#
# @return [Array] a new array built on each call
def valid_opts
  [*GENERIC_OPTS, *SHARDED_CLUSTER_OPTS]
end