Class: Mongo::Collection::View::ChangeStream
- Inherits:
-
Aggregation
- Object
- Aggregation
- Mongo::Collection::View::ChangeStream
- Includes:
- Aggregation::Behavior, Retryable
- Defined in:
- lib/mongo/collection/view/change_stream.rb,
lib/mongo/collection/view/change_stream/retryable.rb
Overview
Only available in server versions 3.6 and higher.
ChangeStreams do not work properly with JRuby because of the issue documented here: github.com/jruby/jruby/issues/4212. Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread, therefore calling #next on the change stream will cause getMores to be called in a loop in the background.
Provides behavior around a ‘$changeStream` pipeline stage in the aggregation framework. Specifying this stage allows users to request that notifications are sent for all changes to a particular collection or database.
Defined Under Namespace
Modules: Retryable
Constant Summary collapse
- FULL_DOCUMENT_DEFAULT =
Returns The fullDocument option default value.
'default'.freeze
- DATABASE =
Returns Used to indicate that the change stream should listen for changes on the entire database rather than just the collection.
:database
- CLUSTER =
Returns Used to indicate that the change stream should listen for changes on the entire cluster rather than just the collection.
:cluster
Constants included from Loggable
Constants included from Explainable
Explainable::ALL_PLANS_EXECUTION, Explainable::EXECUTION_STATS, Explainable::QUERY_PLANNER
Instance Attribute Summary collapse
-
#cursor ⇒ Cursor
readonly
private
The underlying cursor for this operation.
-
#options ⇒ BSON::Document
readonly
The change stream options.
Attributes included from Aggregation::Behavior
Instance Method Summary collapse
-
#close(opts = {}) ⇒ nil
Close the change stream.
-
#closed? ⇒ true, false
Is the change stream closed?.
-
#cursor_type ⇒ Object
“change streams are an abstraction around tailable-awaitData cursors…”.
-
#each {|Each| ... } ⇒ Enumerator
Iterate through documents returned by the change stream.
-
#initialize(view, pipeline, changes_for, options = {}) ⇒ ChangeStream
constructor
Initialize the change stream for the provided collection view, pipeline and options.
-
#inspect ⇒ String
Get a formatted string for use in inspection.
-
#max_await_time_ms ⇒ Integer | nil
Returns the value of the max_await_time_ms option that was passed to this change stream.
-
#resume_token ⇒ BSON::Document | nil
Returns the resume token that the stream will use to automatically resume, if one exists.
-
#timeout_mode ⇒ Object
“change streams…implicitly use ITERATION mode”.
- #to_enum ⇒ Object
-
#try_next ⇒ BSON::Document | nil
Return one document from the change stream, if one is available.
Methods included from Retryable
#read_worker, #select_server, #write_worker
Methods included from Aggregation::Behavior
#allow_disk_use, #explain, #timeout_ms, #write?
Methods included from Loggable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Methods included from Explainable
Methods included from Iterable
Methods included from Mongo::CursorHost
Constructor Details
#initialize(view, pipeline, changes_for, options = {}) ⇒ ChangeStream
Initialize the change stream for the provided collection view, pipeline and options.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/mongo/collection/view/change_stream.rb', line 133 def initialize(view, pipeline, changes_for, = {}) # change stream cursors can only be :iterable, so we don't allow # timeout_mode to be specified. perform_setup(view, , forbid: %i[ timeout_mode ]) do @changes_for = changes_for @change_stream_filters = pipeline && pipeline.dup @start_after = @options[:start_after] end # The resume token tracked by the change stream, used only # when there is no cursor, or no cursor resume token @resume_token = @start_after || @options[:resume_after] create_cursor! # We send different parameters when we resume a change stream # compared to when we send the first query @resuming = true end |
Instance Attribute Details
#cursor ⇒ Cursor (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the underlying cursor for this operation.
67 68 69 |
# File 'lib/mongo/collection/view/change_stream.rb', line 67 def cursor @cursor end |
#options ⇒ BSON::Document (readonly)
Returns The change stream options.
63 64 65 |
# File 'lib/mongo/collection/view/change_stream.rb', line 63 def @options end |
Instance Method Details
#close(opts = {}) ⇒ nil
This method attempts to close the cursor used by the change stream, which in turn closes the server-side change stream cursor. This method ignores any errors that occur when closing the server-side cursor.
Close the change stream.
254 255 256 257 258 259 260 261 262 263 |
# File 'lib/mongo/collection/view/change_stream.rb', line 254 def close(opts = {}) unless closed? begin @cursor.close(opts) rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection # ignore end @cursor = nil end end |
#closed? ⇒ true, false
Is the change stream closed?
273 274 275 |
# File 'lib/mongo/collection/view/change_stream.rb', line 273 def closed? @cursor.nil? end |
#cursor_type ⇒ Object
“change streams are an abstraction around tailable-awaitData cursors…”
307 308 309 |
# File 'lib/mongo/collection/view/change_stream.rb', line 307 def cursor_type :tailable_await end |
#each {|Each| ... } ⇒ Enumerator
Iterate through documents returned by the change stream.
This method retries once per error on resumable errors (two consecutive errors result in the second error being raised, an error which is recovered from resets the error count to zero).
169 170 171 172 173 174 175 176 177 |
# File 'lib/mongo/collection/view/change_stream.rb', line 169 def each raise StopIteration.new if closed? loop do document = try_next yield document if document end rescue StopIteration return self end |
#inspect ⇒ String
Get a formatted string for use in inspection.
285 286 287 288 |
# File 'lib/mongo/collection/view/change_stream.rb', line 285 def inspect "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " + "options=#{@options} resume_token=#{resume_token}>" end |
#max_await_time_ms ⇒ Integer | nil
Returns the value of the max_await_time_ms option that was passed to this change stream.
322 323 324 |
# File 'lib/mongo/collection/view/change_stream.rb', line 322 def max_await_time_ms [:max_await_time_ms] end |
#resume_token ⇒ BSON::Document | nil
Returns the resume token that the stream will use to automatically resume, if one exists.
299 300 301 302 |
# File 'lib/mongo/collection/view/change_stream.rb', line 299 def resume_token cursor_resume_token = @cursor.resume_token if @cursor cursor_resume_token || @resume_token end |
#timeout_mode ⇒ Object
“change streams…implicitly use ITERATION mode”
314 315 316 |
# File 'lib/mongo/collection/view/change_stream.rb', line 314 def timeout_mode :iteration end |
#to_enum ⇒ Object
227 228 229 230 231 232 233 234 235 236 |
# File 'lib/mongo/collection/view/change_stream.rb', line 227 def to_enum enum = super enum.send(:instance_variable_set, '@obj', self) class << enum def try_next @obj.try_next end end enum end |
#try_next ⇒ BSON::Document | nil
Return one document from the change stream, if one is available.
Retries once on a resumable error.
Raises StopIteration if the change stream is closed.
This method will wait up to max_await_time_ms milliseconds for changes from the server, and if no changes are received it will return nil.
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/mongo/collection/view/change_stream.rb', line 191 def try_next recreate_cursor! if @timed_out raise StopIteration.new if closed? begin doc = @cursor.try_next rescue Mongo::Error => e # "If a next call fails with a timeout error, drivers MUST NOT # invalidate the change stream. The subsequent next call MUST # perform a resume attempt to establish a new change stream on the # server..." # # However, SocketTimeoutErrors are TimeoutErrors, but are also # change-stream-resumable. To preserve existing (specified) behavior, # We only count timeouts when the error is not also # change-stream-resumable. @timed_out = e.is_a?(Mongo::Error::TimeoutError) && !e.change_stream_resumable? raise unless @timed_out || e.change_stream_resumable? @resume_token = @cursor.resume_token raise e if @timed_out recreate_cursor!(@cursor.context) retry end # We need to verify each doc has an _id, so we # have a resume token to work with if doc && doc['_id'].nil? raise Error::MissingResumeToken end doc end |