Class: Skyfall::Stream
- Inherits: Object
  - Object
  - Skyfall::Stream
- Extended by: Events
- Defined in: lib/skyfall/stream.rb
Overview
Base class of a websocket client. It provides basic websocket client functionality such as connecting to the service, keeping the connection alive and running lifecycle callbacks.
In most cases, you will not create instances of this class directly, but rather use either Firehose or Jetstream. Use this class as a superclass if you need to implement some custom client for a websocket API that isn’t supported yet.
Constant Summary
- MAX_RECONNECT_INTERVAL = 300
Instance Attribute Summary
- #auto_reconnect ⇒ Boolean
  If enabled, the client will try to reconnect if the connection is closed unexpectedly.
- #check_heartbeat ⇒ Boolean
  If enabled, runs a timer which performs periodic “heartbeat checks”.
- #heartbeat_interval ⇒ Numeric
  Interval in seconds between heartbeat checks (default: 10).
- #heartbeat_timeout ⇒ Numeric
  Number of seconds without messages after which reconnect is triggered (default: 300).
- #last_update ⇒ Time? (readonly)
  Time when the most recent message was received from the websocket.
- #user_agent ⇒ String
  User agent sent in the header when connecting.
Instance Method Summary
- #connect ⇒ nil
  Opens a connection to the configured websocket.
- #default_user_agent ⇒ String
  Default user agent sent when connecting to the service.
- #disconnect ⇒ nil (also: #close)
  Closes the connection and stops the EventMachine reactor thread.
- #initialize(server) ⇒ Stream (constructor)
  A new instance of Stream.
- #inspect ⇒ String
  Returns a string with a representation of the object for debugging purposes.
- #on_connect(block) ⇒ nil
  Defines a callback to be run after a connection to the websocket is opened.
- #on_connecting(block) {|url| ... } ⇒ nil
  Defines a callback to be run when the client tries to open a connection to the websocket.
- #on_disconnect(block) ⇒ nil
  Defines a callback to be run after a connection to the websocket is closed (and the client does not reconnect).
- #on_error(block) {|error| ... } ⇒ nil
  Defines a callback to be run when the websocket connection returns an error.
- #on_message(block) {|message| ... } ⇒ nil
  Defines a callback to be run when a message is received, passing the message as a parsed object of an appropriate message class.
- #on_raw_message(block) {|data| ... } ⇒ nil
  Defines a callback to be run when a message is received, passing a raw data packet as received from the websocket (plain text or binary).
- #on_reconnect(block) ⇒ nil
  Defines a callback to be run when a connection to the websocket is broken, but the client initiates or schedules a reconnect (which may happen after a delay).
- #on_timeout(block) ⇒ nil
  Defines a callback to be run when the heartbeat timer forces a reconnect.
- #reconnect ⇒ nil
  Forces a reconnect, closing the connection and calling #connect again.
- #version_string ⇒ String
  Skyfall version string for use in user agent strings ("Skyfall/x.y").
Constructor Details
#initialize(server) ⇒ Stream
Returns a new instance of Stream.
# File 'lib/skyfall/stream.rb', line 83

def initialize(server)
  @root_url = build_root_url(server)

  @handlers = {}
  @auto_reconnect = true
  @check_heartbeat = false
  @connection_attempts = 0
  @heartbeat_interval = 10
  @heartbeat_timeout = 300
  @last_update = nil
  @user_agent = default_user_agent

  @handlers[:error] = proc { |e| puts "ERROR: #{e}" }
end
Instance Attribute Details
#auto_reconnect ⇒ Boolean
If enabled, the client will try to reconnect if the connection is closed unexpectedly. (Default: true)
If a reconnect attempt fails, the client waits with an exponential backoff delay before retrying, up to MAX_RECONNECT_INTERVAL seconds.
# File 'lib/skyfall/stream.rb', line 32

def auto_reconnect
  @auto_reconnect
end
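The delay formula itself is not shown on this page (the reconnect_delay helper called from #connect is not listed), so the following is only a sketch of one common form of capped exponential backoff. The method name backoff_delay, the immediate first retry and the doubling base are all assumptions; only the cap of 300 seconds comes from MAX_RECONNECT_INTERVAL.

```ruby
# Hypothetical sketch of a capped exponential backoff; not Skyfall's actual code.
MAX_RECONNECT_INTERVAL = 300 # seconds, same value as Skyfall::Stream::MAX_RECONNECT_INTERVAL

# Returns the number of seconds to wait before the given reconnect attempt.
# Assumption: attempt 0 retries immediately and each later attempt doubles the delay.
def backoff_delay(attempts)
  return 0 if attempts.zero?

  [2**(attempts - 1), MAX_RECONNECT_INTERVAL].min
end

delays = (0..10).map { |n| backoff_delay(n) }
puts delays.inspect
# prints [0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 300]
```

Whatever the exact growth rate, the cap means a client that stays offline for a long time should end up retrying about once every five minutes.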
#check_heartbeat ⇒ Boolean
If enabled, runs a timer which performs periodic “heartbeat checks”.
The heartbeat timer is started when the client connects to the service, and checks that the stream hasn’t stalled and is still regularly sending new messages. If no messages are detected for some period of time, the client forces a reconnect.
This is not enabled by default, because depending on the service you’re connecting to, it might be normal to not receive any messages for a while.
# File 'lib/skyfall/stream.rb', line 57

def check_heartbeat
  @check_heartbeat
end
#heartbeat_interval ⇒ Numeric
Interval in seconds between heartbeat checks (default: 10). Only used if #check_heartbeat is set.
# File 'lib/skyfall/stream.rb', line 61

def heartbeat_interval
  @heartbeat_interval
end
#heartbeat_timeout ⇒ Numeric
Number of seconds without messages after which reconnect is triggered (default: 300). Only used if #check_heartbeat is set.
# File 'lib/skyfall/stream.rb', line 66

def heartbeat_timeout
  @heartbeat_timeout
end
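To make the relationship between the heartbeat settings more concrete, here is a rough sketch of the kind of check a heartbeat timer could run every heartbeat_interval seconds. The stalled? helper is hypothetical and is not part of Skyfall; the real timer is started by start_heartbeat_timer, whose body is not shown on this page.

```ruby
# Hypothetical helper, not part of Skyfall: decides whether a stream looks
# stalled, given the time of the last received message and the timeout.
def stalled?(last_update, heartbeat_timeout, now = Time.now)
  return false if last_update.nil? # nothing received yet, nothing to compare

  now - last_update > heartbeat_timeout
end

now = Time.now
puts stalled?(now - 30, 300, now)   # prints false
puts stalled?(now - 301, 300, now)  # prints true
```

Under this sketch, with the default values (interval 10, timeout 300), a stalled stream would be noticed at most about 310 seconds after its last message, since the check only runs once per interval.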
#last_update ⇒ Time? (readonly)
Time when the most recent message was received from the websocket.
Note: this is the local time at which the message was received. It differs from the timestamp of the message, which is the server time of the original source (PDS) when it emitted the message, and from any created_at value saved in the record.
# File 'lib/skyfall/stream.rb', line 75

def last_update
  @last_update
end
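As a small illustration of the difference between the two clocks, the sketch below compares a local receive time with a message's own timestamp. Both values are made up for the example: received_at stands in for what #last_update would return, and message_time for a timestamp taken from a message.

```ruby
require 'time'

# Made-up values for illustration only.
received_at  = Time.utc(2024, 1, 1, 12, 0, 5)          # stand-in for stream.last_update
message_time = Time.iso8601("2024-01-01T12:00:03Z")    # stand-in for a message's timestamp

# Seconds between the source emitting the message and this client receiving it.
lag = received_at - message_time
puts lag # prints 2.0
```

Because the two times come from different machines, a small or even negative lag may simply reflect clock differences rather than real delivery delay.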
#user_agent ⇒ String
User agent sent in the header when connecting.
The default value is #default_user_agent, which is currently the same as #version_string ("Skyfall/x.y"). It’s recommended to set it or extend it with some information that indicates what service this is and who is running it (e.g. a Bluesky handle).
# File 'lib/skyfall/stream.rb', line 43

def user_agent
  @user_agent
end
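One possible way to build such an extended user agent is sketched below. The service name and contact handle are invented for illustration, and version_string is a local stand-in for Skyfall::Stream#version_string so that the snippet runs on its own.

```ruby
# Stand-in for Skyfall::Stream#version_string. The real method returns
# "Skyfall/x.y" with the installed gem version; "x.y" here is a placeholder.
def version_string
  "Skyfall/x.y"
end

# Hypothetical service name and contact handle, for illustration only.
service = "example_feed/1.0"
contact = "@operator.example.com"

user_agent = "#{service} (#{contact}) #{version_string}"
puts user_agent
# prints example_feed/1.0 (@operator.example.com) Skyfall/x.y
```

With a real client object, the same string would be assigned through the attribute writer before calling #connect, e.g. stream.user_agent = "example_feed/1.0 (@operator.example.com) #{stream.version_string}".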
Instance Method Details
#connect ⇒ nil
Opens a connection to the configured websocket.
This method starts an EventMachine reactor on the current thread, and will only return once the connection is closed.
# File 'lib/skyfall/stream.rb', line 108

def connect
  return if @ws

  if @handlers[:message].nil? && @handlers[:raw_message].nil?
    raise ConfigError, "Either on_message or on_raw_message handler needs to be set"
  end

  url = build_websocket_url

  @handlers[:connecting]&.call(url)

  @reconnect_timer&.cancel
  @reconnect_timer = nil

  raise ReactorActiveError if existing_reactor?

  @engines_on = true

  EM.run do
    EventMachine.error_handler do |e|
      @handlers[:error]&.call(e)
    end

    @ws = build_websocket_client(url)

    @ws.on(:open) do |e|
      @handlers[:connect]&.call
      @last_update = Time.now
      start_heartbeat_timer
    end

    @ws.on(:message) do |msg|
      @reconnecting = false
      @connection_attempts = 0
      @last_update = Time.now
      (msg)
    end

    @ws.on(:error) do |e|
      @handlers[:error]&.call(e)
    end

    @ws.on(:close) do |e|
      @ws = nil

      if @reconnecting || @auto_reconnect && @engines_on
        @handlers[:reconnect]&.call

        @reconnect_timer&.cancel
        @reconnect_timer = EM::Timer.new(reconnect_delay) do
          @connection_attempts += 1
          connect
        end
      else
        stop_heartbeat_timer
        @engines_on = false
        @handlers[:disconnect]&.call
        EM.stop_event_loop unless @ws
      end
    end
  end
end
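The branch taken in the :close handler decides whether on_reconnect or on_disconnect fires. The sketch below isolates that condition as a plain function so that it can be tried out with different flag combinations. The function name action_after_close and its keyword arguments are hypothetical; they mirror the @reconnecting, @auto_reconnect and @engines_on instance variables used in the listing.

```ruby
# Mirrors the condition used in the :close handler. In Ruby, && binds tighter
# than ||, so it reads as: reconnecting || (auto_reconnect && engines_on).
def action_after_close(reconnecting:, auto_reconnect:, engines_on:)
  if reconnecting || auto_reconnect && engines_on
    :reconnect   # on_reconnect fires and a reconnect timer is scheduled
  else
    :disconnect  # on_disconnect fires and the reactor is stopped
  end
end

# Connection dropped unexpectedly with the default auto_reconnect = true
puts action_after_close(reconnecting: false, auto_reconnect: true, engines_on: true)
# prints reconnect

# State left by #disconnect (@reconnecting and @engines_on both false)
puts action_after_close(reconnecting: false, auto_reconnect: true, engines_on: false)
# prints disconnect

# Explicit #reconnect while auto_reconnect is turned off
puts action_after_close(reconnecting: true, auto_reconnect: false, engines_on: true)
# prints reconnect
```

The last case shows why the precedence matters: an explicit #reconnect is honoured even when auto_reconnect is disabled.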
#default_user_agent ⇒ String
Default user agent sent when connecting to the service. (Currently the same as #version_string.)
# File 'lib/skyfall/stream.rb', line 200

def default_user_agent
  version_string
end
#disconnect ⇒ nil (also known as: close)
Closes the connection and stops the EventMachine reactor thread.
# File 'lib/skyfall/stream.rb', line 186

def disconnect
  return unless EM.reactor_running?

  @reconnecting = false
  @engines_on = false

  EM.stop_event_loop
end
#inspect ⇒ String
Returns a string with a representation of the object for debugging purposes.
# File 'lib/skyfall/stream.rb', line 303

def inspect
  vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
  "#<#{self.class}:0x#{object_id} #{vars}>"
end
#on_connect(block) ⇒ nil
Defines a callback to be run after a connection to the websocket is opened. It can also be used as a setter, on_connect=.
# File 'lib/skyfall/stream.rb', line 238

event_handler :connect
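Each on_* method on this page is generated by an event_handler declaration and can be called either with a block or as a setter. The Events module that provides event_handler is not shown here, so the following is only a guess at how such a macro could be written. The module name HandlerMacros, the class DemoClient and its fire method are hypothetical stand-ins.

```ruby
# Hypothetical re-creation of an event_handler macro; not Skyfall's actual code.
module HandlerMacros
  def event_handler(name)
    # Block form: client.on_connect { ... }; a proc argument is accepted too.
    define_method("on_#{name}") do |handler = nil, &block|
      @handlers[name.to_sym] = handler || block
      nil
    end

    # Setter form: client.on_connect = proc { ... }
    define_method("on_#{name}=") do |handler|
      @handlers[name.to_sym] = handler
    end
  end
end

class DemoClient
  extend HandlerMacros

  event_handler :connect
  event_handler :message

  def initialize
    @handlers = {}
  end

  # Simulates the client dispatching an event to its registered handler.
  def fire(name, *args)
    @handlers[name]&.call(*args)
  end
end

client = DemoClient.new
client.on_connect { puts "connected" }
client.on_message = proc { |msg| puts "got #{msg}" }

client.fire(:connect)           # prints connected
client.fire(:message, "hello")  # prints got hello
```

Storing handlers in a hash keyed by event name matches how #connect looks them up (for example @handlers[:connect]&.call), which is why an unset handler is simply skipped.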
#on_connecting(block) {|url| ... } ⇒ nil
Defines a callback to be run when the client tries to open a connection to the websocket. It can also be used as a setter, on_connecting=.
# File 'lib/skyfall/stream.rb', line 230

event_handler :connecting
#on_disconnect(block) ⇒ nil
Defines a callback to be run after a connection to the websocket is closed (and the client does not reconnect). It can also be used as a setter, on_disconnect=.
This callback is not run when on_reconnect fires.
# File 'lib/skyfall/stream.rb', line 266

event_handler :disconnect
#on_error(block) {|error| ... } ⇒ nil
Defines a callback to be run when the websocket connection returns an error. It can also be used as a setter, on_error=.
The default handler prints the error to stdout.
# File 'lib/skyfall/stream.rb', line 298

event_handler :error
#on_message(block) {|message| ... } ⇒ nil
Defines a callback to be run when a message is received, passing the message as a parsed object of an appropriate message class. It can also be used as a setter, on_message=.
# File 'lib/skyfall/stream.rb', line 256

event_handler :message
#on_raw_message(block) {|data| ... } ⇒ nil
Defines a callback to be run when a message is received, passing a raw data packet as received from the websocket (plain text or binary). It can also be used as a setter, on_raw_message=.
# File 'lib/skyfall/stream.rb', line 247

event_handler :raw_message
#on_reconnect(block) ⇒ nil
Defines a callback to be run when a connection to the websocket is broken, but the client initiates or schedules a reconnect (which may happen after a delay). It can also be used as a setter, on_reconnect=.
# File 'lib/skyfall/stream.rb', line 275

event_handler :reconnect
#on_timeout(block) ⇒ nil
Defines a callback to be run when the heartbeat timer forces a reconnect. A reconnect is triggered after not receiving any messages for the period of time specified in #heartbeat_timeout (if #check_heartbeat is enabled). It can also be used as a setter, on_timeout=.
This callback is followed by on_reconnect.
# File 'lib/skyfall/stream.rb', line 286

event_handler :timeout
#reconnect ⇒ nil
Forces a reconnect, closing the connection and calling #connect again.
# File 'lib/skyfall/stream.rb', line 175

def reconnect
  @reconnecting = true
  @connection_attempts = 0

  @ws ? @ws.close : connect
end