Class: SecApi::Stream

Inherits:
Object
  • Object
Defined in:
lib/sec_api/stream.rb

Overview

Note:

The subscribe method blocks while receiving events. For non-blocking operation, run in a separate thread.

Note:

**Security consideration:** The API key is passed as a URL query parameter (per sec-api.io Stream API specification). Unlike the REST API which uses the Authorization header, WebSocket URLs may be logged by proxies, load balancers, or web server access logs. Ensure your infrastructure does not log full WebSocket URLs in production.
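If your own code logs connection URLs, one option is to mask the key before the URL reaches a log line. The following is a minimal sketch, not part of SecApi: `redact_query_param` is a hypothetical helper, and both the parameter name `apiKey` and the host are assumptions chosen for illustration.

```ruby
require "uri"

# Hypothetical helper: replaces the value of one query parameter so the
# URL can be written to a log. The default name "apiKey" is an assumption.
def redact_query_param(url, param = "apiKey")
  uri = URI.parse(url)
  return url if uri.query.nil?

  pairs = URI.decode_www_form(uri.query).map do |key, value|
    key == param ? [key, "REDACTED"] : [key, value]
  end
  uri.query = URI.encode_www_form(pairs)
  uri.to_s
end

# Placeholder host, used only for the demonstration.
puts redact_query_param("wss://stream.example.test/?apiKey=secret123&x=1")
```

This only protects log lines your application writes; proxies and load balancers in front of it still need their own logging configuration.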

Note:

Ping/Pong: The sec-api.io server sends ping frames every 25 seconds and expects a pong response within 5 seconds. faye-websocket handles this automatically; no application code is required.

Note:

**Sequential Processing:** Callbacks are invoked synchronously in the order filings are received. Each callback must complete before the next filing is processed. This guarantees ordering but means slow callbacks delay subsequent filings. For high-throughput scenarios, delegate work to background jobs.
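One way to keep callbacks fast without extra dependencies is to hand each filing to a worker thread through a standard-library `Queue`. This is a sketch under stated assumptions: `QueuedFiling` stands in for the real filing object, and the `on_filing` lambda plays the role of the block passed to `subscribe`.

```ruby
# Stand-in for a filing object (assumption for this sketch).
QueuedFiling = Struct.new(:ticker, :form_type)

def process_off_thread(filings)
  queue = Queue.new
  processed = []

  # The worker drains the queue, so slow work never runs in the callback.
  worker = Thread.new do
    while (filing = queue.pop) # nil is used as the shutdown signal
      processed << "#{filing.ticker} #{filing.form_type}"
    end
  end

  # The callback only enqueues and returns immediately.
  on_filing = ->(filing) { queue << filing }
  filings.each { |filing| on_filing.call(filing) }

  queue << nil
  worker.join
  processed
end

p process_off_thread([QueuedFiling.new("AAPL", "10-K"), QueuedFiling.new("TSLA", "8-K")])
```

A single worker preserves arrival order; adding more workers increases throughput but gives up that ordering.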

Note:

Auto-Reconnect: When the WebSocket connection is lost (network issues, server restart), the stream automatically attempts to reconnect using exponential backoff. After 10 failed attempts (by default), a ReconnectionError is raised. Configure via Config#stream_max_reconnect_attempts, Config#stream_initial_reconnect_delay, Config#stream_max_reconnect_delay, and Config#stream_backoff_multiplier.
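To make the role of the four settings concrete, here is a rough sketch of how an exponential backoff delay is commonly computed. `reconnect_delay` is a hypothetical function and the default values are illustrative assumptions, not the gem's actual defaults or implementation.

```ruby
# Hypothetical helper. attempt is zero-based; jitter is a fraction of the
# base delay (0.25 means up to +/- 25%). Defaults are illustrative only.
def reconnect_delay(attempt, initial: 1.0, max: 60.0, multiplier: 2.0, jitter: 0.0)
  # Grow the delay geometrically, then cap it at the maximum.
  base = [initial * (multiplier**attempt), max].min
  # Random offset so many clients do not reconnect at the same instant.
  offset = base * jitter * (rand * 2 - 1)
  [base + offset, max].min
end

(0..6).each do |attempt|
  puts format("attempt %d: %.1fs", attempt, reconnect_delay(attempt))
end
```

With jitter disabled the delays double from the initial value until they reach the cap, after which every further attempt waits the maximum delay.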

Note:

**Best-Effort Delivery:** Filings published during a disconnection window are not automatically replayed after reconnection. This is a “best-effort” delivery model. If you require guaranteed delivery, track the last received filing timestamp and use the Query API to backfill any gaps after reconnection. See the backfill example below.

Note:

**No Ordering Guarantees During Reconnection:** While connected, filings arrive in order. However, during a reconnection gap, filings may be published to EDGAR that your application never sees. After backfilling, the combined set may not be in strict chronological order. Sort by filed_at if ordering is critical.
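Combining streamed and backfilled filings might look like the sketch below. It assumes each filing exposes `accession_no` and `filed_at` (with `filed_at` comparable, here a `Time`); `MergedFiling` and `merge_filings` are hypothetical names. Dropping duplicates by accession number is an added assumption, covering the case where the backfill window overlaps filings the stream already delivered.

```ruby
# Stand-in for a filing object (assumption for this sketch).
MergedFiling = Struct.new(:accession_no, :filed_at)

# Combine both sources, keep one entry per accession number,
# and order the result chronologically.
def merge_filings(streamed, backfilled)
  (streamed + backfilled)
    .uniq(&:accession_no)
    .sort_by(&:filed_at)
end

t = Time.utc(2024, 1, 2, 9, 0, 0)
streamed = [MergedFiling.new("A-3", t + 120), MergedFiling.new("A-1", t)]
backfilled = [MergedFiling.new("A-2", t + 60), MergedFiling.new("A-1", t)]

merge_filings(streamed, backfilled).each do |filing|
  puts "#{filing.accession_no} #{filing.filed_at}"
end
```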

WebSocket streaming proxy for real-time SEC filing notifications.

Connection Management Design (Architecture ADR-7):

  • Auto-reconnect: Network issues shouldn’t require user intervention. Exponential backoff with jitter prevents thundering herd on server recovery.

  • Callback isolation: User callback exceptions mustn’t crash the stream. We catch, log, and continue - the stream is more valuable than any single callback.

  • Best-effort delivery: Filings during disconnection are lost. Users must backfill via Query API if guaranteed delivery is required. This tradeoff keeps the stream simple and avoids complex replay/dedup logic.
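The callback isolation rule above can be illustrated with a small wrapper that rescues, logs, and moves on. This is a sketch of the general pattern, not the gem's internal code; `invoke_safely` is a hypothetical name and the symbols stand in for filings.

```ruby
require "logger"
require "stringio"

# Calls the user callback; on failure, logs the error and returns nil so
# the caller can carry on with the next filing.
def invoke_safely(callback, filing, logger)
  callback.call(filing)
rescue StandardError => e
  logger.error("Stream callback failed: #{e.class}: #{e.message}")
  nil
end

log = StringIO.new
logger = Logger.new(log)
seen = []

handler = lambda do |filing|
  raise "boom" if filing == :bad
  seen << filing
end

# The failing item is logged; the items after it are still processed.
[:first, :bad, :second].each { |filing| invoke_safely(handler, filing, logger) }
p seen
```

Rescuing `StandardError` rather than `Exception` leaves signals and interpreter exits untouched, which is usually what a long-running stream wants.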

Connects to sec-api.io’s Stream API via WebSocket and delivers filing notifications as they’re published to the SEC EDGAR system.

Examples:

Subscribe to real-time filings

client = SecApi::Client.new
client.stream.subscribe do |filing|
  puts "New filing: #{filing.ticker} - #{filing.form_type}"
end

Close the streaming connection

stream = client.stream
# subscribe blocks, so run it in a separate thread to keep control here
Thread.new { stream.subscribe { |f| process(f) } }
# Later...
stream.close

Tracking last received filing for backfill detection

last_filed_at = nil

client.stream.subscribe do |filing|
  last_filed_at = filing.filed_at
  process_filing(filing)
end

Backfilling missed filings after reconnection

disconnect_time = nil
reconnect_time = nil

config = SecApi::Config.new(
  api_key: "...",
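  on_reconnect: ->(info) {
    reconnect_time = Time.now
    # info[:downtime_seconds] tells you how long you were disconnected;
    # subtract it to recover the approximate disconnect time
    disconnect_time = reconnect_time - info[:downtime_seconds]
    Rails.logger.info("Reconnected after #{info[:downtime_seconds]}s downtime")
  }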
)

client = SecApi::Client.new(config: config)

# After reconnection, backfill via Query API:
# missed_filings = client.query.filings(
#   filed_from: disconnect_time,
#   filed_to: reconnect_time,
#   tickers: ["AAPL"]  # same filters as stream
# )

Constant Summary

CLOSE_NORMAL =

WebSocket close code for normal closure (client or server initiated clean close).

Returns:

  • (Integer)

    WebSocket close code 1000

1000
CLOSE_GOING_AWAY =

WebSocket close code when endpoint is going away (server shutdown/restart).

Returns:

  • (Integer)

    WebSocket close code 1001

1001
CLOSE_ABNORMAL =

WebSocket close code for abnormal closure (connection lost unexpectedly). Triggers automatic reconnection when Config#stream_max_reconnect_attempts > 0.

Returns:

  • (Integer)

    WebSocket close code 1006

1006
CLOSE_POLICY_VIOLATION =

WebSocket close code for policy violation (authentication failure). Raised as AuthenticationError - does not trigger reconnection.

Returns:

  • (Integer)

    WebSocket close code 1008

1008
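The reactions documented for these close codes can be summarized in a small dispatch sketch. It covers only what the constant descriptions state: 1006 reconnects when attempts are allowed, and 1008 becomes an authentication error. The module and method names are hypothetical, and codes whose handling is not described here fall through to a placeholder value.

```ruby
module CloseCodeSketch
  CLOSE_ABNORMAL = 1006
  CLOSE_POLICY_VIOLATION = 1008

  # Returns a symbol naming the documented reaction to a close code.
  def self.reaction(code, max_reconnect_attempts:)
    case code
    when CLOSE_POLICY_VIOLATION
      :raise_authentication_error # never triggers reconnection
    when CLOSE_ABNORMAL
      max_reconnect_attempts > 0 ? :reconnect : :no_reconnect
    else
      :not_covered_here # handling not described in this section
    end
  end
end

p CloseCodeSketch.reaction(1006, max_reconnect_attempts: 10)
p CloseCodeSketch.reaction(1008, max_reconnect_attempts: 10)
```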

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(client) ⇒ Stream

Creates a new Stream proxy.

Parameters:

  • client (SecApi::Client)

    The parent client instance.

# File 'lib/sec_api/stream.rb', line 128

def initialize(client)
  @client = client
  @ws = nil
  @running = false
  @callback = nil
  @tickers = nil
  @form_types = nil
  @mutex = Mutex.new
  # Reconnection state (Story 6.4)
  @reconnect_attempts = 0
  @should_reconnect = true
  @reconnecting = false
  @disconnect_time = nil
end

Instance Attribute Details

#client ⇒ SecApi::Client (readonly)

Returns the parent client instance.

Returns:

  • (SecApi::Client)

    The parent client instance.

# File 'lib/sec_api/stream.rb', line 122

def client
  @client
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Close the streaming connection.

Gracefully closes the WebSocket connection and stops the EventMachine reactor. After closing, no further callbacks will be invoked.

Examples:

stream.close
stream.connected? # => false


# File 'lib/sec_api/stream.rb', line 233

def close
  @mutex.synchronize do
    # Prevent reconnection attempts after explicit close
    @should_reconnect = false

    return unless @ws

    @ws.close(CLOSE_NORMAL, "Client requested close")
    @ws = nil
    @running = false
  end
end

#connected? ⇒ Boolean

Check if the stream is currently connected.

Returns:

  • (Boolean)

    true if WebSocket connection is open



# File 'lib/sec_api/stream.rb', line 250

def connected?
  @mutex.synchronize do
    @running && @ws && @ws.ready_state == Faye::WebSocket::API::OPEN
  end
end

#filters ⇒ Hash

Returns the current filter configuration.

Useful for debugging and monitoring to inspect which filters are active.

Examples:

stream.subscribe(tickers: ["AAPL"]) { |f| }
stream.filters # => { tickers: ["AAPL"], form_types: nil }

Returns:

  • (Hash)

    Hash with :tickers and :form_types keys



# File 'lib/sec_api/stream.rb', line 266

def filters
  {
    tickers: @tickers,
    form_types: @form_types
  }
end

#subscribe(tickers: nil, form_types: nil) {|SecApi::Objects::StreamFiling| ... } ⇒ void

Note:

Callbacks execute synchronously in the EventMachine reactor thread. Long-running operations should be delegated to background jobs or thread pools to avoid blocking subsequent filing deliveries. Keep callbacks fast.

Note:

Callback exceptions are caught and logged (if logger configured). Use Config#on_callback_error for custom error handling. The stream continues processing after callback exceptions.

This method returns an undefined value.

Subscribe to real-time filing notifications with optional filtering.

Establishes a WebSocket connection to sec-api.io’s Stream API and invokes the provided block for each filing received. This method blocks while the connection is open.

Filtering is performed client-side (sec-api.io streams all filings). When both tickers and form_types are specified, AND logic is applied.
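A rough sketch of what such client-side matching could look like follows. It folds in the rules from the parameter descriptions (case-insensitive comparison, a single string or an array, amendments such as "10-K/A" matching a "10-K" filter). The helper names and the exact amendment rule (a form type equal to the filter, or starting with the filter followed by "/") are assumptions, not the gem's implementation.

```ruby
# Stand-in for a filing object (assumption for this sketch).
FilterDemoFiling = Struct.new(:ticker, :form_type)

# Accepts nil, a single string, or an array; returns upcased strings or nil.
def demo_normalize(filter)
  return nil if filter.nil?

  Array(filter).map { |value| value.to_s.upcase }
end

# A nil filter matches everything; when both are set, both must match (AND).
def demo_matches?(filing, tickers: nil, form_types: nil)
  tickers = demo_normalize(tickers)
  form_types = demo_normalize(form_types)

  ticker_ok = tickers.nil? || tickers.include?(filing.ticker.to_s.upcase)

  form = filing.form_type.to_s.upcase
  form_ok = form_types.nil? ||
    form_types.any? { |type| form == type || form.start_with?("#{type}/") }

  ticker_ok && form_ok
end

amended = FilterDemoFiling.new("AAPL", "10-K/A")
p demo_matches?(amended, tickers: "aapl", form_types: ["10-k"])
p demo_matches?(amended, tickers: ["TSLA"], form_types: ["10-K"])
```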

Examples:

Basic subscription (all filings)

client.stream.subscribe do |filing|
  puts "#{filing.ticker}: #{filing.form_type} filed at #{filing.filed_at}"
end

Filter by tickers

client.stream.subscribe(tickers: ["AAPL", "TSLA"]) do |filing|
  puts "#{filing.ticker}: #{filing.form_type}"
end

Filter by form types

client.stream.subscribe(form_types: ["10-K", "8-K"]) do |filing|
  process_material_event(filing)
end

Combined filters (AND logic)

client.stream.subscribe(tickers: ["AAPL"], form_types: ["10-K", "10-Q"]) do |filing|
  analyze_apple_financials(filing)
end

Non-blocking subscription in separate thread

Thread.new { client.stream.subscribe { |f| queue.push(f) } }

Sidekiq job enqueueing (AC: #5)

client.stream.subscribe(tickers: ["AAPL"]) do |filing|
  # Enqueue job and return quickly - don't block the reactor
  ProcessFilingJob.perform_async(filing.accession_no, filing.ticker)
end

ActiveJob integration (AC: #5)

client.stream.subscribe do |filing|
  ProcessFilingJob.perform_later(
    accession_no: filing.accession_no,
    form_type: filing.form_type
  )
end

Thread pool processing (AC: #5)

pool = Concurrent::ThreadPoolExecutor.new(max_threads: 10)
client.stream.subscribe do |filing|
  pool.post { process_filing(filing) }
end

Parameters:

  • tickers (Array<String>, String, nil) (defaults to: nil)

    Filter by ticker symbols (case-insensitive). Accepts array or single string.

  • form_types (Array<String>, String, nil) (defaults to: nil)

    Filter by form types (case-insensitive). Amendments are matched (e.g., “10-K” filter matches “10-K/A”)

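Yields:

  • (SecApi::Objects::StreamFiling)

    Each filing that passes the active filters.

Raises:

  • (ArgumentError)

    If no block is given.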

# File 'lib/sec_api/stream.rb', line 213

def subscribe(tickers: nil, form_types: nil, &block)
  raise ArgumentError, "Block required for subscribe" unless block_given?

  @tickers = normalize_filter(tickers)
  @form_types = normalize_filter(form_types)
  @callback = block
  connect
end