Class: Akasha::Checkpoint::HttpEventStoreCheckpoint

Inherits:
Object
  • Object
show all
Defined in:
lib/akasha/checkpoint/http_event_store_checkpoint.rb

Overview

Stores stream position via HTTP Eventstore API.

Constant Summary collapse

Error =
Class.new(RuntimeError)
StreamNotFoundError =
Class.new(Error)

Instance Method Summary collapse

Constructor Details

#initialize(stream, interval: 1) ⇒ HttpEventStoreCheckpoint

Creates a new checkpoint, storing position in `stream` every `interval` events. Use `interval` greater than zero for idempotent event listeners.

Raises:

  • (UnsupportedStorageError)

10
11
12
13
14
15
# File 'lib/akasha/checkpoint/http_event_store_checkpoint.rb', line 10

def initialize(stream, interval: 1)
  @stream = stream
  @interval = interval
  return if @stream.respond_to?(:metadata) && @stream.respond_to?(:metadata=)
  raise UnsupportedStorageError, "Storage does not support checkpoints: #{stream.class}"
end

Instance Method Details

#ack(position) ⇒ Object

Returns the next position, conditionally storing it (based on the configurable interval).


23
24
25
26
27
28
29
30
31
32
33
# File 'lib/akasha/checkpoint/http_event_store_checkpoint.rb', line 23

def ack(position)
  @next_position = position + 1
  if (@next_position % @interval).zero?
    # TODO: Race condition; use optimistic cocurrency.
    @stream. = @stream..merge(next_position: @next_position)
  end
  @next_position
rescue Akasha::Storage::HttpEventStore::HttpClientError => e
  raise if e.status_code != 404
  raise StreamNotFoundError, "Stream cannot be checkpointed; it does not exist: #{@stream.name}"
end

#latestObject

Returns the most recently stored next position.


18
19
20
# File 'lib/akasha/checkpoint/http_event_store_checkpoint.rb', line 18

def latest
  @next_position ||= (read_position || 0)
end