Class: Akasha::Storage::HttpEventStore::Stream

Inherits:
Object
Defined in:
lib/akasha/storage/http_event_store/stream.rb

Overview

HTTP Event Store stream.

Constructor Details

#initialize(client, stream_name, max_retries: 0) ⇒ Stream

Creates a stream object for accessing an ES stream. Does not create the underlying stream itself. Use the `max_retries` option to choose how many times to retry in case of network failures.

# File 'lib/akasha/storage/http_event_store/stream.rb', line 12

def initialize(client, stream_name, max_retries: 0)
  @client = client
  @name = stream_name
  @max_retries = max_retries
end
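
This page does not show how the client applies `max_retries`. As a rough illustration only, assuming one initial attempt followed by up to `max_retries` further attempts, a retry loop could look like the sketch below. `with_retries` and `TransientError` are hypothetical names and are not part of Akasha.

```ruby
# Hypothetical error class standing in for a network failure.
class TransientError < StandardError; end

# Hypothetical helper (an assumption, not Akasha's implementation):
# runs the block once, then retries up to max_retries more times
# if it raises TransientError. The last failure is re-raised.
def with_retries(max_retries)
  attempts = 0
  begin
    yield
  rescue TransientError
    raise if attempts >= max_retries
    attempts += 1
    retry
  end
end

# The block fails twice, then succeeds on the third call.
calls = 0
result = with_retries(2) do
  calls += 1
  raise TransientError, 'connection reset' if calls < 3
  :ok
end
```

Under this reading, the default `max_retries: 0` means a single attempt with no retry.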

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.

# File 'lib/akasha/storage/http_event_store/stream.rb', line 6

def name
  @name
end

Instance Method Details

#metadata ⇒ Object

Reads stream metadata.

# File 'lib/akasha/storage/http_event_store/stream.rb', line 49

def metadata
  @client.(@name, max_retries: @max_retries)
end

#metadata=(metadata) ⇒ Object

Updates stream metadata.

# File 'lib/akasha/storage/http_event_store/stream.rb', line 54

def metadata=(metadata)
  @client.(@name, metadata, max_retries: @max_retries)
end

#read_events(start, page_size, poll: 0) ⇒ Object

Reads events from the stream starting at `start` (inclusive). If a block is given, reads all events from that position onward in pages of `page_size`, yielding each page to the block. If no block is given, reads up to `page_size` events from that position. To enable long polling, set `poll` to the number of seconds to wait for new events.

# File 'lib/akasha/storage/http_event_store/stream.rb', line 34

def read_events(start, page_size, poll: 0)
  if block_given?
    position = start
    loop do
      events = read_events(position, page_size, poll: poll)
      return if events.empty?
      yield(events)
      position += events.size
    end
  else
    @client.retry_read_events_forward(@name, start, page_size, poll, max_retries: @max_retries)
  end
end
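
The two calling modes of `read_events` can be tried without a running Event Store. The sketch below is a minimal, self-contained illustration: `FakeClient` and `SketchStream` are hypothetical stand-ins (the latter is a local copy of the method shown above), and the events are plain strings rather than real event objects.

```ruby
# Hypothetical stand-in for the HTTP client: serves slices of an
# in-memory array. Only the method read_events calls is modelled.
class FakeClient
  def initialize(events)
    @events = events
  end

  def retry_read_events_forward(_name, start, page_size, _poll, max_retries: 0)
    @events[start, page_size] || []
  end
end

# Local copy of the documented read_events logic, so the sketch runs
# without the akasha gem installed.
class SketchStream
  def initialize(client, stream_name, max_retries: 0)
    @client = client
    @name = stream_name
    @max_retries = max_retries
  end

  def read_events(start, page_size, poll: 0)
    if block_given?
      position = start
      loop do
        events = read_events(position, page_size, poll: poll)
        return if events.empty?
        yield(events)
        position += events.size
      end
    else
      @client.retry_read_events_forward(@name, start, page_size, poll, max_retries: @max_retries)
    end
  end
end

stream = SketchStream.new(FakeClient.new(%w[a b c d e]), 'orders-1')

# Without a block: a single page starting at position 1.
single_page = stream.read_events(1, 2)

# With a block: every page from position 0 until an empty page is read.
pages = []
stream.read_events(0, 2) { |events| pages << events }
```

Note that the block form stops at the first empty page, so with `poll: 0` it ends once the current end of the stream is reached.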

#write_events(events, revision: nil) ⇒ Object

Appends `events` to the stream. Does nothing if `events` is empty. Use `revision` for optimistic concurrency control:

- nil  - just append, no concurrency control,
- -1   - expect that the stream does not exist yet,
- >= 0 - expected revision of the last event in the stream.

# File 'lib/akasha/storage/http_event_store/stream.rb', line 23

def write_events(events, revision: nil)
  return if events.empty?
  expected_version = revision.nil? ? -2 : revision
  @client.retry_append_to_stream(@name, events, expected_version, max_retries: @max_retries)
end
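
To make the `revision` handling concrete, the sketch below records what would be passed to the client for each kind of call. `RecordingClient` and `SketchWriter` are hypothetical stand-ins (the latter copies the method shown above), and the events are placeholder symbols. The value `-2` sent for `nil` appears to correspond to Event Store's "any version" expected version.

```ruby
# Hypothetical stand-in for the HTTP client: records each append
# as [stream name, number of events, expected version].
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def retry_append_to_stream(name, events, expected_version, max_retries: 0)
    @calls << [name, events.size, expected_version]
  end
end

# Local copy of the documented write_events logic, so the sketch runs
# without the akasha gem installed.
class SketchWriter
  def initialize(client, stream_name, max_retries: 0)
    @client = client
    @name = stream_name
    @max_retries = max_retries
  end

  def write_events(events, revision: nil)
    return if events.empty?
    expected_version = revision.nil? ? -2 : revision
    @client.retry_append_to_stream(@name, events, expected_version, max_retries: @max_retries)
  end
end

client = RecordingClient.new
writer = SketchWriter.new(client, 'orders-1')

writer.write_events([])                        # empty batch: client is not called
writer.write_events([:created])                # nil revision is sent as -2
writer.write_events([:created], revision: -1)  # stream must not exist yet
writer.write_events([:paid], revision: 0)      # last event must be at revision 0
```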