Class: Mongo::Server::PushMonitor Private

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
BackgroundThread
Defined in:
lib/mongo/server/push_monitor.rb,
lib/mongo/server/push_monitor/connection.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

A monitor utilizing server-pushed hello requests.

When a Monitor handshakes with a 4.4+ server, it creates an instance of PushMonitor. PushMonitor subsequently executes server-pushed hello (i.e. awaited & exhausted hello) to receive topology changes from the server as quickly as possible. The Monitor still monitors the server for round-trip time calculations and to perform immediate checks as requested by the application.

Since:

  • 2.0.0

Defined Under Namespace

Classes: Connection

Constant Summary

Constants included from Loggable

Loggable::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from BackgroundThread

#run!, #running?

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Constructor Details

#initialize(monitor, topology_version, monitoring, **options) ⇒ PushMonitor

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of PushMonitor.

Since:

  • 2.0.0



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/mongo/server/push_monitor.rb', line 35

def initialize(monitor, topology_version, monitoring, **options)
  if topology_version.nil?
    raise ArgumentError, 'Topology version must be provided but it was nil'
  end
  unless options[:app_metadata]
    raise ArgumentError, 'App metadata is required'
  end
  unless options[:check_document]
    raise ArgumentError, 'Check document is required'
  end
  @app_metadata = options[:app_metadata]
  @check_document = options[:check_document]
  @monitor = monitor
  @topology_version = topology_version
  @monitoring = monitoring
  @options = options
  @lock = Mutex.new
end

Instance Attribute Details

#monitorServer (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The server that is being monitored.

Returns:

  • (Server)

    The server that is being monitored.

Since:

  • 2.0.0



55
56
57
# File 'lib/mongo/server/push_monitor.rb', line 55

def monitor
  @monitor
end

#monitoringMonitoring (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns monitoring The monitoring.

Returns:

Since:

  • 2.0.0



61
62
63
# File 'lib/mongo/server/push_monitor.rb', line 61

def monitoring
  @monitoring
end

#optionsHash (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Push monitor options.

Returns:

  • (Hash)

    Push monitor options.

Since:

  • 2.0.0



64
65
66
# File 'lib/mongo/server/push_monitor.rb', line 64

def options
  @options
end

#topology_versionTopologyVersion (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Most recently received topology version.

Returns:

Since:

  • 2.0.0



58
59
60
# File 'lib/mongo/server/push_monitor.rb', line 58

def topology_version
  @topology_version
end

Instance Method Details

#checkObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/mongo/server/push_monitor.rb', line 137

def check
  @lock.synchronize do
    if @connection && @connection.pid != Process.pid
      log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}")
      @connection.disconnect!
      @connection = nil
    end
  end

  @lock.synchronize do
    unless @connection
      @server_pushing = false
      connection = PushMonitor::Connection.new(server.address, options)
      connection.connect!
      @connection = connection
    end
  end

  resp_msg = begin
    unless @server_pushing
      write_check_command
    end
    read_response
  rescue Mongo::Error
    @lock.synchronize do
      @connection.disconnect!
      @connection = nil
    end
    raise
  end
  @server_pushing = resp_msg.flags.include?(:more_to_come)
  result = Operation::Result.new(resp_msg)
  result.validate!
  result.documents.first
end

#do_workObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/mongo/server/push_monitor.rb', line 94

def do_work
  @lock.synchronize do
    return if @stop_requested
  end

  result = monitoring.publish_heartbeat(server, awaited: true) do
    check
  end
  new_description = monitor.run_sdam_flow(result, awaited: true)
  # When hello fails due to a fail point, the response does not
  # include topology version. In this case we need to keep our existing
  # topology version so that we can resume monitoring.
  # The spec does not appear to directly address this case but
  # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#streaming-ismaster
  # says that topologyVersion should only be updated from successful
  # hello responses.
  if new_description.topology_version
    @topology_version = new_description.topology_version
  end
rescue IOError, SocketError, SystemCallError, Mongo::Error => exc
  stop_requested = @lock.synchronize { @stop_requested }
  if stop_requested
    # Ignore the exception, see RUBY-2771.
    return
  end

  msg = "Error running awaited hello on #{server.address}"
  Utils.warn_bg_exception(msg, exc,
    logger: options[:logger],
    log_prefix: options[:log_prefix],
    bg_error_backtrace: options[:bg_error_backtrace],
  )

  # If a request failed on a connection, stop push monitoring.
  # In case the server is dead we don't want to have two connections
  # trying to connect unsuccessfully at the same time.
  stop!

  # Request an immediate check on the monitor to get reinstated as
  # soon as possible in case the server is actually alive.
  server.scan_semaphore.signal
end

#read_responseObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/mongo/server/push_monitor.rb', line 184

def read_response
  if timeout = options[:connect_timeout]
    if timeout < 0
      raise Mongo::SocketTimeoutError, "Requested to read with a negative timeout: #{}"
    elsif timeout > 0
      timeout += options[:heartbeat_frequency] || Monitor::DEFAULT_HEARTBEAT_INTERVAL
    end
  end
  # We set the timeout twice: once passed into read_socket which applies
  # to each individual read operation, and again around the entire read.
  Timeout.timeout(timeout, Error::SocketTimeoutError, "Failed to read an awaited hello response in #{timeout} seconds") do
    @lock.synchronize { @connection }.read_response(socket_timeout: timeout)
  end
end

#start!Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



69
70
71
72
73
# File 'lib/mongo/server/push_monitor.rb', line 69

def start!
  @lock.synchronize do
    super
  end
end

#stop!Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/mongo/server/push_monitor.rb', line 75

def stop!
  @lock.synchronize do
    @stop_requested = true
    if @connection
      # Interrupt any in-progress exhausted hello reads by
      # disconnecting the connection.
      @connection.send(:socket).close rescue nil
    end
  end
  super.tap do
    @lock.synchronize do
      if @connection
        @connection.disconnect!
        @connection = nil
      end
    end
  end
end

#to_sObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



199
200
201
# File 'lib/mongo/server/push_monitor.rb', line 199

def to_s
  "#<#{self.class.name}:#{object_id} #{server.address}>"
end

#write_check_commandObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



173
174
175
176
177
178
179
180
181
182
# File 'lib/mongo/server/push_monitor.rb', line 173

def write_check_command
  document = @check_document.merge(
    topologyVersion: topology_version.to_doc,
    maxAwaitTimeMS: monitor.heartbeat_interval * 1000,
  )
  command = Protocol::Msg.new(
    [:exhaust_allowed], {}, document.merge({'$db' => Database::ADMIN})
  )
  @lock.synchronize { @connection }.write_bytes(command.serialize.to_s)
end