Class: Skyfall::Firehose

Inherits:
Stream
  • Object
show all
Defined in:
lib/skyfall/firehose.rb

Overview

Note:

Most of the methods of this class that you might want to use are defined in Stream.

Client of a standard AT Protocol firehose websocket.

This is the main Skyfall class to use to connect to a CBOR-based firehose websocket endpoint like subscribeRepos (on a PDS or a relay).

To connect to the firehose, you need to:

  • create an instance of Firehose, passing it the hostname/URL of the server, name of the endpoint (normally :subscribe_repos) and optionally a cursor

  • set up callbacks to be run when connecting, disconnecting, when a message is received etc. (you need to set at least a message handler)

  • call Stream#connect to start the connection

  • handle the received messages (instances of a Message subclass)

Examples:

client = Skyfall::Firehose.new('bsky.network', :subscribe_repos, last_cursor)
# or: client = Skyfall::Firehose.new('bsky.network', last_cursor)

client.on_message do |msg|
  next unless msg.type == :commit

  msg.operations.each do |op|
    if op.type == :bsky_post && op.action == :create
      puts "[#{msg.time}] #{msg.repo}: #{op.raw_record['text']}"
    end
  end
end

client.connect

# You might also want to set some or all of these lifecycle callback handlers:

client.on_connecting { |url| puts "Connecting to #{url}..." }
client.on_connect { puts "Connected" }
client.on_disconnect { puts "Disconnected" }
client.on_reconnect { puts "Connection lost, trying to reconnect..." }
client.on_timeout { puts "Connection stalled, triggering a reconnect..." }
client.on_error { |e| puts "ERROR: #{e}" }

Defined Under Namespace

Classes: AccountMessage, CommitMessage, IdentityMessage, InfoMessage, LabelsMessage, Message, Operation, SyncMessage, UnknownMessage

Constant Summary collapse

SUBSCRIBE_REPOS =

the main firehose endpoint on a PDS or relay

"com.atproto.sync.subscribeRepos"
SUBSCRIBE_LABELS =

only used with moderation services (labellers)

"com.atproto.label.subscribeLabels"
NAMED_ENDPOINTS =
{
  :subscribe_repos => SUBSCRIBE_REPOS,
  :subscribe_labels => SUBSCRIBE_LABELS
}

Constants inherited from Stream

Stream::MAX_RECONNECT_INTERVAL

Instance Attribute Summary collapse

Attributes inherited from Stream

#auto_reconnect, #check_heartbeat, #heartbeat_interval, #heartbeat_timeout, #last_update, #user_agent

Instance Method Summary collapse

Methods inherited from Stream

#connect, #default_user_agent, #disconnect, #inspect, #on_connect, #on_connecting, #on_disconnect, #on_error, #on_message, #on_raw_message, #on_reconnect, #on_timeout, #reconnect, #version_string

Constructor Details

#initialize(server, endpoint, cursor = nil) ⇒ Firehose #initialize(server, cursor = nil) ⇒ Firehose

Returns a new instance of Firehose.

Overloads:

  • #initialize(server, endpoint, cursor = nil) ⇒ Firehose

    Returns a new instance of a firehose client connecting to a given endpoint.

    Parameters:

    • server (String)

      Address of the server to connect to. Expects a string with either just a hostname, or a ws:// or wss:// URL with no path.

    • endpoint (Symbol, String)

      XRPC method name. Pass either a full NSID, or a symbol shorthand from NAMED_ENDPOINTS

    • cursor (Integer, String, nil) (defaults to: nil)

      sequence number from which to resume

    Raises:

    • (ArgumentError)

      if any of the parameters is invalid

  • #initialize(server, cursor = nil) ⇒ Firehose

    Returns a new instance of a firehose client connecting to subscribeRepos.

    Parameters:

    • server (String)

      Address of the server to connect to. Expects a string with either just a hostname, or a ws:// or wss:// URL with no path.

    • cursor (Integer, String, nil) (defaults to: nil)

      sequence number from which to resume

    Raises:

    • (ArgumentError)

      if any of the parameters is invalid



94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/skyfall/firehose.rb', line 94

def initialize(server, endpoint = nil, cursor = nil)
  require_relative 'firehose/message'
  super(server)

  if cursor.nil? && (endpoint.nil? || endpoint.to_s =~ /\A\d+\z/)
    cursor = endpoint
    endpoint = :subscribe_repos
  end

  @endpoint = check_endpoint(endpoint)
  @cursor = check_cursor(cursor)
  @root_url = ensure_empty_path(@root_url)
end

Instance Attribute Details

#cursorInteger?

Current cursor (seq of the last seen message)

Returns:

  • (Integer, nil)


67
68
69
# File 'lib/skyfall/firehose.rb', line 67

def cursor
  @cursor
end