Class: SSE::Client
- Inherits:
-
Object
- Object
- SSE::Client
- Defined in:
- lib/ld-eventsource/client.rb
Overview
A lightweight SSE client implementation. The client uses a worker thread to read from the streaming HTTP connection. Events are dispatched from the same worker thread.
The client will attempt to recover from connection failures as follows:
-
The first time the connection is dropped, it will wait about one second (or whatever value is
specified for ‘reconnect_time`) before attempting to reconnect. The actual delay has a pseudo-random jitter value added.
-
If the connection fails again within the time range specified by ‘reconnect_reset_interval`,
it will exponentially increase the delay between attempts (and also apply a random jitter). However, if the connection stays up for at least that amount of time, the delay will be reset to the minimum.
-
Each time a new connection is made, the client will send a ‘Last-Event-Id` header so the server
can pick up where it left off (if the server has been sending ID values for events).
It is also possible to force the connection to be restarted if the server sends no data within an interval specified by ‘read_timeout`. Using a read timeout is advisable because otherwise it is possible in some circumstances for a connection failure to go undetected. To keep the connection from timing out if there are no events to send, the server could send a comment line (`“:”`) at regular intervals as a heartbeat.
Constant Summary collapse
- DEFAULT_CONNECT_TIMEOUT =
The default value for ‘connect_timeout` in #initialize.
10
- DEFAULT_READ_TIMEOUT =
The default value for ‘read_timeout` in #initialize.
300
- DEFAULT_RECONNECT_TIME =
The default value for ‘reconnect_time` in #initialize.
1
- MAX_RECONNECT_TIME =
The maximum number of seconds that the client will wait before reconnecting.
30
- DEFAULT_RECONNECT_RESET_INTERVAL =
The default value for ‘reconnect_reset_interval` in #initialize.
60
Instance Method Summary collapse
-
#close ⇒ Object
Permanently shuts down the client and its connection.
-
#initialize(uri, headers: {}, connect_timeout: DEFAULT_CONNECT_TIMEOUT, read_timeout: DEFAULT_READ_TIMEOUT, reconnect_time: DEFAULT_RECONNECT_TIME, reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL, last_event_id: nil, proxy: nil, logger: nil) {|client| ... } ⇒ Client
constructor
Creates a new SSE client.
-
#on_error {|error| ... } ⇒ Object
Specifies a block or Proc to receive connection errors.
-
#on_event {|event| ... } ⇒ Object
Specifies a block or Proc to receive events from the stream.
Constructor Details
#initialize(uri, headers: {}, connect_timeout: DEFAULT_CONNECT_TIMEOUT, read_timeout: DEFAULT_READ_TIMEOUT, reconnect_time: DEFAULT_RECONNECT_TIME, reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL, last_event_id: nil, proxy: nil, logger: nil) {|client| ... } ⇒ Client
Creates a new SSE client.
Once the client is created, it immediately attempts to open the SSE connection. You will normally want to register your event handler before this happens, so that no events are missed. To do this, provide a block after the constructor; the block will be executed before opening the connection.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/ld-eventsource/client.rb', line 85 def initialize(uri, headers: {}, connect_timeout: DEFAULT_CONNECT_TIMEOUT, read_timeout: DEFAULT_READ_TIMEOUT, reconnect_time: DEFAULT_RECONNECT_TIME, reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL, last_event_id: nil, proxy: nil, logger: nil) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @headers = headers.clone @connect_timeout = connect_timeout @read_timeout = read_timeout @logger = logger || default_logger if proxy @proxy = proxy else proxy_uri = @uri.find_proxy if !proxy_uri.nil? && (proxy_uri.scheme == 'http' || proxy_uri.scheme == 'https') @proxy = proxy_uri end end @backoff = Impl::Backoff.new(reconnect_time || DEFAULT_RECONNECT_TIME, MAX_RECONNECT_TIME, reconnect_reset_interval: reconnect_reset_interval) @on = { event: ->(_) {}, error: ->(_) {} } @last_id = last_event_id yield self if block_given? Thread.new do run_stream end end |
Instance Method Details
#close ⇒ Object
Permanently shuts down the client and its connection. No further events will be dispatched. This has no effect if called a second time.
164 165 166 167 168 169 |
# File 'lib/ld-eventsource/client.rb', line 164 def close if @stopped.make_true @cxn.close if !@cxn.nil? @cxn = nil end end |
#on_error {|error| ... } ⇒ Object
Specifies a block or Proc to receive connection errors. This will be called with a single parameter that is an instance of some exception class– normally, either some I/O exception or one of the classes in Errors. It is called from the same worker thread that reads the stream, so no more events or errors will be dispatched until it returns.
If the error handler decides that this type of error is not recoverable, it has the ability to prevent any further reconnect attempts by calling #close on the Client. For instance, you might want to do this if the server returned a ‘401 Unauthorized` error and no other authorization credentials are available, since any further requests would presumably also receive a 401.
Any previously specified error handler will be replaced.
156 157 158 |
# File 'lib/ld-eventsource/client.rb', line 156 def on_error(&action) @on[:error] = action end |
#on_event {|event| ... } ⇒ Object
Specifies a block or Proc to receive events from the stream. This will be called once for every valid event received, with a single parameter of type StreamEvent. It is called from the same worker thread that reads the stream, so no more events will be dispatched until it returns.
Any exception that propagates out of the handler will cause the stream to disconnect and reconnect, on the assumption that data may have been lost and that restarting the stream will cause it to be resent.
Any previously specified event handler will be replaced.
137 138 139 |
# File 'lib/ld-eventsource/client.rb', line 137 def on_event(&action) @on[:event] = action end |