Class: Moped::Node
- Inherits:
-
Object
- Object
- Moped::Node
- Includes:
- Executable, Instrumentable
- Defined in:
- lib/moped/node.rb
Overview
Represents a client to a node in a server cluster.
Constant Summary
Constants included from Instrumentable
Instance Attribute Summary collapse
-
#address ⇒ Address
The address.
-
#credentials ⇒ Hash
The credentials of the node.
-
#down_at ⇒ Time
The time the node was marked as down.
-
#latency ⇒ Integer
The latency in milliseconds.
-
#options ⇒ Hash
The node options.
- #refreshed_at ⇒ Object
Attributes included from Instrumentable
Instance Method Summary collapse
-
#==(other) ⇒ true, false
(also: #eql?)
Is this node equal to another?.
-
#arbiter? ⇒ true, false
Is the node an arbiter?.
-
#auto_discovering? ⇒ true, false
Is the node auto-discovering new peers in the cluster?.
-
#command(database, cmd, options = {}) ⇒ Hash
Execute a command against a database.
-
#connected? ⇒ true, false
Is the node currently connected?.
-
#connection ⇒ Connection
Get the underlying connection for the node.
-
#disconnect ⇒ true
Force the node to disconnect from the server.
-
#down! ⇒ nil
Mark the node as down.
-
#down? ⇒ Time?
Is the node down?.
-
#ensure_connected(&block) ⇒ nil
Yields the block if a connection can be established, retrying when a connection error is raised.
-
#ensure_primary ⇒ nil
Set a flag on the node for the duration of provided block so that an exception is raised if the node is no longer the primary node.
-
#get_more(database, collection, cursor_id, limit) ⇒ Message
Execute a get more operation on the node.
-
#hash ⇒ Integer
Get the hash identifier for the node.
-
#initialize(address, options = {}) ⇒ Node
constructor
Creat the new node.
-
#insert(database, collection, documents, concern, options = {}) ⇒ Message
Insert documents into the database.
-
#inspect ⇒ String
Get the node as a nice formatted string.
-
#kill_cursors(cursor_ids) ⇒ Message
Kill all provided cursors on the node.
-
#messagable? ⇒ true, false
Can we send messages to this node in normal cirucmstances? This is true only if the node is a primary or secondary node - arbiters or passives cannot be sent anything.
-
#needs_refresh?(time) ⇒ true, false
Does the node need to be refreshed?.
-
#passive? ⇒ true, false
Is the node passive?.
-
#peers ⇒ Array<Node>
Get all the other nodes in the replica set according to the server information.
-
#pipeline ⇒ nil
Execute a pipeline of commands, for example a safe mode persist.
-
#primary? ⇒ true, false
Is the node the replica set primary?.
-
#process(operation, &callback) ⇒ Object
Processes the provided operation on this node, and will execute the callback when the operation is sent to the database.
-
#query(database, collection, selector, options = {}) ⇒ Message
Execute a query on the node.
-
#refresh ⇒ nil
Refresh information about the node, such as it’s status in the replica set and it’s known peers.
-
#remove(database, collection, selector, concern, options = {}) ⇒ Message
Execute a remove command for the provided selector.
-
#secondary? ⇒ true, false
Is the node a replica set secondary?.
-
#timeout ⇒ Integer
Get the timeout, in seconds, for this node.
-
#update(database, collection, selector, change, concern, options = {}) ⇒ Message
Execute an update command for the provided selector.
Methods included from Instrumentable
Methods included from Executable
Constructor Details
#initialize(address, options = {}) ⇒ Node
Creat the new node.
247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/moped/node.rb', line 247 def initialize(address, = {}) @options = @down_at = nil @refreshed_at = nil @latency = nil @primary = nil @secondary = nil @credentials = {} @instrumenter = [:instrumenter] || Instrumentable::Log @address = Address.new(address, timeout) @address.resolve(self) end |
Instance Attribute Details
#address ⇒ Address
Returns The address.
29 30 31 |
# File 'lib/moped/node.rb', line 29 def address @address end |
#credentials ⇒ Hash
Returns The credentials of the node.
33 34 35 |
# File 'lib/moped/node.rb', line 33 def credentials @credentials end |
#down_at ⇒ Time
Returns The time the node was marked as down.
29 |
# File 'lib/moped/node.rb', line 29 attr_reader :address, :down_at, :latency, :options, :refreshed_at |
#latency ⇒ Integer
Returns The latency in milliseconds.
29 |
# File 'lib/moped/node.rb', line 29 attr_reader :address, :down_at, :latency, :options, :refreshed_at |
#options ⇒ Hash
Returns The node options.
29 |
# File 'lib/moped/node.rb', line 29 attr_reader :address, :down_at, :latency, :options, :refreshed_at |
#refreshed_at ⇒ Object
29 |
# File 'lib/moped/node.rb', line 29 attr_reader :address, :down_at, :latency, :options, :refreshed_at |
Instance Method Details
#==(other) ⇒ true, false Also known as: eql?
Is this node equal to another?
45 46 47 48 |
# File 'lib/moped/node.rb', line 45 def ==(other) return false unless other.is_a?(Node) address.resolved == other.address.resolved end |
#arbiter? ⇒ true, false
Is the node an arbiter?
59 60 61 |
# File 'lib/moped/node.rb', line 59 def arbiter? !!@arbiter end |
#auto_discovering? ⇒ true, false
Is the node auto-discovering new peers in the cluster?
71 72 73 |
# File 'lib/moped/node.rb', line 71 def auto_discovering? @auto_discovering ||= [:auto_discover].nil? ? true : [:auto_discover] end |
#command(database, cmd, options = {}) ⇒ Hash
Execute a command against a database.
89 90 91 |
# File 'lib/moped/node.rb', line 89 def command(database, cmd, = {}) read(Protocol::Command.new(database, cmd, )) end |
#connected? ⇒ true, false
Is the node currently connected?
101 102 103 |
# File 'lib/moped/node.rb', line 101 def connected? connection{ |conn| conn.connected? } end |
#connection ⇒ Connection
Get the underlying connection for the node.
113 114 115 116 117 |
# File 'lib/moped/node.rb', line 113 def connection pool.with do |conn| yield(conn) end end |
#disconnect ⇒ true
Force the node to disconnect from the server.
139 140 141 142 |
# File 'lib/moped/node.rb', line 139 def disconnect connection{ |conn| conn.disconnect } if address.resolved true end |
#down! ⇒ nil
Mark the node as down.
152 153 154 155 156 |
# File 'lib/moped/node.rb', line 152 def down! @down_at = Time.new @latency = nil disconnect end |
#down? ⇒ Time?
Is the node down?
127 128 129 |
# File 'lib/moped/node.rb', line 127 def down? @down_at end |
#ensure_connected(&block) ⇒ nil
Yields the block if a connection can be established, retrying when a connection error is raised.
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/moped/node.rb', line 171 def ensure_connected(&block) unless (conn = stack(:connection)).empty? return yield(conn.first) end begin connection do |conn| connect(conn) unless conn.alive? conn.apply_credentials(@credentials) stack(:connection) << conn yield(conn) end rescue Exception => e Failover.get(e).execute(e, self, &block) ensure end_execution(:connection) end end |
#ensure_primary ⇒ nil
Set a flag on the node for the duration of provided block so that an exception is raised if the node is no longer the primary node.
202 203 204 205 206 |
# File 'lib/moped/node.rb', line 202 def ensure_primary execute(:ensure_primary) do yield(self) end end |
#get_more(database, collection, cursor_id, limit) ⇒ Message
Execute a get more operation on the node.
222 223 224 |
# File 'lib/moped/node.rb', line 222 def get_more(database, collection, cursor_id, limit) read(Protocol::GetMore.new(database, collection, cursor_id, limit)) end |
#hash ⇒ Integer
Get the hash identifier for the node.
234 235 236 |
# File 'lib/moped/node.rb', line 234 def hash address.resolved.hash end |
#insert(database, collection, documents, concern, options = {}) ⇒ Message
Insert documents into the database.
272 273 274 |
# File 'lib/moped/node.rb', line 272 def insert(database, collection, documents, concern, = {}) write(Protocol::Insert.new(database, collection, documents, ), concern) end |
#inspect ⇒ String
Get the node as a nice formatted string.
513 514 515 |
# File 'lib/moped/node.rb', line 513 def inspect "<#{self.class.name} resolved_address=#{address.resolved.inspect}>" end |
#kill_cursors(cursor_ids) ⇒ Message
Kill all provided cursors on the node.
286 287 288 |
# File 'lib/moped/node.rb', line 286 def kill_cursors(cursor_ids) process(Protocol::KillCursors.new(cursor_ids)) end |
#messagable? ⇒ true, false
Can we send messages to this node in normal cirucmstances? This is true only if the node is a primary or secondary node - arbiters or passives cannot be sent anything.
300 301 302 |
# File 'lib/moped/node.rb', line 300 def messagable? primary? || secondary? end |
#needs_refresh?(time) ⇒ true, false
Does the node need to be refreshed?
314 315 316 |
# File 'lib/moped/node.rb', line 314 def needs_refresh?(time) !refreshed_at || refreshed_at < time end |
#passive? ⇒ true, false
Is the node passive?
326 327 328 |
# File 'lib/moped/node.rb', line 326 def passive? !!@passive end |
#peers ⇒ Array<Node>
Get all the other nodes in the replica set according to the server information.
339 340 341 |
# File 'lib/moped/node.rb', line 339 def peers @peers ||= [] end |
#pipeline ⇒ nil
Execute a pipeline of commands, for example a safe mode persist.
@todo: Remove with piggbacked gle.
354 355 356 357 358 359 |
# File 'lib/moped/node.rb', line 354 def pipeline execute(:pipeline) do yield(self) end flush unless executing?(:pipeline) end |
#primary? ⇒ true, false
Is the node the replica set primary?
369 370 371 |
# File 'lib/moped/node.rb', line 369 def primary? !!@primary end |
#process(operation, &callback) ⇒ Object
Processes the provided operation on this node, and will execute the callback when the operation is sent to the database.
387 388 389 390 391 392 393 |
# File 'lib/moped/node.rb', line 387 def process(operation, &callback) if executing?(:pipeline) queue.push([ operation, callback ]) else flush([[ operation, callback ]]) end end |
#query(database, collection, selector, options = {}) ⇒ Message
Execute a query on the node.
410 411 412 |
# File 'lib/moped/node.rb', line 410 def query(database, collection, selector, = {}) read(Protocol::Query.new(database, collection, selector, )) end |
#refresh ⇒ nil
Refresh information about the node, such as it’s status in the replica set and it’s known peers.
428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 |
# File 'lib/moped/node.rb', line 428 def refresh if address.resolve(self) begin @refreshed_at = Time.now configure(command("admin", ismaster: 1)) if !primary? && executing?(:ensure_primary) raise Errors::ReplicaSetReconfigured.new("#{inspect} is no longer the primary node.", {}) elsif !messagable? # not primary or secondary so mark it as down, since it's probably # a recovering node withing the replica set down! end rescue Timeout::Error down! end end end |
#remove(database, collection, selector, concern, options = {}) ⇒ Message
Execute a remove command for the provided selector.
459 460 461 |
# File 'lib/moped/node.rb', line 459 def remove(database, collection, selector, concern, = {}) write(Protocol::Delete.new(database, collection, selector, ), concern) end |
#secondary? ⇒ true, false
Is the node a replica set secondary?
471 472 473 |
# File 'lib/moped/node.rb', line 471 def secondary? @secondary end |
#timeout ⇒ Integer
Get the timeout, in seconds, for this node.
483 484 485 |
# File 'lib/moped/node.rb', line 483 def timeout @timeout ||= ([:timeout] || 5) end |
#update(database, collection, selector, change, concern, options = {}) ⇒ Message
Execute an update command for the provided selector.
501 502 503 |
# File 'lib/moped/node.rb', line 501 def update(database, collection, selector, change, concern, = {}) write(Protocol::Update.new(database, collection, selector, change, ), concern) end |