Class: Akasha::Storage::HttpEventStore::Stream
- Inherits: Object
  - Object
  - Akasha::Storage::HttpEventStore::Stream
- Defined in: lib/akasha/storage/http_event_store/stream.rb
Overview
HTTP Eventstore stream.
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
Instance Method Summary
- #initialize(client, stream_name, max_retries: 0) ⇒ Stream (constructor)
  Creates a stream object for accessing an ES stream.
- #metadata ⇒ Object
  Reads stream metadata.
- #metadata=(metadata) ⇒ Object
  Updates stream metadata.
- #read_events(start, page_size, poll: 0) ⇒ Object
  Reads events from the stream starting from `start`, inclusive.
- #write_events(events, revision: nil) ⇒ Object
  Appends `events` to the stream.
Constructor Details
#initialize(client, stream_name, max_retries: 0) ⇒ Stream
Creates a stream object for accessing an ES stream. This does not create the underlying stream itself. Use the `max_retries` option to choose how many times to retry after a network failure.
    # File 'lib/akasha/storage/http_event_store/stream.rb', line 12
    def initialize(client, stream_name, max_retries: 0)
      @client = client
      @name = stream_name
      @max_retries = max_retries
    end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/akasha/storage/http_event_store/stream.rb', line 6
    def name
      @name
    end
Instance Method Details
#metadata ⇒ Object
Reads stream metadata.
    # File 'lib/akasha/storage/http_event_store/stream.rb', line 49
    def metadata
      @client.(@name, max_retries: @max_retries)
    end
#metadata=(metadata) ⇒ Object
Updates stream metadata.
    # File 'lib/akasha/storage/http_event_store/stream.rb', line 54
    def metadata=(metadata)
      @client.(@name, metadata, max_retries: @max_retries)
    end
#read_events(start, page_size, poll: 0) ⇒ Object
Reads events from the stream starting from `start`, inclusive. If a block is given, reads all events from that position in pages of `page_size`, yielding each page to the block. If no block is given, reads a single page of up to `page_size` events from that position and returns it. To enable long-polling, set `poll` to the number of seconds to wait.
    # File 'lib/akasha/storage/http_event_store/stream.rb', line 34
    def read_events(start, page_size, poll: 0)
      if block_given?
        position = start
        loop do
          events = read_events(position, page_size, poll: poll)
          return if events.empty?
          yield(events)
          position += events.size
        end
      else
        @client.retry_read_events_forward(@name, start, page_size, poll, max_retries: @max_retries)
      end
    end
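The difference between the block and non-block forms can be sketched with an in-memory stand-in. This is an illustration under assumptions, not the gem's own test setup: `FakeClient` and `SketchStream` are hypothetical names, the fake client serves events from an array, and `SketchStream#read_events` is a local copy of the method body shown above so the example runs without the gem.

```ruby
# Hypothetical in-memory client. Only the method name and argument order
# are taken from the read_events source above; the behavior is assumed.
class FakeClient
  def initialize(events)
    @events = events
  end

  # Returns up to `count` events starting at index `start`.
  def retry_read_events_forward(_stream_name, start, count, _poll, max_retries: 0)
    @events[start, count] || []
  end
end

# Local copy of the documented logic, so the sketch runs without the gem.
class SketchStream
  def initialize(client, stream_name, max_retries: 0)
    @client = client
    @name = stream_name
    @max_retries = max_retries
  end

  def read_events(start, page_size, poll: 0)
    if block_given?
      position = start
      loop do
        events = read_events(position, page_size, poll: poll)
        return if events.empty?
        yield(events)
        position += events.size
      end
    else
      @client.retry_read_events_forward(@name, start, page_size, poll, max_retries: @max_retries)
    end
  end
end

stream = SketchStream.new(FakeClient.new(%w[a b c d e]), 'letters')

# Without a block: a single page starting at position 1.
single_page = stream.read_events(1, 2)
p single_page # => ["b", "c"]

# With a block: every page from position 0 until an empty page is read.
pages = []
stream.read_events(0, 2) { |page| pages << page }
p pages # => [["a", "b"], ["c", "d"], ["e"]]
```

Note that the block form stops at the first empty page, so with `poll: 0` it ends once the stream is exhausted rather than waiting for new events.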
#write_events(events, revision: nil) ⇒ Object
Appends `events` to the stream. Does nothing if `events` is empty. You can specify `revision` to use optimistic concurrency control:
- nil - just append, no concurrency control (passed to the client as expected version -2),
- -1 - the stream must not exist yet,
- >= 0 - expected revision of the last event in the stream.
    # File 'lib/akasha/storage/http_event_store/stream.rb', line 23
    def write_events(events, revision: nil)
      return if events.empty?
      expected_version = revision.nil? ? -2 : revision
      @client.retry_append_to_stream(@name, events, expected_version, max_retries: @max_retries)
    end
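How `revision` maps to the expected version sent to the client can be sketched with a recording stand-in. This is an illustration under assumptions: `RecordingClient` and `SketchStream` are hypothetical names, the symbols stand in for real event objects, and `SketchStream#write_events` is a local copy of the method body shown above so the example runs without the gem.

```ruby
# Hypothetical client that records each append instead of talking to Event Store.
class RecordingClient
  attr_reader :appends

  def initialize
    @appends = []
  end

  def retry_append_to_stream(stream_name, events, expected_version, max_retries: 0)
    @appends << [stream_name, events, expected_version]
  end
end

# Local copy of the documented logic, so the sketch runs without the gem.
class SketchStream
  def initialize(client, stream_name, max_retries: 0)
    @client = client
    @name = stream_name
    @max_retries = max_retries
  end

  def write_events(events, revision: nil)
    return if events.empty?
    expected_version = revision.nil? ? -2 : revision
    @client.retry_append_to_stream(@name, events, expected_version, max_retries: @max_retries)
  end
end

client = RecordingClient.new
stream = SketchStream.new(client, 'orders-42')

stream.write_events([])                       # empty batch: the client is never called
stream.write_events([:created])               # no revision: expected version -2
stream.write_events([:created], revision: -1) # the stream must not exist yet
stream.write_events([:paid], revision: 0)     # the last event must be at revision 0

expected_versions = client.appends.map { |_name, _events, version| version }
p expected_versions # => [-2, -1, 0]
```

A real client would reject an append whose expected version does not match the stream; the stand-in only records what it was asked to do.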