Class: Mongo::Cluster::CursorReaper Private
- Inherits: Object
  - Object
  - Mongo::Cluster::CursorReaper
- Includes: Retryable
- Defined in: lib/mongo/cluster/reapers/cursor_reaper.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
A manager that sends kill cursors operations at regular intervals to close cursors that have been garbage collected without being exhausted.
Constant Summary
- FREQUENCY = 1.freeze
  This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
  The default time interval for the cursor reaper to send pending kill cursors operations.
Instance Attribute Summary
- #cluster ⇒ Object readonly private
Instance Method Summary
- #initialize(cluster) ⇒ CursorReaper constructor private
  Create a cursor reaper.
- #kill_cursors ⇒ Object (also: #execute, #flush) private
  Execute all pending kill cursors operations.
- #read_scheduled_kill_specs ⇒ Object private
  Read and decode scheduled kill cursors operations.
- #register_cursor(id) ⇒ Object private
  Register a cursor id as active.
- #schedule_kill_cursor(kill_spec) ⇒ Object private
  Schedule a kill cursors operation to be eventually executed.
- #unregister_cursor(id) ⇒ Object private
  Unregister a cursor id, indicating that it’s no longer active.
Methods included from Retryable
#read_worker, #select_server, #write_worker
Constructor Details
#initialize(cluster) ⇒ CursorReaper
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a cursor reaper.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 42
def initialize(cluster)
  @cluster = cluster
  @to_kill = {}
  @active_cursor_ids = Set.new
  @mutex = Mutex.new
  @kill_spec_queue = Queue.new
end
Instance Attribute Details
#cluster ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 50
def cluster
  @cluster
end
Instance Method Details
#kill_cursors ⇒ Object Also known as: execute, flush
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Execute all pending kill cursors operations.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 133
def kill_cursors
  # TODO optimize this to batch kill cursor operations for the same
  # server/database/collection instead of killing each cursor
  # individually.
  loop do
    server_address = nil

    kill_spec = @mutex.synchronize do
      read_scheduled_kill_specs
      # Find a server that has any cursors scheduled for destruction.
      server_address, specs =
        @to_kill.detect { |_, specs| specs.any? }

      if specs.nil?
        # All servers have empty specs, nothing to do.
        return
      end

      # Note that this mutates the spec in the queue.
      # If the kill cursor operation fails, we don't attempt to
      # kill that cursor again.
      spec = specs.take(1).tap do |arr|
        specs.subtract(arr)
      end.first

      unless @active_cursor_ids.include?(spec.cursor_id)
        # The cursor was already killed, typically because it has
        # been iterated to completion. Remove the kill spec from
        # our records without doing any more work.
        spec = nil
      end

      spec
    end

    # If there was a spec to kill but its cursor was already killed,
    # look for another spec.
    next unless kill_spec

    # We could also pass kill_spec directly into the KillCursors
    # operation, though this would make that operation have a
    # different API from all of the other ones which accept hashes.
    spec = {
      cursor_ids: [kill_spec.cursor_id],
      coll_name: kill_spec.coll_name,
      db_name: kill_spec.db_name,
    }
    op = Operation::KillCursors.new(spec)

    server = cluster.servers.detect do |server|
      server.address == server_address
    end

    unless server
      # TODO We currently don't have a server for the address that the
      # cursor is associated with. We should leave the cursor in the
      # queue to be killed at a later time (when the server comes back).
      next
    end

    options = {
      server_api: server.options[:server_api],
      connection_global_id: kill_spec.connection_global_id,
    }
    if connection = kill_spec.connection
      op.execute_with_connection(connection, context: Operation::Context.new(options: options))
      connection.connection_pool.check_in(connection)
    else
      op.execute(server, context: Operation::Context.new(options: options))
    end

    if session = kill_spec.session
      if session.implicit?
        session.end_session
      end
    end
  end
end
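The way #kill_cursors pulls a single spec out of a per-server Set, and then drops it if its cursor is no longer active, can be tried in isolation. The following is a minimal sketch using plain integers in place of kill specs; take_one is a hypothetical helper name introduced for illustration and is not part of the driver.

```ruby
require 'set'

# Hypothetical helper (not part of the driver): removes one element from
# the set and returns it, or returns nil when the set is empty. This is
# the same take/subtract/first idiom used in #kill_cursors.
def take_one(set)
  set.take(1).tap { |arr| set.subtract(arr) }.first
end

# Integers stand in for kill specs and cursor ids in this sketch.
pending = Set.new([101, 102, 103])
active = Set.new([102, 103])

killed = []
until pending.empty?
  id = take_one(pending)
  # Skip ids that are no longer active, mirroring the check against
  # @active_cursor_ids in #kill_cursors.
  next unless active.include?(id)
  killed << id
end
```

Because the element is removed from the set before any work is attempted, a failed kill is never retried, which matches the comment in the method body.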
#read_scheduled_kill_specs ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Read and decode scheduled kill cursors operations.
This method mutates instance variables without locking, so it is not thread safe. It should generally not be called directly; it is a helper for the #kill_cursors method.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 114
def read_scheduled_kill_specs
  while kill_spec = @kill_spec_queue.pop(true)
    if @active_cursor_ids.include?(kill_spec.cursor_id)
      @to_kill[kill_spec.server_address] ||= Set.new
      @to_kill[kill_spec.server_address] << kill_spec
    end
  end
rescue ThreadError
  # Empty queue, nothing to do.
end
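The loop above relies on Queue#pop(true), which raises ThreadError instead of blocking when the queue is empty. A minimal sketch of that non-blocking drain pattern, outside the driver, might look like the following; drain is a hypothetical helper name used only for illustration.

```ruby
# Hypothetical helper (not part of the driver): empties a Queue without
# blocking and returns the items in the order they were pushed.
def drain(queue)
  items = []
  begin
    # pop(true) is the non-blocking form; it raises ThreadError once the
    # queue has no more items.
    loop { items << queue.pop(true) }
  rescue ThreadError
    # Empty queue, nothing more to read.
  end
  items
end

queue = Queue.new
queue << :first
queue << :second

drained = drain(queue)
```

The method in the source additionally filters each item against @active_cursor_ids and groups the survivors by server address; the sketch leaves that out to show only the drain itself.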
#register_cursor(id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Register a cursor id as active.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 71
def register_cursor(id)
  if id.nil?
    raise ArgumentError, 'register_cursor called with nil cursor_id'
  end
  if id == 0
    raise ArgumentError, 'register_cursor called with cursor_id=0'
  end

  @mutex.synchronize do
    @active_cursor_ids << id
  end
end
#schedule_kill_cursor(kill_spec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedule a kill cursors operation to be eventually executed.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 57
def schedule_kill_cursor(kill_spec)
  @kill_spec_queue << kill_spec
end
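Unlike the other methods, #schedule_kill_cursor does not take @mutex; it only pushes onto a Queue. Ruby's Queue is itself thread-safe, so concurrent producers need no extra locking. The sketch below illustrates that property with made-up integer ids; the thread and item counts are arbitrary assumptions for the example.

```ruby
queue = Queue.new

# Four producer threads each schedule 25 hypothetical cursor ids.
producers = 4.times.map do |t|
  Thread.new do
    25.times { |i| queue << (t * 100 + i + 1) }
  end
end
producers.each(&:join)

# Every push is retained even though no explicit lock was used.
scheduled = queue.size
```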
#unregister_cursor(id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Unregister a cursor id, indicating that it’s no longer active.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 94
def unregister_cursor(id)
  if id.nil?
    raise ArgumentError, 'unregister_cursor called with nil cursor_id'
  end
  if id == 0
    raise ArgumentError, 'unregister_cursor called with cursor_id=0'
  end

  @mutex.synchronize do
    @active_cursor_ids.delete(id)
  end
end
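Taken together, #register_cursor and #unregister_cursor maintain a mutex-guarded Set of active cursor ids and reject nil and 0. The following toy class is a sketch of that bookkeeping only; ActiveCursorRegistry, active? and validate! are hypothetical names introduced here, not part of the driver.

```ruby
require 'set'

# Toy stand-in for the reaper's active-cursor bookkeeping.
# Not the driver's class; names are illustrative assumptions.
class ActiveCursorRegistry
  def initialize
    @ids = Set.new
    @mutex = Mutex.new
  end

  # Marks a cursor id as active.
  def register(id)
    validate!(id, 'register')
    @mutex.synchronize { @ids << id }
  end

  # Marks a cursor id as no longer active.
  def unregister(id)
    validate!(id, 'unregister')
    @mutex.synchronize { @ids.delete(id) }
  end

  # Reports whether a cursor id is currently registered.
  def active?(id)
    @mutex.synchronize { @ids.include?(id) }
  end

  private

  # Rejects nil and 0, as both methods in the source do.
  def validate!(id, action)
    raise ArgumentError, "#{action} called with nil cursor_id" if id.nil?
    raise ArgumentError, "#{action} called with cursor_id=0" if id == 0
  end
end

registry = ActiveCursorRegistry.new
registry.register(42)
was_active = registry.active?(42)
registry.unregister(42)
still_active = registry.active?(42)
```

In the source, this set is what lets #kill_cursors and #read_scheduled_kill_specs discard kill specs for cursors that were already closed.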