Class: Moped::Node
- Inherits:
-
Object
- Object
- Moped::Node
- Includes:
- Executable, Instrumentable
- Defined in:
- lib/moped/node.rb
Overview
Represents a client to a node in a server cluster.
Constant Summary
Constants included from Instrumentable
Instance Attribute Summary collapse
-
#address ⇒ Address
The address.
-
#credentials ⇒ Hash
The credentials of the node.
-
#down_at ⇒ Time
The time the node was marked as down.
-
#latency ⇒ Integer
The latency in milliseconds.
-
#options ⇒ Hash
The node options.
- #refreshed_at ⇒ Object
Attributes included from Instrumentable
Instance Method Summary collapse
-
#==(other) ⇒ true, false
(also: #eql?)
Is this node equal to another?.
-
#arbiter? ⇒ true, false
Is the node an arbiter?.
-
#auto_discovering? ⇒ true, false
Is the node auto-discovering new peers in the cluster?.
-
#command(database, cmd, options = {}) ⇒ Hash
Execute a command against a database.
-
#connected? ⇒ true, false
Is the node currently connected?.
-
#connection ⇒ Connection
Get the underlying connection for the node.
-
#disconnect ⇒ true
Force the node to disconnect from the server.
-
#down! ⇒ nil
Mark the node as down.
-
#down? ⇒ Time?
Is the node down?.
-
#ensure_connected(&block) ⇒ nil
Yields the block if a connection can be established, retrying when a connection error is raised.
-
#ensure_primary ⇒ nil
Set a flag on the node for the duration of provided block so that an exception is raised if the node is no longer the primary node.
-
#get_more(database, collection, cursor_id, limit) ⇒ Message
Execute a get more operation on the node.
-
#hash ⇒ Integer
Get the hash identifier for the node.
-
#initialize(address, options = {}) ⇒ Node
constructor
Creat the new node.
-
#insert(database, collection, documents, concern, options = {}) ⇒ Message
Insert documents into the database.
-
#inspect ⇒ String
Get the node as a nice formatted string.
-
#kill_cursors(cursor_ids) ⇒ Message
Kill all provided cursors on the node.
-
#messagable? ⇒ true, false
Can we send messages to this node in normal cirucmstances? This is true only if the node is a primary or secondary node - arbiters or passives cannot be sent anything.
-
#needs_refresh?(time) ⇒ true, false
Does the node need to be refreshed?.
-
#passive? ⇒ true, false
Is the node passive?.
-
#peers ⇒ Array<Node>
Get all the other nodes in the replica set according to the server information.
-
#pipeline ⇒ nil
Execute a pipeline of commands, for example a safe mode persist.
-
#primary? ⇒ true, false
Is the node the replica set primary?.
-
#process(operation, &callback) ⇒ Object
Processes the provided operation on this node, and will execute the callback when the operation is sent to the database.
-
#query(database, collection, selector, options = {}) ⇒ Message
Execute a query on the node.
-
#refresh ⇒ nil
Refresh information about the node, such as it’s status in the replica set and it’s known peers.
-
#remove(database, collection, selector, concern, options = {}) ⇒ Message
Execute a remove command for the provided selector.
-
#secondary? ⇒ true, false
Is the node a replica set secondary?.
-
#timeout ⇒ Integer
Get the timeout, in seconds, for this node.
-
#update(database, collection, selector, change, concern, options = {}) ⇒ Message
Execute an update command for the provided selector.
Methods included from Instrumentable
Methods included from Executable
Constructor Details
#initialize(address, options = {}) ⇒ Node
Creat the new node.
262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/moped/node.rb', line 262 def initialize(address, = {}) @options = @down_at = nil @refreshed_at = nil @latency = nil @primary = nil @secondary = nil @credentials = {} @instrumenter = [:instrumenter] || Instrumentable::Log @address = Address.new(address, timeout) @address.resolve(self) end |
Instance Attribute Details
#address ⇒ Address
Returns The address.
29 30 31 |
# File 'lib/moped/node.rb', line 29 def address @address end |
#credentials ⇒ Hash
Returns The credentials of the node.
33 34 35 |
# File 'lib/moped/node.rb', line 33 def credentials @credentials end |
#down_at ⇒ Time
Returns The time the node was marked as down.
29 |
# File 'lib/moped/node.rb', line 29 attr_reader :address, :down_at, :latency, :options, :refreshed_at |
#latency ⇒ Integer
Returns The latency in milliseconds.
29 |
# File 'lib/moped/node.rb', line 29 attr_reader :address, :down_at, :latency, :options, :refreshed_at |
#options ⇒ Hash
Returns The node options.
29 |
# File 'lib/moped/node.rb', line 29 attr_reader :address, :down_at, :latency, :options, :refreshed_at |
#refreshed_at ⇒ Object
29 |
# File 'lib/moped/node.rb', line 29 attr_reader :address, :down_at, :latency, :options, :refreshed_at |
Instance Method Details
#==(other) ⇒ true, false Also known as: eql?
Is this node equal to another?
45 46 47 48 |
# File 'lib/moped/node.rb', line 45 def ==(other) return false unless other.is_a?(Node) address.resolved == other.address.resolved end |
#arbiter? ⇒ true, false
Is the node an arbiter?
59 60 61 |
# File 'lib/moped/node.rb', line 59 def arbiter? !!@arbiter end |
#auto_discovering? ⇒ true, false
Is the node auto-discovering new peers in the cluster?
71 72 73 |
# File 'lib/moped/node.rb', line 71 def auto_discovering? @auto_discovering ||= [:auto_discover].nil? ? true : [:auto_discover] end |
#command(database, cmd, options = {}) ⇒ Hash
Execute a command against a database.
89 90 91 |
# File 'lib/moped/node.rb', line 89 def command(database, cmd, = {}) read(Protocol::Command.new(database, cmd, )) end |
#connected? ⇒ true, false
Is the node currently connected?
101 102 103 |
# File 'lib/moped/node.rb', line 101 def connected? connection{ |conn| conn.connected? } end |
#connection ⇒ Connection
Get the underlying connection for the node.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/moped/node.rb', line 113 def connection connection_acquired = false begin pool.with do |conn| connection_acquired = true yield(conn) end rescue Timeout::Error, ConnectionPool::PoolShuttingDownError => e if e.kind_of?(ConnectionPool::PoolShuttingDownError) @pool = nil Connection::Manager.delete_pool(self) raise Errors::PoolTimeout.new(e) end raise connection_acquired ? e : Errors::PoolTimeout.new(e) end end |
#disconnect ⇒ true
Force the node to disconnect from the server.
150 151 152 153 |
# File 'lib/moped/node.rb', line 150 def disconnect connection{ |conn| conn.disconnect } true end |
#down! ⇒ nil
Mark the node as down.
163 164 165 166 167 168 |
# File 'lib/moped/node.rb', line 163 def down! @down_at = Time.new @pool = nil @latency = nil Connection::Manager.shutdown(self) end |
#down? ⇒ Time?
Is the node down?
138 139 140 |
# File 'lib/moped/node.rb', line 138 def down? @down_at end |
#ensure_connected(&block) ⇒ nil
Yields the block if a connection can be established, retrying when a connection error is raised.
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/moped/node.rb', line 183 def ensure_connected(&block) unless (conn = stack(:connection)).empty? return yield(conn.first) end begin connection do |conn| connect(conn) unless conn.alive? conn.apply_credentials(@credentials) stack(:connection) << conn yield(conn) end rescue Exception => e if e.kind_of?(ConnectionPool::PoolShuttingDownError) @pool = nil Connection::Manager.delete_pool(self) end Failover.get(e).execute(e, self, &block) ensure end_execution(:connection) end end |
#ensure_primary ⇒ nil
Set a flag on the node for the duration of provided block so that an exception is raised if the node is no longer the primary node.
217 218 219 220 221 |
# File 'lib/moped/node.rb', line 217 def ensure_primary execute(:ensure_primary) do yield(self) end end |
#get_more(database, collection, cursor_id, limit) ⇒ Message
Execute a get more operation on the node.
237 238 239 |
# File 'lib/moped/node.rb', line 237 def get_more(database, collection, cursor_id, limit) read(Protocol::GetMore.new(database, collection, cursor_id, limit)) end |
#hash ⇒ Integer
Get the hash identifier for the node.
249 250 251 |
# File 'lib/moped/node.rb', line 249 def hash address.resolved.hash end |
#insert(database, collection, documents, concern, options = {}) ⇒ Message
Insert documents into the database.
287 288 289 |
# File 'lib/moped/node.rb', line 287 def insert(database, collection, documents, concern, = {}) write(Protocol::Insert.new(database, collection, documents, ), concern) end |
#inspect ⇒ String
Get the node as a nice formatted string.
528 529 530 |
# File 'lib/moped/node.rb', line 528 def inspect "<#{self.class.name} resolved_address=#{address.resolved.inspect}>" end |
#kill_cursors(cursor_ids) ⇒ Message
Kill all provided cursors on the node.
301 302 303 |
# File 'lib/moped/node.rb', line 301 def kill_cursors(cursor_ids) process(Protocol::KillCursors.new(cursor_ids)) end |
#messagable? ⇒ true, false
Can we send messages to this node in normal cirucmstances? This is true only if the node is a primary or secondary node - arbiters or passives cannot be sent anything.
315 316 317 |
# File 'lib/moped/node.rb', line 315 def messagable? primary? || secondary? end |
#needs_refresh?(time) ⇒ true, false
Does the node need to be refreshed?
329 330 331 |
# File 'lib/moped/node.rb', line 329 def needs_refresh?(time) !refreshed_at || refreshed_at < time end |
#passive? ⇒ true, false
Is the node passive?
341 342 343 |
# File 'lib/moped/node.rb', line 341 def passive? !!@passive end |
#peers ⇒ Array<Node>
Get all the other nodes in the replica set according to the server information.
354 355 356 |
# File 'lib/moped/node.rb', line 354 def peers @peers ||= [] end |
#pipeline ⇒ nil
Execute a pipeline of commands, for example a safe mode persist.
@todo: Remove with piggbacked gle.
369 370 371 372 373 374 |
# File 'lib/moped/node.rb', line 369 def pipeline execute(:pipeline) do yield(self) end flush unless executing?(:pipeline) end |
#primary? ⇒ true, false
Is the node the replica set primary?
384 385 386 |
# File 'lib/moped/node.rb', line 384 def primary? !!@primary end |
#process(operation, &callback) ⇒ Object
Processes the provided operation on this node, and will execute the callback when the operation is sent to the database.
402 403 404 405 406 407 408 |
# File 'lib/moped/node.rb', line 402 def process(operation, &callback) if executing?(:pipeline) queue.push([ operation, callback ]) else flush([[ operation, callback ]]) end end |
#query(database, collection, selector, options = {}) ⇒ Message
Execute a query on the node.
425 426 427 |
# File 'lib/moped/node.rb', line 425 def query(database, collection, selector, = {}) read(Protocol::Query.new(database, collection, selector, )) end |
#refresh ⇒ nil
Refresh information about the node, such as it’s status in the replica set and it’s known peers.
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 |
# File 'lib/moped/node.rb', line 443 def refresh if address.resolve(self) begin @refreshed_at = Time.now configure(command("admin", ismaster: 1)) if !primary? && executing?(:ensure_primary) raise Errors::ReplicaSetReconfigured.new("#{inspect} is no longer the primary node.", {}) elsif !messagable? # not primary or secondary so mark it as down, since it's probably # a recovering node withing the replica set down! end rescue Timeout::Error down! end end end |
#remove(database, collection, selector, concern, options = {}) ⇒ Message
Execute a remove command for the provided selector.
474 475 476 |
# File 'lib/moped/node.rb', line 474 def remove(database, collection, selector, concern, = {}) write(Protocol::Delete.new(database, collection, selector, ), concern) end |
#secondary? ⇒ true, false
Is the node a replica set secondary?
486 487 488 |
# File 'lib/moped/node.rb', line 486 def secondary? @secondary end |
#timeout ⇒ Integer
Get the timeout, in seconds, for this node.
498 499 500 |
# File 'lib/moped/node.rb', line 498 def timeout @timeout ||= ([:timeout] || 5) end |
#update(database, collection, selector, change, concern, options = {}) ⇒ Message
Execute an update command for the provided selector.
516 517 518 |
# File 'lib/moped/node.rb', line 516 def update(database, collection, selector, change, concern, = {}) write(Protocol::Update.new(database, collection, selector, change, ), concern) end |