Class: Moped::Node

Inherits:
Object
  • Object
show all
Includes:
Executable, Instrumentable
Defined in:
lib/moped/node.rb

Overview

Represents a client to a node in a server cluster.

Since:

  • 1.0.0

Constant Summary

Constants included from Instrumentable

Instrumentable::TOPIC

Instance Attribute Summary collapse

Attributes included from Instrumentable

#instrumenter

Instance Method Summary collapse

Methods included from Instrumentable

#instrument

Methods included from Executable

#execute, #executing?

Constructor Details

#initialize(address, options = {}) ⇒ Node

Creat the new node.

Examples:

Create the new node.

Node.new("127.0.0.1:27017")

Parameters:

  • address (String)

    The location of the server node.

  • options (Hash) (defaults to: {})

    Additional options for the node (ssl)

Since:

  • 1.0.0



262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/moped/node.rb', line 262

def initialize(address, options = {})
  @options = options
  @down_at = nil
  @refreshed_at = nil
  @latency = nil
  @primary = nil
  @secondary = nil
  @credentials = {}
  @instrumenter = options[:instrumenter] || Instrumentable::Log
  @address = Address.new(address, timeout)
  @address.resolve(self)
end

Instance Attribute Details

#addressAddress

Returns The address.

Returns:



29
30
31
# File 'lib/moped/node.rb', line 29

def address
  @address
end

#credentialsHash

Returns The credentials of the node.

Returns:

  • (Hash)

    The credentials of the node.



33
34
35
# File 'lib/moped/node.rb', line 33

def credentials
  @credentials
end

#down_atTime

Returns The time the node was marked as down.

Returns:

  • (Time)

    The time the node was marked as down.



29
# File 'lib/moped/node.rb', line 29

attr_reader :address, :down_at, :latency, :options, :refreshed_at

#latencyInteger

Returns The latency in milliseconds.

Returns:

  • (Integer)

    The latency in milliseconds.



29
# File 'lib/moped/node.rb', line 29

attr_reader :address, :down_at, :latency, :options, :refreshed_at

#optionsHash

Returns The node options.

Returns:

  • (Hash)

    The node options.



29
# File 'lib/moped/node.rb', line 29

attr_reader :address, :down_at, :latency, :options, :refreshed_at

#refreshed_atObject

Since:

  • 1.0.0



29
# File 'lib/moped/node.rb', line 29

attr_reader :address, :down_at, :latency, :options, :refreshed_at

Instance Method Details

#==(other) ⇒ true, false Also known as: eql?

Is this node equal to another?

Examples:

Is the node equal to another.

node == other

Parameters:

  • other (Node)

    The other node.

Returns:

  • (true, false)

    If the addresses are equal.

Since:

  • 1.0.0



45
46
47
48
# File 'lib/moped/node.rb', line 45

def ==(other)
  return false unless other.is_a?(Node)
  address.resolved == other.address.resolved
end

#arbiter?true, false

Is the node an arbiter?

Examples:

Is the node an arbiter?

node.arbiter?

Returns:

  • (true, false)

    If the node is an arbiter.

Since:

  • 1.0.0



59
60
61
# File 'lib/moped/node.rb', line 59

def arbiter?
  !!@arbiter
end

#auto_discovering?true, false

Is the node auto-discovering new peers in the cluster?

Examples:

Is the node auto discovering?

node.auto_discovering?

Returns:

  • (true, false)

    If the node is auto discovering.

Since:

  • 2.0.0



71
72
73
# File 'lib/moped/node.rb', line 71

def auto_discovering?
  @auto_discovering ||= options[:auto_discover].nil? ? true : options[:auto_discover]
end

#command(database, cmd, options = {}) ⇒ Hash

Execute a command against a database.

Examples:

Execute a command.

node.command(database, { ping: 1 })

Parameters:

  • database (Database)

    The database to run the command on.

  • cmd (Hash)

    The command to execute.

Returns:

  • (Hash)

    The result of the command.

Raises:

  • (OperationFailure)

    If the command failed.

Since:

  • 1.0.0



89
90
91
# File 'lib/moped/node.rb', line 89

def command(database, cmd, options = {})
  read(Protocol::Command.new(database, cmd, options))
end

#connected?true, false

Is the node currently connected?

Examples:

Is the node connected?

node.connected?

Returns:

  • (true, false)

    If the node is connected or not.

Since:

  • 2.0.0



101
102
103
# File 'lib/moped/node.rb', line 101

def connected?
  connection{ |conn| conn.connected? }
end

#connectionConnection

Get the underlying connection for the node.

Examples:

Get the node’s connection.

node.connection

Returns:

Since:

  • 2.0.0



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/moped/node.rb', line 113

def connection
  connection_acquired = false
  begin
    pool.with do |conn|
      connection_acquired = true
      yield(conn)
    end
  rescue Timeout::Error, ConnectionPool::PoolShuttingDownError => e
    if e.kind_of?(ConnectionPool::PoolShuttingDownError)
      @pool = nil
      Connection::Manager.delete_pool(self)
      raise Errors::PoolTimeout.new(e)
    end
    raise connection_acquired ? e : Errors::PoolTimeout.new(e)
  end
end

#disconnecttrue

Force the node to disconnect from the server.

Examples:

Disconnect the node.

node.disconnect

Returns:

  • (true)

    If the disconnection succeeded.

Since:

  • 1.2.0



150
151
152
153
# File 'lib/moped/node.rb', line 150

def disconnect
  connection{ |conn| conn.disconnect }
  true
end

#down!nil

Mark the node as down.

Examples:

Mark the node as down.

node.down!

Returns:

  • (nil)

    Nothing.

Since:

  • 2.0.0



163
164
165
166
167
168
# File 'lib/moped/node.rb', line 163

def down!
  @down_at = Time.new
  @pool = nil
  @latency = nil
  Connection::Manager.shutdown(self)
end

#down?Time?

Is the node down?

Examples:

Is the node down?

node.down?

Returns:

  • (Time, nil)

    The time the node went down, or nil if up.

Since:

  • 1.0.0



138
139
140
# File 'lib/moped/node.rb', line 138

def down?
  @down_at
end

#ensure_connected(&block) ⇒ nil

Yields the block if a connection can be established, retrying when a connection error is raised.

Examples:

Ensure we are connection.

node.ensure_connected do
  #...
end

Returns:

  • (nil)

    nil.

Since:

  • 1.0.0



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/moped/node.rb', line 183

def ensure_connected(&block)
  unless (conn = stack(:connection)).empty?
    return yield(conn.first)
  end

  begin
    connection do |conn|
      connect(conn) unless conn.alive?
      conn.apply_credentials(@credentials)
      stack(:connection) << conn
      yield(conn)
    end
  rescue Exception => e
    if e.kind_of?(ConnectionPool::PoolShuttingDownError)
      @pool = nil
      Connection::Manager.delete_pool(self)
    end
    Failover.get(e).execute(e, self, &block)
  ensure
    end_execution(:connection)
  end
end

#ensure_primarynil

Set a flag on the node for the duration of provided block so that an exception is raised if the node is no longer the primary node.

Examples:

Ensure this node is primary.

node.ensure_primary do
  node.command(ismaster: 1)
end

Returns:

  • (nil)

    nil.

Since:

  • 1.0.0s



217
218
219
220
221
# File 'lib/moped/node.rb', line 217

def ensure_primary
  execute(:ensure_primary) do
    yield(self)
  end
end

#get_more(database, collection, cursor_id, limit) ⇒ Message

Execute a get more operation on the node.

Examples:

Execute a get more.

node.get_more(database, collection, 12345, -1)

Parameters:

  • database (Database)

    The database to get more from.

  • collection (Collection)

    The collection to get more from.

  • cursor_id (Integer)

    The id of the cursor on the server.

  • limit (Integer)

    The number of documents to limit.

Returns:

  • (Message)

    The result of the operation.

Raises:

  • (CursorNotFound)

    if the cursor has been killed

Since:

  • 1.0.0



237
238
239
# File 'lib/moped/node.rb', line 237

def get_more(database, collection, cursor_id, limit)
  read(Protocol::GetMore.new(database, collection, cursor_id, limit))
end

#hashInteger

Get the hash identifier for the node.

Examples:

Get the hash identifier.

node.hash

Returns:

  • (Integer)

    The hash identifier.

Since:

  • 1.0.0



249
250
251
# File 'lib/moped/node.rb', line 249

def hash
  address.resolved.hash
end

#insert(database, collection, documents, concern, options = {}) ⇒ Message

Insert documents into the database.

Examples:

Insert documents.

node.insert(database, collection, [{ name: "Tool" }])

Parameters:

  • database (Database)

    The database to insert to.

  • collection (Collection)

    The collection to insert to.

  • documents (Array<Hash>)

    The documents to insert.

Returns:

  • (Message)

    The result of the operation.

Since:

  • 1.0.0



287
288
289
# File 'lib/moped/node.rb', line 287

def insert(database, collection, documents, concern, options = {})
  write(Protocol::Insert.new(database, collection, documents, options), concern)
end

#inspectString

Get the node as a nice formatted string.

Examples:

Inspect the node.

node.inspect

Returns:

  • (String)

    The string inspection.

Since:

  • 1.0.0



528
529
530
# File 'lib/moped/node.rb', line 528

def inspect
  "<#{self.class.name} resolved_address=#{address.resolved.inspect}>"
end

#kill_cursors(cursor_ids) ⇒ Message

Kill all provided cursors on the node.

Examples:

Kill all the provided cursors.

node.kill_cursors([ 12345 ])

Parameters:

  • cursor_ids (Array<Integer>)

    The cursor ids.

Returns:

  • (Message)

    The result of the operation.

Since:

  • 1.0.0



301
302
303
# File 'lib/moped/node.rb', line 301

def kill_cursors(cursor_ids)
  process(Protocol::KillCursors.new(cursor_ids))
end

#messagable?true, false

Can we send messages to this node in normal cirucmstances? This is true only if the node is a primary or secondary node - arbiters or passives cannot be sent anything.

Examples:

Is the node messagable?

node.messagable?

Returns:

  • (true, false)

    If messages can be sent to the node.

Since:

  • 2.0.0



315
316
317
# File 'lib/moped/node.rb', line 315

def messagable?
  primary? || secondary?
end

#needs_refresh?(time) ⇒ true, false

Does the node need to be refreshed?

Examples:

Does the node require refreshing?

node.needs_refresh?(time)

Parameters:

  • time (Time)

    The next referesh time.

Returns:

  • (true, false)

    Whether the node needs to be refreshed.

Since:

  • 1.0.0



329
330
331
# File 'lib/moped/node.rb', line 329

def needs_refresh?(time)
  !refreshed_at || refreshed_at < time
end

#passive?true, false

Is the node passive?

Examples:

Is the node passive?

node.passive?

Returns:

  • (true, false)

    If the node is passive.

Since:

  • 1.0.0



341
342
343
# File 'lib/moped/node.rb', line 341

def passive?
  !!@passive
end

#peersArray<Node>

Get all the other nodes in the replica set according to the server information.

Examples:

Get the node’s peers.

node.peers

Returns:

  • (Array<Node>)

    The peers.

Since:

  • 2.0.0



354
355
356
# File 'lib/moped/node.rb', line 354

def peers
  @peers ||= []
end

#pipelinenil

Execute a pipeline of commands, for example a safe mode persist.

@todo: Remove with piggbacked gle.

Examples:

Execute a pipeline.

node.pipeline do
  #...
end

Returns:

  • (nil)

    nil.

Since:

  • 1.0.0



369
370
371
372
373
374
# File 'lib/moped/node.rb', line 369

def pipeline
  execute(:pipeline) do
    yield(self)
  end
  flush unless executing?(:pipeline)
end

#primary?true, false

Is the node the replica set primary?

Examples:

Is the node the primary?

node.primary?

Returns:

  • (true, false)

    If the node is the primary.

Since:

  • 1.0.0



384
385
386
# File 'lib/moped/node.rb', line 384

def primary?
  !!@primary
end

#process(operation, &callback) ⇒ Object

Processes the provided operation on this node, and will execute the callback when the operation is sent to the database.

Examples:

Process a read operation.

node.process(query) do |reply|
  return reply.documents
end

Parameters:

  • operation (Message)

    The database operation.

  • callback (Proc)

    The callback to run on operation completion.

Returns:

  • (Object)

    The result of the callback.

Since:

  • 1.0.0



402
403
404
405
406
407
408
# File 'lib/moped/node.rb', line 402

def process(operation, &callback)
  if executing?(:pipeline)
    queue.push([ operation, callback ])
  else
    flush([[ operation, callback ]])
  end
end

#query(database, collection, selector, options = {}) ⇒ Message

Execute a query on the node.

Examples:

Execute a query.

node.query(database, collection, { name: "Tool" })

Parameters:

  • database (Database)

    The database to query from.

  • collection (Collection)

    The collection to query from.

  • selector (Hash)

    The query selector.

  • options (Hash) (defaults to: {})

    The query options.

Returns:

  • (Message)

    The result of the operation.

Raises:

  • (QueryFailure)

    If the query had an error.

Since:

  • 1.0.0



425
426
427
# File 'lib/moped/node.rb', line 425

def query(database, collection, selector, options = {})
  read(Protocol::Query.new(database, collection, selector, options))
end

#refreshnil

Refresh information about the node, such as it’s status in the replica set and it’s known peers.

Examples:

Refresh the node.

node.refresh

Returns:

  • (nil)

    nil.

Raises:

  • (ConnectionFailure)

    If the node cannot be reached.

  • (ReplicaSetReconfigured)

    If the node is no longer a primary node and refresh was called within an #ensure_primary block.

Since:

  • 1.0.0



443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
# File 'lib/moped/node.rb', line 443

def refresh
  if address.resolve(self)
    begin
      @refreshed_at = Time.now
      configure(command("admin", ismaster: 1))
      if !primary? && executing?(:ensure_primary)
        raise Errors::ReplicaSetReconfigured.new("#{inspect} is no longer the primary node.", {})
      elsif !messagable?
        # not primary or secondary so mark it as down, since it's probably
        # a recovering node withing the replica set
        down!
      end
    rescue Timeout::Error
      down!
    end
  end
end

#remove(database, collection, selector, concern, options = {}) ⇒ Message

Execute a remove command for the provided selector.

Examples:

Remove documents.

node.remove(database, collection, { name: "Tool" })

Parameters:

  • database (Database)

    The database to remove from.

  • collection (Collection)

    The collection to remove from.

  • selector (Hash)

    The query selector.

  • options (Hash) (defaults to: {})

    The remove options.

Returns:

  • (Message)

    The result of the operation.

Since:

  • 1.0.0



474
475
476
# File 'lib/moped/node.rb', line 474

def remove(database, collection, selector, concern, options = {})
  write(Protocol::Delete.new(database, collection, selector, options), concern)
end

#secondary?true, false

Is the node a replica set secondary?

Examples:

Is the node a secondary?

node.secondary?

Returns:

  • (true, false)

    If the node is a secondary.

Since:

  • 1.0.0



486
487
488
# File 'lib/moped/node.rb', line 486

def secondary?
  @secondary
end

#timeoutInteger

Get the timeout, in seconds, for this node.

Examples:

Get the timeout in seconds.

node.timeout

Returns:

  • (Integer)

    The configured timeout or the default of 5.

Since:

  • 1.0.0



498
499
500
# File 'lib/moped/node.rb', line 498

def timeout
  @timeout ||= (options[:timeout] || 5)
end

#update(database, collection, selector, change, concern, options = {}) ⇒ Message

Execute an update command for the provided selector.

Examples:

Update documents.

node.update(database, collection, { name: "Tool" }, { likes: 1000 })

Parameters:

  • database (Database)

    The database to update.

  • collection (Collection)

    The collection to update.

  • selector (Hash)

    The query selector.

  • change (Hash)

    The updates.

  • options (Hash) (defaults to: {})

    The update options.

Returns:

  • (Message)

    The result of the operation.

Since:

  • 1.0.0



516
517
518
# File 'lib/moped/node.rb', line 516

def update(database, collection, selector, change, concern, options = {})
  write(Protocol::Update.new(database, collection, selector, change, options), concern)
end