Class: EventStoreClient::GRPC::Commands::Streams::Subscribe

Inherits:
Command
  • Object
show all
Defined in:
lib/event_store_client/adapters/grpc/commands/streams/subscribe.rb

Instance Method Summary collapse

Methods inherited from Command

#connection_options, #metadata, #request, #service, use_request, use_service

Constructor Details

#initialize(**conn_options) ⇒ Subscribe

Returns a new instance of Subscribe.



11
12
13
14
# File 'lib/event_store_client/adapters/grpc/commands/streams/subscribe.rb', line 11

def initialize(**conn_options)
  # Subscriptions should never be timed out
  super(**conn_options.merge(timeout: nil))
end

Instance Method Details

#call(stream_name, handler:, options:, skip_deserialization:, skip_decryption:) {|options| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Yields:

  • (options)

See Also:

  • EventStoreClient::GRPC::Commands::Streams::Subscribe.{EventStoreClient{EventStoreClient::GRPC{EventStoreClient::GRPC::Client{EventStoreClient::GRPC::Client#read}


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/event_store_client/adapters/grpc/commands/streams/subscribe.rb', line 18

def call(stream_name, handler:, options:, skip_deserialization:, skip_decryption:)
  options = normalize_options(stream_name, options)
  yield options if block_given?

  callback = proc do |response|
    result = Shared::Streams::ProcessResponse.new(config: config).call(
      response,
      skip_deserialization,
      skip_decryption
    )

    handler.call(result) if result
  end
  retry_request do
    service.read(request.new(options: options), metadata: , &callback)
  end
end