Class: SecApi::Stream
- Inherits:
-
Object
- Object
- SecApi::Stream
- Defined in:
- lib/sec_api/stream.rb
Overview
The subscribe method blocks while receiving events. For non-blocking operation, run in a separate thread.
**Security consideration:** The API key is passed as a URL query parameter (per sec-api.io Stream API specification). Unlike the REST API which uses the Authorization header, WebSocket URLs may be logged by proxies, load balancers, or web server access logs. Ensure your infrastructure does not log full WebSocket URLs in production.
Ping/Pong: The sec-api.io server sends ping frames every 25 seconds and expects a pong response within 5 seconds. This is handled automatically by faye-websocket - no application code is required.
**Sequential Processing:** Callbacks are invoked synchronously in the order filings are received. Each callback must complete before the next filing is processed. This guarantees ordering but means slow callbacks delay subsequent filings. For high-throughput scenarios, delegate work to background jobs.
Auto-Reconnect: When the WebSocket connection is lost (network issues, server restart), the stream automatically attempts to reconnect using exponential backoff. After 10 failed attempts (by default), a ReconnectionError is raised. Configure via Config#stream_max_reconnect_attempts, Config#stream_initial_reconnect_delay, Config#stream_max_reconnect_delay, and Config#stream_backoff_multiplier.
**Best-Effort Delivery:** Filings published during a disconnection window are not automatically replayed after reconnection. This is a “best-effort” delivery model. If you require guaranteed delivery, track the last received filing timestamp and use the Query API to backfill any gaps after reconnection. See the backfill example below.
**No Ordering Guarantees During Reconnection:** While connected, filings arrive in order. However, during a reconnection gap, filings may be published to EDGAR that your application never sees. After backfilling, the combined set may not be in strict chronological order. Sort by filed_at if ordering is critical.
WebSocket streaming proxy for real-time SEC filing notifications.
Connection Management Design (Architecture ADR-7):
-
Auto-reconnect: Network issues shouldn’t require user intervention. Exponential backoff with jitter prevents thundering herd on server recovery.
-
Callback isolation: User callback exceptions mustn’t crash the stream. We catch, log, and continue - the stream is more valuable than any single callback.
-
Best-effort delivery: Filings during disconnection are lost. Users must backfill via Query API if guaranteed delivery is required. This tradeoff keeps the stream simple and avoids complex replay/dedup logic.
Connects to sec-api.io’s Stream API via WebSocket and delivers filing notifications as they’re published to the SEC EDGAR system.
Constant Summary collapse
- CLOSE_NORMAL =
WebSocket close code for normal closure (client or server initiated clean close).
1000- CLOSE_GOING_AWAY =
WebSocket close code when endpoint is going away (server shutdown/restart).
1001- CLOSE_ABNORMAL =
WebSocket close code for abnormal closure (connection lost unexpectedly). Triggers automatic reconnection when Config#stream_max_reconnect_attempts > 0.
1006- CLOSE_POLICY_VIOLATION =
WebSocket close code for policy violation (authentication failure). Raised as AuthenticationError - does not trigger reconnection.
1008
Instance Attribute Summary collapse
-
#client ⇒ SecApi::Client
readonly
The parent client instance.
Instance Method Summary collapse
-
#close ⇒ void
Close the streaming connection.
-
#connected? ⇒ Boolean
Check if the stream is currently connected.
-
#filters ⇒ Hash
Returns the current filter configuration.
-
#initialize(client) ⇒ Stream
constructor
Creates a new Stream proxy.
-
#subscribe(tickers: nil, form_types: nil) {|SecApi::Objects::StreamFiling| ... } ⇒ void
Subscribe to real-time filing notifications with optional filtering.
Constructor Details
#initialize(client) ⇒ Stream
Creates a new Stream proxy.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/sec_api/stream.rb', line 128 def initialize(client) @client = client @ws = nil @running = false @callback = nil @tickers = nil @form_types = nil @mutex = Mutex.new # Reconnection state (Story 6.4) @reconnect_attempts = 0 @should_reconnect = true @reconnecting = false @disconnect_time = nil end |
Instance Attribute Details
#client ⇒ SecApi::Client (readonly)
Returns The parent client instance.
122 123 124 |
# File 'lib/sec_api/stream.rb', line 122 def client @client end |
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Close the streaming connection.
Gracefully closes the WebSocket connection and stops the EventMachine reactor. After closing, no further callbacks will be invoked.
233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/sec_api/stream.rb', line 233 def close @mutex.synchronize do # Prevent reconnection attempts after explicit close @should_reconnect = false return unless @ws @ws.close(CLOSE_NORMAL, "Client requested close") @ws = nil @running = false end end |
#connected? ⇒ Boolean
Check if the stream is currently connected.
250 251 252 253 254 |
# File 'lib/sec_api/stream.rb', line 250 def connected? @mutex.synchronize do @running && @ws && @ws.ready_state == Faye::WebSocket::API::OPEN end end |
#filters ⇒ Hash
Returns the current filter configuration.
Useful for debugging and monitoring to inspect which filters are active.
266 267 268 269 270 271 |
# File 'lib/sec_api/stream.rb', line 266 def filters { tickers: @tickers, form_types: @form_types } end |
#subscribe(tickers: nil, form_types: nil) {|SecApi::Objects::StreamFiling| ... } ⇒ void
Callbacks execute synchronously in the EventMachine reactor thread. Long-running operations should be delegated to background jobs or thread pools to avoid blocking subsequent filing deliveries. Keep callbacks fast.
Callback exceptions are caught and logged (if logger configured). Use Config#on_callback_error for custom error handling. The stream continues processing after callback exceptions.
This method returns an undefined value.
Subscribe to real-time filing notifications with optional filtering.
Establishes a WebSocket connection to sec-api.io’s Stream API and invokes the provided block for each filing received. This method blocks while the connection is open.
Filtering is performed client-side (sec-api.io streams all filings). When both tickers and form_types are specified, AND logic is applied.
213 214 215 216 217 218 219 220 |
# File 'lib/sec_api/stream.rb', line 213 def subscribe(tickers: nil, form_types: nil, &block) raise ArgumentError, "Block required for subscribe" unless block_given? @tickers = normalize_filter(tickers) @form_types = normalize_filter(form_types) @callback = block connect end |