Class: RubyEventStore::Client

Inherits:
Object
Defined in:
lib/ruby_event_store/client.rb

Defined Under Namespace

Classes: Within

Constructor Details

#initialize(repository: InMemoryRepository.new, mapper: Mappers::Default.new, subscriptions: Subscriptions.new, dispatcher: Dispatcher.new, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new) ⇒ Client

Returns a new instance of Client.



# File 'lib/ruby_event_store/client.rb', line 7

def initialize(
  repository: InMemoryRepository.new,
  mapper: Mappers::Default.new,
  subscriptions: Subscriptions.new,
  dispatcher: Dispatcher.new,
  clock: default_clock,
  correlation_id_generator: default_correlation_id_generator,
  event_type_resolver: EventTypeResolver.new
)
  @repository = repository
  @mapper = mapper
  @subscriptions = subscriptions
  @broker = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher)
  @clock = clock
  @metadata = Concurrent::ThreadLocalVar.new
  @correlation_id_generator = correlation_id_generator
  @event_type_resolver = event_type_resolver
end

Instance Method Details

#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self

Persists new event(s) without notifying any subscribed handlers

Parameters:

  • events (Array<Event>, Event)

    event(s)

  • stream_name (String) (defaults to: GLOBAL_STREAM)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy.

Returns:

  • (self)


# File 'lib/ruby_event_store/client.rb', line 48

def append(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  append_records_to_stream(
    transform(enrich_events_metadata(events)),
    stream_name: stream_name,
    expected_version: expected_version
  )
  self
end

#delete_stream(stream_name) ⇒ self

Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.

Parameters:

  • stream_name (String)

    name of the stream to be cleared.

Returns:

  • (self)


# File 'lib/ruby_event_store/client.rb', line 75

def delete_stream(stream_name)
  repository.delete_stream(Stream.new(stream_name))
  self
end

#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event

Deserializes an event that was serialized for async event handlers.

Returns:

  • (Event)

    deserialized event



# File 'lib/ruby_event_store/client.rb', line 273

def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil)
  extract_timestamp = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 }

  mapper.record_to_event(
    SerializedRecord
      .new(
        event_type: event_type,
        event_id: event_id,
        data: data,
        metadata: metadata,
        timestamp: timestamp || timestamp_ = extract_timestamp[serializer.load(metadata)],
        valid_at: valid_at || timestamp_
      )
      .deserialize(serializer)
  )
end
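
The timestamp fallback in this method relies on Ruby parsing "timestamp || timestamp_ = ..." as "timestamp || (timestamp_ = ...)". The following is a minimal sketch of that behavior, not the library's code: resolve_times is a hypothetical helper, and a plain Hash stands in for the result of serializer.load(metadata).

```ruby
require "time"

# Hypothetical helper mirroring the fallback logic shown in #deserialize.
# `metadata` is a plain Hash here, standing in for serializer.load(metadata).
def resolve_times(metadata:, timestamp: nil, valid_at: nil)
  extract_timestamp = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 }

  # Parsed as: timestamp || (timestamp_ = extract_timestamp[metadata]).
  # timestamp_ is assigned only when no explicit timestamp was passed.
  resolved_timestamp = timestamp || timestamp_ = extract_timestamp[metadata]
  # If timestamp_ was never assigned it is nil, so valid_at stays nil as well.
  resolved_valid_at = valid_at || timestamp_

  [resolved_timestamp, resolved_valid_at]
end

stored = { "timestamp" => "2020-01-01T00:00:00Z" }
from_metadata = resolve_times(metadata: stored)
explicit = resolve_times(metadata: stored, timestamp: "2021-06-01T12:00:00Z")
```

In this sketch, from_metadata carries the extracted timestamp in both positions, while explicit keeps the given timestamp and leaves valid_at as nil, because the extraction branch never runs.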

#event_in_stream?(event_id, stream_name) ⇒ Boolean

Checks whether event is linked in given stream

Parameters:

  • event_id (String)
  • stream_name (String)

Returns:

  • (Boolean)

    true if event is linked to given stream, false otherwise



# File 'lib/ruby_event_store/client.rb', line 127

def event_in_stream?(event_id, stream_name)
  stream = Stream.new(stream_name)
  stream.global? ? repository.has_event?(event_id) : repository.event_in_stream?(event_id, stream)
end

#global_position(event_id) ⇒ Integer

Gets position of the event in global stream

The position is always nonnegative. Global positions may have gaps: there may be an event at position 40 but no event at position 39.

Parameters:

  • event_id (String)

Returns:

  • (Integer)

    nonnegative integer position of event in global stream

Raises:



# File 'lib/ruby_event_store/client.rb', line 118

def global_position(event_id)
  repository.global_position(event_id)
end
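
Because global positions may have gaps, they are suitable for ordering but not for counting. A small sketch with made-up event ids and positions (none of these values come from a real store):

```ruby
# Hypothetical event ids mapped to global positions; 39 is missing on purpose.
positions = { "evt-a" => 38, "evt-b" => 40, "evt-c" => 41 }

# Sorting by position does not depend on positions being contiguous.
ordered_ids = positions.sort_by { |_id, position| position }.map(&:first)

# Counting by subtraction does: the gap inflates the result.
span = positions.values.max - positions.values.min + 1
actual_count = positions.size
```

Here span is 4 while actual_count is 3, so the difference between two positions should not be read as a number of events.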

#inspect ⇒ Object



# File 'lib/ruby_event_store/client.rb', line 332

def inspect
  "#<#{self.class}:0x#{__id__.to_s(16)}>"
end

#link(event_ids, stream_name:, expected_version: :any) ⇒ self

Links already persisted event(s) to a different stream. Does not notify any subscribed handlers.

Parameters:

  • event_ids (String, Array<String>)

    ids of events

  • stream_name (String)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy.

Returns:

  • (self)


# File 'lib/ruby_event_store/client.rb', line 64

def link(event_ids, stream_name:, expected_version: :any)
  repository.link_to_stream(Array(event_ids), Stream.new(stream_name), ExpectedVersion.new(expected_version))
  self
end
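
The event_ids parameter accepts either a single id or an array of ids because the method passes it through Array(...). A short sketch of how Kernel#Array normalizes the input (the id strings are made up):

```ruby
# Kernel#Array wraps a single value and leaves an existing array unchanged.
single = Array("a1b2c3")
many = Array(%w[a1b2c3 d4e5f6])
# nil becomes an empty array, so no ids are passed on.
none = Array(nil)
```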

#metadata ⇒ Hash

Reads the additional metadata that will be added to published events.

Returns:

  • (Hash)


# File 'lib/ruby_event_store/client.rb', line 294

def metadata
  @metadata.value || EMPTY_HASH
end

#overwrite(events_or_event) ⇒ self

Overwrite existing event(s) with the same ID.

Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in.

Examples:

Add data and metadata to existing events


events = event_store.read.limit(10).to_a
events.each do |ev|
  ev.data[:tenant_id] = 1
  ev.metadata[:server_id] = "eu-west-2"
end
event_store.overwrite(events)

Change event type


events = event_store.read.limit(10).each.select{|ev| OldType === ev }.map do |ev|
  NewType.new(
    event_id: ev.event_id,
    data: ev.data,
    metadata: ev.metadata,
  )
end
event_store.overwrite(events)

Parameters:

  • events_or_event (Array<Event>, Event)

    event(s) to serialize and overwrite

Returns:

  • (self)


# File 'lib/ruby_event_store/client.rb', line 327

def overwrite(events_or_event)
  repository.update_messages(transform(Array(events_or_event)))
  self
end

#position_in_stream(event_id, stream_name) ⇒ Integer

Gets position of the event in given stream

The position is always nonnegative. Returns nil if the event has no specific position in the stream. Raises an error if the event is not present in the stream.

Parameters:

  • event_id (String)
  • stream_name (String)

Returns:

  • (Integer)

    nonnegative integer position of event in stream

Raises:

  • (EventNotInStream)


# File 'lib/ruby_event_store/client.rb', line 105

def position_in_stream(event_id, stream_name)
  repository.position_in_stream(event_id, Stream.new(stream_name))
end

#publish(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self

Persists events and notifies subscribed handlers about them

Parameters:

  • events (Array<Event>, Event)

    event(s)

  • stream_name (String) (defaults to: GLOBAL_STREAM)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy.

Returns:

  • (self)


# File 'lib/ruby_event_store/client.rb', line 32

def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  enriched_events = enrich_events_metadata(events)
  records = transform(enriched_events)
  append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
  enriched_events.zip(records) do |event, record|
    with_metadata(correlation_id: event.metadata.fetch(:correlation_id), causation_id: event.event_id) do
      broker.(event, record)
    end
  end
  self
end
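
The difference between #append and #publish is that both persist, but only #publish notifies handlers. The sketch below illustrates that contrast with a hypothetical ToyEventStore; it is a stand-in written for this example and does not use the library's repository, broker, or mapper.

```ruby
# Hypothetical in-memory stand-in, not RubyEventStore::Client.
class ToyEventStore
  attr_reader :persisted

  def initialize
    @persisted = []
    @handlers = []
  end

  def subscribe_to_all_events(&handler)
    @handlers << handler
  end

  # Persists only; handlers are never called.
  def append(events)
    @persisted.concat(Array(events))
    self
  end

  # Persists first, then notifies every handler about each event.
  def publish(events)
    append(events)
    Array(events).each { |event| @handlers.each { |handler| handler.call(event) } }
    self
  end
end

store = ToyEventStore.new
received = []
store.subscribe_to_all_events { |event| received << event }

# Both methods return self, so calls can be chained.
store.append(:imported).publish([:placed, :paid])
```

After this runs, store.persisted holds all three events, while received holds only the two that were published.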

#read ⇒ Specification

Starts building a query specification for reading events.

Returns:

  • (Specification)



# File 'lib/ruby_event_store/client.rb', line 84

def read
  Specification.new(SpecificationReader.new(repository, mapper))
end

#streams_of(event_id) ⇒ Array<Stream>

Gets list of streams where event is stored or linked

Returns:

  • (Array<Stream>)

    where event is stored or linked



# File 'lib/ruby_event_store/client.rb', line 91

def streams_of(event_id)
  repository.streams_of(event_id)
end

#subscribe(subscriber, to:) ⇒ Proc
#subscribe(to:, &subscriber) ⇒ Proc

Subscribes a handler (subscriber) that will be invoked for published events of provided type.

Overloads:

  • #subscribe(subscriber, to:) ⇒ Proc

    Returns an unsubscribe proc; call it to unsubscribe.

    Parameters:

    • to (Array<Class>)

      types of events to subscribe

    • subscriber (Object, Class)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

  • #subscribe(to:, &subscriber) ⇒ Proc

    Returns an unsubscribe proc; call it to unsubscribe.

    Parameters:

    • to (Array<Class>)

      types of events to subscribe

    • subscriber (Proc)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

Raises:

  • (ArgumentError)


# File 'lib/ruby_event_store/client.rb', line 144

def subscribe(subscriber = nil, to:, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  subscriber ||= proc
  broker.add_subscription(subscriber, to.map { |event_klass| event_type_resolver.call(event_klass) })
end
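
The two overloads share one method body: the handler arrives either as the first argument or as a block, never both, and the return value is a proc that undoes the subscription. A minimal sketch of that calling convention, using a hypothetical ToySubscriptions class and plain symbols in place of event classes (the real method resolves types through event_type_resolver and delegates to the broker):

```ruby
# Hypothetical stand-in for the broker and subscriptions, for illustration only.
class ToySubscriptions
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(subscriber = nil, to:, &proc)
    raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
    subscriber ||= proc
    to.each { |type| @handlers[type] << subscriber }
    # The returned proc removes exactly the handler registered above.
    -> { to.each { |type| @handlers[type].delete(subscriber) } }
  end

  def subscribers_for(type)
    @handlers[type].dup
  end
end

subs = ToySubscriptions.new
handler = ->(event) { event }

unsubscribe = subs.subscribe(handler, to: [:order_placed])
subscribed = subs.subscribers_for(:order_placed)

unsubscribe.call
remaining = subs.subscribers_for(:order_placed)
```

Keeping the returned proc is the only way to remove the handler in this sketch, which is why the documentation above stresses calling it to unsubscribe.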

#subscribe_to_all_events(subscriber) ⇒ Proc
#subscribe_to_all_events(&subscriber) ⇒ Proc

Subscribes a handler (subscriber) that will be invoked for all published events

Overloads:

  • #subscribe_to_all_events(subscriber) ⇒ Proc

    Returns an unsubscribe proc; call it to unsubscribe.

    Parameters:

    • subscriber (Object, Class)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

  • #subscribe_to_all_events(&subscriber) ⇒ Proc

    Returns an unsubscribe proc; call it to unsubscribe.

    Parameters:

    • subscriber (Proc)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

Raises:

  • (ArgumentError)


# File 'lib/ruby_event_store/client.rb', line 160

def subscribe_to_all_events(subscriber = nil, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  broker.add_global_subscription(subscriber || proc)
end

#subscribers_for(event_class) ⇒ Array<Object, Class>

Get list of handlers subscribed to an event

Parameters:

  • event_class (Class, String)

    type of events to get list of subscribed handlers

Returns:

  • (Array<Object, Class>)


# File 'lib/ruby_event_store/client.rb', line 169

def subscribers_for(event_class)
  subscriptions.all_for(event_type_resolver.call(event_class))
end

#with_metadata(metadata_for_block, &block) ⇒ Object

Sets additional metadata for all events published within the provided block.

Parameters:

  • metadata_for_block (Hash)

    metadata to set for events

  • block (Proc)

    block of code during which the metadata will be added

Returns:

  • (Object)

    last value returned by the provided block



# File 'lib/ruby_event_store/client.rb', line 261

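def with_metadata(metadata_for_block, &block)
  previous_metadata = metadata
  self.metadata = previous_metadata.merge(metadata_for_block)
  block.call if block_given?
ensure
  self.metadata = previous_metadata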
end
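
The method follows a save, merge, restore pattern: the current metadata is remembered, the block's metadata is merged on top, and the previous value is put back in ensure even if the block raises. A minimal sketch of that pattern with a hypothetical ToyContext class; it stores the value in Thread.current[...] as a stand-in, whereas the client shown above keeps it in a Concurrent::ThreadLocalVar.

```ruby
# Hypothetical stand-in showing the save/merge/restore pattern only.
class ToyContext
  EMPTY_HASH = {}.freeze

  def metadata
    Thread.current[:toy_metadata] || EMPTY_HASH
  end

  def with_metadata(metadata_for_block, &block)
    previous_metadata = metadata
    self.metadata = previous_metadata.merge(metadata_for_block)
    block.call if block_given?
  ensure
    # Runs on normal exit and on exceptions, so nesting unwinds correctly.
    self.metadata = previous_metadata
  end

  private

  def metadata=(value)
    Thread.current[:toy_metadata] = value
  end
end

ctx = ToyContext.new
seen = []

result = ctx.with_metadata(request_id: "r1") do
  ctx.with_metadata(user_id: 7) { seen << ctx.metadata }
  seen << ctx.metadata
  :done
end
seen << ctx.metadata
```

In this sketch the inner block sees both keys, the outer block sees only request_id again once the inner call returns, and the metadata is empty afterwards. The call itself returns the block's last value.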

#within(&block) ⇒ Within

Use for starting temporary subscriptions.

Parameters:

  • block (Proc)

    block of code during which the temporary subscriptions will be active

Returns:

  • (Within)

    builder object which collects temporary subscriptions

Raises:

  • (ArgumentError)


# File 'lib/ruby_event_store/client.rb', line 250

def within(&block)
  raise ArgumentError if block.nil?
  Within.new(block, broker, event_type_resolver)
end