Class: Akasha::Storage::HttpEventStore::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/akasha/storage/http_event_store/client.rb

Overview

Eventstore HTTP client.

Constant Summary collapse

MIN_RETRY_INTERVAL =

A lower limit for a retry interval.

0
MAX_RETRY_INTERVAL =

An upper limit for a retry interval.

10.0

Instance Method Summary collapse

Constructor Details

#initialize(host: 'localhost', port: 2113, username: nil, password: nil) ⇒ Client

Creates a new client for the host and port with optional username and password for authenticating certain requests.



29
30
31
32
33
34
# File 'lib/akasha/storage/http_event_store/client.rb', line 29

def initialize(host: 'localhost', port: 2113, username: nil, password: nil)
  @username = username
  @password = password
  @conn = connection(host, port)
  @serializer = EventSerializer.new
end

Instance Method Details

#merge_all_by_event(name, event_names, namespace: nil, max_retries: 0) ⇒ Object

Merges all streams into one, filtering the resulting stream so it only contains events with the specified names, using a projection.

Arguments:

`name` - name of the projection stream
`event_names` - array of event names
`namespace` - optional namespace; if provided, the resulting stream will
              only contain events with the same metadata.namespace
`max_retries` - how many times to retry in case of network failures


62
63
64
65
66
# File 'lib/akasha/storage/http_event_store/client.rb', line 62

def merge_all_by_event(name, event_names, namespace: nil, max_retries: 0)
  retrying_on_network_failures(max_retries) do
    ProjectionManager.new(self).merge_all_by_event(name, event_names, namespace: namespace)
  end
end

#request(method, path, body = nil, headers = {}) ⇒ Object

Issues a generic request against the API.



82
83
84
85
86
# File 'lib/akasha/storage/http_event_store/client.rb', line 82

def request(method, path, body = nil, headers = {})
  body = @conn.public_send(method, path, body, auth_headers.merge(headers)).body
  return {} if body.empty?
  body
end

#retry_append_to_stream(stream_name, events, expected_revision = nil, max_retries: 0) ⇒ Object

Append events to stream, idempotently retrying_on_network_failures up to ‘max_retries`



37
38
39
40
41
# File 'lib/akasha/storage/http_event_store/client.rb', line 37

def retry_append_to_stream(stream_name, events, expected_revision = nil, max_retries: 0)
  retrying_on_network_failures(max_retries) do
    append_to_stream(stream_name, events, expected_revision)
  end
end

#retry_read_events_forward(stream_name, start, count, poll = 0, max_retries: 0) ⇒ Object

Read events from stream, retrying_on_network_failures up to ‘max_retries` in case of network failures. Reads `count` events starting from `start` inclusive. Can long-poll for events if `poll` is specified.`



46
47
48
49
50
# File 'lib/akasha/storage/http_event_store/client.rb', line 46

def retry_read_events_forward(stream_name, start, count, poll = 0, max_retries: 0)
  retrying_on_network_failures(max_retries) do
    safe_read_events(stream_name, start, count, poll)
  end
end

#retry_read_metadata(stream_name, max_retries: 0) ⇒ Object

Reads stream metadata.



69
70
71
72
73
# File 'lib/akasha/storage/http_event_store/client.rb', line 69

def (stream_name, max_retries: 0)
  retrying_on_network_failures(max_retries) do
    (stream_name)
  end
end

#retry_write_metadata(stream_name, metadata, max_retries: 0) ⇒ Object

Updates stream metadata.



76
77
78
79
# File 'lib/akasha/storage/http_event_store/client.rb', line 76

def (stream_name, , max_retries: 0)
  event = Akasha::Event.new(:stream_metadata_changed, SecureRandom.uuid, )
  retry_append_to_stream("#{stream_name}/metadata", [event], max_retries: max_retries)
end