Class: Mongo::Collection::View::ChangeStream

Inherits:
Aggregation
  • Object
Includes:
Aggregation::Behavior, Retryable
Defined in:
lib/mongo/collection/view/change_stream.rb,
lib/mongo/collection/view/change_stream/retryable.rb

Overview

Note:

Only available in server versions 3.6 and higher.

Note:

ChangeStreams do not work properly with JRuby because of the issue documented here: github.com/jruby/jruby/issues/4212. Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread, so calling #next on the change stream causes getMores to be issued in a loop in the background.

Provides behavior around a `$changeStream` pipeline stage in the aggregation framework. Specifying this stage allows users to request that notifications be sent for all changes to a particular collection or database.
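
As a rough sketch of how such a stream is usually obtained, the helper below goes through the driver's `Mongo::Collection#watch(pipeline, options)` entry point instead of calling the constructor directly. The helper name `watch_inserts` and the `$match` filter are illustrative assumptions; `collection` can be any object that responds to `watch` with these arguments.

```ruby
# Hypothetical helper: opens a change stream that only reports inserts.
# `collection` is expected to be a Mongo::Collection (or any object that
# responds to #watch with the same arguments).
def watch_inserts(collection, max_await_time_ms: 500)
  # Additional pipeline stages filter the change notifications.
  pipeline = [{ '$match' => { 'operationType' => 'insert' } }]
  collection.watch(pipeline, max_await_time_ms: max_await_time_ms)
end
```

The returned object would then be iterated with `each` or polled with `try_next`, both described further down.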

Since:

  • 2.5.0

Defined Under Namespace

Modules: Retryable

Constant Summary

FULL_DOCUMENT_DEFAULT = 'default'.freeze

The fullDocument option default value.

Returns:

  • (String)

    The fullDocument option default value.

Since:

  • 2.5.0

DATABASE = :database

Used to indicate that the change stream should listen for changes on the entire database rather than just the collection.

Returns:

  • (Symbol)

    Used to indicate that the change stream should listen for changes on the entire database rather than just the collection.

Since:

  • 2.6.0

CLUSTER = :cluster

Used to indicate that the change stream should listen for changes on the entire cluster rather than just the collection.

Returns:

  • (Symbol)

    Used to indicate that the change stream should listen for changes on the entire cluster rather than just the collection.

Since:

  • 2.6.0

Constants included from Loggable

Loggable::PREFIX

Constants included from Explainable

Explainable::ALL_PLANS_EXECUTION, Explainable::EXECUTION_STATS, Explainable::QUERY_PLANNER

Instance Attribute Summary

Attributes included from Aggregation::Behavior

#view

Instance Method Summary

Methods included from Retryable

#read_worker, #select_server, #write_worker

Methods included from Aggregation::Behavior

#allow_disk_use, #explain, #timeout_ms, #write?

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Methods included from Explainable

#explain

Methods included from Iterable

#close_query

Methods included from Mongo::CursorHost

#validate_timeout_mode!

Constructor Details

#initialize(view, pipeline, changes_for, options = {}) ⇒ ChangeStream

Initialize the change stream for the provided collection view, pipeline and options.

Examples:

Create the new change stream view.

ChangeStream.new(view, pipeline, nil, options)

Parameters:

  • view (Collection::View)

    The collection view.

  • pipeline (Array<Hash>)

    The pipeline of operators to filter the change notifications.

  • changes_for (Symbol, nil)

    What the stream watches: DATABASE for the entire database, CLUSTER for the entire cluster, or nil for just the collection.

  • options (Hash) (defaults to: {})

    The change stream options.

Options Hash (options):

  • :full_document (String)

    Allowed values: nil, ‘default’, ‘updateLookup’, ‘whenAvailable’, ‘required’.

    The default is to not send a value (i.e. nil), which is equivalent to ‘default’. By default, the change notification for partial updates will include a delta describing the changes to the document.

    When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document as well as a copy of the entire document that was changed from some time after the change occurred.

    When set to ‘whenAvailable’, configures the change stream to return the post-image of the modified document for replace and update change events if the post-image for this event is available.

    When set to ‘required’, the behavior is the same as for ‘whenAvailable’, except that an error is raised if the post-image is not available.

  • :full_document_before_change (String)

    Allowed values: nil, ‘whenAvailable’, ‘required’, ‘off’.

    The default is to not send a value (i.e. nil), which is equivalent to ‘off’.

    When set to ‘whenAvailable’, configures the change stream to return the pre-image of the modified document for replace, update, and delete change events if it is available.

    When set to ‘required’, the behavior is the same as for ‘whenAvailable’, except that an error is raised if the pre-image is not available.

  • :resume_after (BSON::Document, Hash)

    Specifies the logical starting point for the new change stream.

  • :max_await_time_ms (Integer)

    The maximum amount of time for the server to wait on new documents to satisfy a change stream query.

  • :batch_size (Integer)

    The number of documents to return per batch.

  • :collation (BSON::Document, Hash)

    The collation to use.

  • :start_at_operation_time (BSON::Timestamp)

    Only return changes that occurred at or after the specified timestamp. Any command run against the server will return a cluster time that can be used here. Only recognized by server versions 4.0+.

  • :start_after (BSON::Document, Hash)

    Similar to :resume_after, this option takes a resume token and starts a new change stream returning the first notification after the token. This allows users to watch collections that have been dropped and recreated, or newly renamed collections, without missing any notifications.

    The server will report an error if `startAfter` and `resumeAfter` are both specified.

  • :comment (Object)

    A user-provided comment to attach to this command.

  • :show_expanded_events (Boolean)

    Enables the server to send the ‘expanded’ list of change stream events. The additional events included when this flag is set are: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection, refineCollectionShardKey.
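
The allowed values listed above can be checked before a stream is opened. The following is a minimal sketch of such a client-side check. The helper `check_change_stream_options` and its two constants are hypothetical and not part of the driver; the value lists are copied from the option descriptions above.

```ruby
# Allowed values, taken from the option descriptions above.
FULL_DOCUMENT_VALUES = [nil, 'default', 'updateLookup', 'whenAvailable', 'required'].freeze
FULL_DOCUMENT_BEFORE_CHANGE_VALUES = [nil, 'whenAvailable', 'required', 'off'].freeze

# Hypothetical helper (not a driver method): raises ArgumentError for option
# combinations the documentation above describes as invalid, otherwise
# returns the options hash unchanged.
def check_change_stream_options(options)
  unless FULL_DOCUMENT_VALUES.include?(options[:full_document])
    raise ArgumentError, "invalid :full_document: #{options[:full_document].inspect}"
  end
  unless FULL_DOCUMENT_BEFORE_CHANGE_VALUES.include?(options[:full_document_before_change])
    raise ArgumentError,
          "invalid :full_document_before_change: #{options[:full_document_before_change].inspect}"
  end
  # The server reports an error when both tokens are sent, so fail early.
  if options[:start_after] && options[:resume_after]
    raise ArgumentError, ':start_after and :resume_after cannot both be specified'
  end
  options
end
```

Failing early on the client is a design choice of this sketch; leaving the check to the server, as the driver documentation describes, is equally valid.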

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 133

def initialize(view, pipeline, changes_for, options = {})
  # change stream cursors can only be :iterable, so we don't allow
  # timeout_mode to be specified.
  perform_setup(view, options, forbid: %i[ timeout_mode ]) do
    @changes_for = changes_for
    @change_stream_filters = pipeline && pipeline.dup
    @start_after = @options[:start_after]
  end

  # The resume token tracked by the change stream, used only
  # when there is no cursor, or no cursor resume token
  @resume_token = @start_after || @options[:resume_after]

  create_cursor!

  # We send different parameters when we resume a change stream
  # compared to when we send the first query
  @resuming = true
end

Instance Attribute Details

#cursor ⇒ Cursor (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the underlying cursor for this operation.

Returns:

  • (Cursor)

    the underlying cursor for this operation

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 67

def cursor
  @cursor
end

#options ⇒ BSON::Document (readonly)

Returns the change stream options.

Returns:

  • (BSON::Document)

    The change stream options.

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 63

def options
  @options
end

Instance Method Details

#close(opts = {}) ⇒ nil

Note:

This method attempts to close the cursor used by the change stream, which in turn closes the server-side change stream cursor. This method ignores any errors that occur when closing the server-side cursor.

Close the change stream.

Examples:

Close the change stream.

stream.close

Returns:

  • (nil)

    Always nil.

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 254

def close(opts = {})
  unless closed?
    begin
      @cursor.close(opts)
    rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection
      # ignore
    end
    @cursor = nil
  end
end
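
Because #close swallows errors from the server-side cursor, it is safe to call from an `ensure` clause. A minimal sketch of that pattern follows. The wrapper name `with_change_stream` is hypothetical, and `stream` is assumed to be any object responding to `close` and `closed?`.

```ruby
# Hypothetical wrapper: yields the stream to the block and closes it
# afterwards, whether the block returns normally or raises.
def with_change_stream(stream)
  yield stream
ensure
  stream.close unless stream.closed?
end
```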

#closed? ⇒ true, false

Is the change stream closed?

Examples:

Determine whether the change stream is closed.

stream.closed?

Returns:

  • (true, false)

    If the change stream is closed.

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 273

def closed?
  @cursor.nil?
end

#cursor_type ⇒ Object

“change streams are an abstraction around tailable-awaitData cursors…”

Returns:

  • :tailable_await

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 307

def cursor_type
  :tailable_await
end

#each {|document| ... } ⇒ Enumerator

Iterate through documents returned by the change stream.

This method retries once per resumable error: two consecutive errors cause the second error to be raised, while an error that is recovered from resets the error count to zero.

Examples:

Iterate through the stream of documents.

stream.each do |document|
  p document
end

Yield Parameters:

  • document (BSON::Document)

    Each change stream document.

Returns:

  • (Enumerator)

    The enumerator.

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 169

def each
  raise StopIteration.new if closed?
  loop do
    document = try_next
    yield document if document
  end
rescue StopIteration
  return self
end
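
The "retry once per error" rule described above can be modeled in isolation. This is a simplified sketch of the documented semantics only, not the driver's implementation. `ResumableError`, `next_with_single_retry`, and the two callables are stand-ins invented for illustration.

```ruby
# Stand-in for a resumable driver error; purely illustrative.
class ResumableError < StandardError; end

# Simplified model: `fetch` returns a document or raises ResumableError,
# and `resume` re-establishes the underlying source. Each call starts with
# a fresh error count, so a recovered error does not affect later calls.
def next_with_single_retry(fetch, resume)
  fetch.call
rescue ResumableError
  resume.call
  fetch.call # a second consecutive failure propagates to the caller
end
```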

#inspect ⇒ String

Get a formatted string for use in inspection.

Examples:

Inspect the change stream object.

stream.inspect

Returns:

  • (String)

    The change stream inspection.

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 285

def inspect
  "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " +
    "options=#{@options} resume_token=#{resume_token}>"
end

#max_await_time_ms ⇒ Integer | nil

Returns the value of the max_await_time_ms option that was passed to this change stream.

Returns:

  • (Integer | nil)

    the max_await_time_ms value

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 322

def max_await_time_ms
  options[:max_await_time_ms]
end

#resume_token ⇒ BSON::Document | nil

Returns the resume token that the stream will use to automatically resume, if one exists.

Examples:

Get the change stream resume token.

stream.resume_token

Returns:

  • (BSON::Document | nil)

    The change stream resume token.

Since:

  • 2.10.0



# File 'lib/mongo/collection/view/change_stream.rb', line 299

def resume_token
  cursor_resume_token = @cursor.resume_token if @cursor
  cursor_resume_token || @resume_token
end
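
A common use of #resume_token is to remember the position in the stream so that a later stream can continue from it. The sketch below assumes a hypothetical helper `consume_with_checkpoint` and a plain Hash as the checkpoint store; `stream` is any object responding to `try_next` and `resume_token`.

```ruby
# Hypothetical consumer: handles up to `limit` events and records the resume
# token after each one, so a later stream can be opened with :resume_after.
def consume_with_checkpoint(stream, checkpoint, limit:)
  handled = []
  limit.times do
    doc = stream.try_next
    break if doc.nil? # no change arrived within the await time

    handled << doc
    checkpoint[:resume_after] = stream.resume_token
  end
  handled
end
```

A later stream could then be opened with the stored value passed as the :resume_after option. How the checkpoint is persisted between processes is left open here.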

#timeout_mode ⇒ Object

“change streams…implicitly use ITERATION mode”

Returns:

  • :iteration

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 314

def timeout_mode
  :iteration
end

#to_enum ⇒ Object

Since:

  • 2.5.0



# File 'lib/mongo/collection/view/change_stream.rb', line 227

def to_enum
  enum = super
  enum.send(:instance_variable_set, '@obj', self)
  class << enum
    def try_next
      @obj.try_next
    end
  end
  enum
end
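
The method above attaches a `try_next` singleton method to the enumerator so that callers holding only the enumerator can still poll without blocking. The toy class below illustrates the same idea outside the driver. `ToySource` is invented for this sketch, and it uses a closure with `define_singleton_method` where the driver code uses an instance variable and `class << enum`.

```ruby
# Toy source, not the driver class: hands out queued items and offers a
# non-blocking try_next, mirroring the shape of the method above.
class ToySource
  def initialize(items)
    @items = items
  end

  # Returns the next item, or nil when nothing is queued.
  def try_next
    @items.shift
  end

  def each
    while (item = try_next)
      yield item
    end
    self
  end

  def to_enum
    enum = super
    source = self
    # Define try_next on this one enumerator only, delegating to the source.
    enum.define_singleton_method(:try_next) { source.try_next }
    enum
  end
end
```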

#try_next ⇒ BSON::Document | nil

Return one document from the change stream, if one is available.

Retries once on a resumable error.

Raises StopIteration if the change stream is closed.

This method will wait up to max_await_time_ms milliseconds for changes from the server, and if no changes are received it will return nil.

Returns:

  • (BSON::Document | nil)

    A change stream document.

Raises:

  • (StopIteration)

Since:

  • 2.6.0



# File 'lib/mongo/collection/view/change_stream.rb', line 191

def try_next
  recreate_cursor! if @timed_out

  raise StopIteration.new if closed?

  begin
    doc = @cursor.try_next
  rescue Mongo::Error => e
    # "If a next call fails with a timeout error, drivers MUST NOT
    # invalidate the change stream. The subsequent next call MUST
    # perform a resume attempt to establish a new change stream on the
    # server..."
    #
    # However, SocketTimeoutErrors are TimeoutErrors, but are also
    # change-stream-resumable. To preserve existing (specified) behavior,
    # we only count timeouts when the error is not also
    # change-stream-resumable.
    @timed_out = e.is_a?(Mongo::Error::TimeoutError) && !e.change_stream_resumable?

    raise unless @timed_out || e.change_stream_resumable?

    @resume_token = @cursor.resume_token
    raise e if @timed_out

    recreate_cursor!(@cursor.context)
    retry
  end

  # We need to verify each doc has an _id, so we
  # have a resume token to work with
  if doc && doc['_id'].nil?
    raise Error::MissingResumeToken
  end
  doc
end
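
Since #try_next returns nil when no change arrives in time and raises StopIteration once the stream is closed, a caller that polls has to handle both outcomes. A minimal sketch under those assumptions follows. The helper name `poll_changes` and the `max_polls` parameter are hypothetical.

```ruby
# Hypothetical polling helper: calls try_next up to `max_polls` times,
# collecting documents and skipping the nil results that indicate that no
# change arrived within max_await_time_ms.
def poll_changes(stream, max_polls:)
  docs = []
  max_polls.times do
    doc = stream.try_next
    docs << doc if doc
  end
  docs
rescue StopIteration
  # try_next raises StopIteration once the stream has been closed;
  # return whatever was collected up to that point.
  docs
end
```

Bounding the number of polls keeps the caller in control of how long it waits, whereas #each keeps looping until the stream is closed.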