Class: EventStoreSubscriptions::SubscriptionPosition

Inherits:
Struct
  • Object
Defined in:
lib/event_store_subscriptions/subscription_position.rb

Overview

This class stores and updates the commit_position and prepare_position of a subscription to the “$all” stream.

Constructor Details

#initialize ⇒ SubscriptionPosition

Returns a new instance of SubscriptionPosition.



# File 'lib/event_store_subscriptions/subscription_position.rb', line 9

def initialize(*)
  super
  @update_hooks = []
end

Instance Attribute Details

#commit_position ⇒ Object

Returns the value of attribute commit_position

Returns:

  • (Object)

    the current value of commit_position



# File 'lib/event_store_subscriptions/subscription_position.rb', line 6

def commit_position
  @commit_position
end

#prepare_position ⇒ Object

Returns the value of attribute prepare_position

Returns:

  • (Object)

    the current value of prepare_position



# File 'lib/event_store_subscriptions/subscription_position.rb', line 6

def prepare_position
  @prepare_position
end

#update_hooks ⇒ Object (readonly)

Returns the value of attribute update_hooks.



# File 'lib/event_store_subscriptions/subscription_position.rb', line 7

def update_hooks
  @update_hooks
end

Instance Method Details

#empty? ⇒ Boolean

Checks whether the position is incomplete, that is, whether commit_position or prepare_position is nil.

Returns:

  • (Boolean)


# File 'lib/event_store_subscriptions/subscription_position.rb', line 52

def empty?
  commit_position.nil? || prepare_position.nil?
end

#present? ⇒ Boolean

Checks whether both commit_position and prepare_position are set. This is the negation of #empty?.

Returns:

  • (Boolean)


# File 'lib/event_store_subscriptions/subscription_position.rb', line 58

def present?
  !empty?
end
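
The behaviour of #empty? and #present? on a partially filled position can be checked with a small stand-in. This is a minimal sketch, assuming a struct with the two documented members; PositionSketch is a hypothetical name used for illustration, not the gem's class.

```ruby
# Hypothetical stand-in that copies the documented #empty? and #present? bodies.
PositionSketch = Struct.new(:commit_position, :prepare_position) do
  def empty?
    commit_position.nil? || prepare_position.nil?
  end

  def present?
    !empty?
  end
end

blank   = PositionSketch.new           # both members are nil
partial = PositionSketch.new(10, nil)  # only commit_position is set
full    = PositionSketch.new(10, 10)   # both members are set

puts blank.empty?    # true
puts partial.empty?  # true, because prepare_position is still nil
puts full.present?   # true
```

A position counts as present only when both values are set, so a half-filled position is treated the same as a blank one.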

#register_update_hook(&blk) ⇒ void

This method returns an undefined value.

Adds a handler that is executed each time the position is updated. The handler receives the position itself. You may add as many handlers as you want; they run in the order in which they were registered. Example:

```ruby
subscription_position.register_update_hook do |position|
  # do something with the position. E.g. persist it somewhere
end
subscription_position.register_update_hook do |position|
  # do something else with the position
end
```


# File 'lib/event_store_subscriptions/subscription_position.rb', line 46

def register_update_hook(&blk)
  update_hooks << blk
end

#to_option ⇒ Hash

Constructs a hash compatible with the EventStoreClient::GRPC::Client#subscribe_to_stream method. Pass it as the :options keyword argument of that method to set the starting position of the stream.

Returns:

  • (Hash)


# File 'lib/event_store_subscriptions/subscription_position.rb', line 66

def to_option
  { from_position: { commit_position: commit_position, prepare_position: prepare_position } }
end
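
The shape of the returned hash can be seen with a stand-in struct. This is a sketch only: PositionOptionSketch is a hypothetical name, and the position values are made up for illustration.

```ruby
# Hypothetical stand-in that copies the documented #to_option body.
PositionOptionSketch = Struct.new(:commit_position, :prepare_position) do
  def to_option
    { from_position: { commit_position: commit_position, prepare_position: prepare_position } }
  end
end

# Illustrative values; real ones come from events read from the "$all" stream.
position = PositionOptionSketch.new(1_024, 1_024)
options = position.to_option

# Both values are nested under the :from_position key.
p options.keys
p options[:from_position]
```

With the real class, this hash is what you would hand to the :options keyword argument of subscribe_to_stream.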

#update(response) ⇒ Boolean

Updates the position from the GRPC response. If the response carries no event, the position is left unchanged and no update hooks are called.

Parameters:

  • response (EventStore::Client::Streams::ReadResp)

    GRPC EventStore object. See its structure in the lib/event_store_client/adapters/grpc/generated/streams_pb.rb file in the event_store_client gem.

Returns:

  • (Boolean)

    whether the position was updated



# File 'lib/event_store_subscriptions/subscription_position.rb', line 19

def update(response)
  source = response.event&.event
  return false unless source

  # Updating position values in memory first to prevent the situation when the update hook fails,
  # and the position is not up to date.
  self.commit_position, self.prepare_position =
    source.commit_position, source.prepare_position

  update_hooks.each do |handler|
    handler.call(self)
  end
  true
end
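
The flow of #update, including the case where a hook raises, can be traced with stand-ins. This is a minimal sketch under stated assumptions: FakeEvent, FakeWrapper, FakeResponse and PositionUpdateSketch are hypothetical names that only imitate the response.event.event shape and the method bodies shown above. They are not the gem's or the GRPC library's classes.

```ruby
# Hypothetical stand-ins for the response shape used by #update: response.event.event
FakeEvent    = Struct.new(:commit_position, :prepare_position)
FakeWrapper  = Struct.new(:event)
FakeResponse = Struct.new(:event)

# Hypothetical stand-in that copies the documented method bodies.
PositionUpdateSketch = Struct.new(:commit_position, :prepare_position) do
  attr_reader :update_hooks

  def initialize(*)
    super
    @update_hooks = []
  end

  def register_update_hook(&blk)
    update_hooks << blk
  end

  def update(response)
    source = response.event&.event
    return false unless source

    # In-memory values are assigned before any hook runs.
    self.commit_position, self.prepare_position =
      source.commit_position, source.prepare_position
    update_hooks.each { |handler| handler.call(self) }
    true
  end
end

position = PositionUpdateSketch.new
seen = []
position.register_update_hook { |pos| seen << [pos.commit_position, pos.prepare_position] }

# A response without an event leaves the position untouched.
no_event = position.update(FakeResponse.new(nil))

# A response with an event updates the position and runs the hook.
with_event = position.update(FakeResponse.new(FakeWrapper.new(FakeEvent.new(7, 7))))

# A failing hook does not roll back the in-memory position.
position.register_update_hook { |_pos| raise 'persist failed' }
begin
  position.update(FakeResponse.new(FakeWrapper.new(FakeEvent.new(9, 9))))
rescue RuntimeError
  # The position already holds the new values at this point.
end

puts no_event
puts with_event
puts position.commit_position
```

Because the values are assigned before the hooks run, a hook that raises leaves the in-memory position ahead of whatever the hook was trying to persist. Callers that rely on hooks for durability may want to rescue and retry inside the hook.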