Class: Skyfall::Jetstream

Inherits:
Stream
  • Object
Defined in:
lib/skyfall/jetstream.rb

Overview

Note:

Most of the methods of this class that you might want to use are defined in Stream.

Client of a Jetstream service (JSON-based firehose).

This is the Jetstream counterpart of Firehose and mirrors its API. It returns messages as instances of subclasses of Message, which match the corresponding Firehose::Message variants as closely as possible.

To connect to a Jetstream websocket, you need to:

  • create an instance of Jetstream, passing it the hostname/URL of the server, and optionally parameters such as cursor or collection/DID filters

  • set up callbacks to run when connecting, when disconnecting, when a message is received, and so on (at minimum, you need to set a message handler)

  • call Stream#connect to start the connection

  • handle the received messages

Examples:

client = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', {
  wanted_collections: 'app.bsky.feed.post',
  wanted_dids: @dids
})

client.on_message do |msg|
  next unless msg.type == :commit

  op = msg.operation

  if op.type == :bsky_post && op.action == :create
    puts "[#{msg.time}] #{msg.repo}: #{op.raw_record['text']}"
  end
end

client.connect

# You might also want to set some or all of these lifecycle callback handlers:

client.on_connecting { |url| puts "Connecting to #{url}..." }
client.on_connect { puts "Connected" }
client.on_disconnect { puts "Disconnected" }
client.on_reconnect { puts "Connection lost, trying to reconnect..." }
client.on_timeout { puts "Connection stalled, triggering a reconnect..." }
client.on_error { |e| puts "ERROR: #{e}" }

Defined Under Namespace

Classes: AccountMessage, CommitMessage, IdentityMessage, Message, Operation, UnknownMessage

Constant Summary

Constants inherited from Stream

Stream::MAX_RECONNECT_INTERVAL

Instance Attribute Summary

Attributes inherited from Stream

#auto_reconnect, #check_heartbeat, #heartbeat_interval, #heartbeat_timeout, #last_update, #user_agent

Instance Method Summary

Methods inherited from Stream

#connect, #default_user_agent, #disconnect, #inspect, #on_connect, #on_connecting, #on_disconnect, #on_error, #on_message, #on_raw_message, #on_reconnect, #on_timeout, #reconnect, #version_string

Constructor Details

#initialize(server, params = {}) ⇒ Jetstream

Returns a new instance of Jetstream.

Parameters:

  • server (String)

    Address of the server to connect to. Expects a string with either just a hostname, or a ws:// or wss:// URL with no path.

  • params (Hash) (defaults to: {})

    options, see below:

Options Hash (params):

  • :cursor (Integer)

    cursor from which to resume

  • :wanted_dids (Array<String>)

    DID filter to pass to the server (:wantedDids is also accepted); value should be a DID string or an array of those

  • :wanted_collections (Array<String, Symbol>)

    collection filter to pass to the server (:wantedCollections is also accepted); value should be an NSID string or a symbol shorthand, or an array of those

Raises:

  • (ArgumentError)

    if the server parameter or the options are invalid
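
To make the accepted inputs more concrete, here is a minimal sketch of how a server address and options like the ones above could be turned into a Jetstream subscription URL. The helper `jetstream_url` is hypothetical and is not Skyfall's implementation; it assumes the public Jetstream service's `/subscribe` endpoint and its `wantedCollections`, `wantedDids` and `cursor` query parameters. Symbol shorthands for collections are not covered, since their mapping is not described here.

```ruby
require 'uri'

# Hypothetical helper, NOT part of Skyfall: illustrates how the documented
# options could map onto Jetstream query parameters.
def jetstream_url(server, params = {})
  # Accept either a bare hostname or a full ws:// / wss:// URL.
  root = server.include?('://') ? server : "wss://#{server}"
  uri = URI(root)

  unless %w[ws wss].include?(uri.scheme)
    raise ArgumentError, "Expected a hostname or a ws:// or wss:// URL"
  end

  unless uri.path.empty? || uri.path == '/'
    raise ArgumentError, "Server URL should not include a path"
  end

  # Both snake_case and camelCase keys are accepted; Array() lets the caller
  # pass either a single value or an array of values.
  collections = Array(params[:wanted_collections] || params[:wantedCollections])
  dids = Array(params[:wanted_dids] || params[:wantedDids])

  query = []
  collections.each { |nsid| query << ['wantedCollections', nsid] }
  dids.each { |did| query << ['wantedDids', did] }
  query << ['cursor', params[:cursor].to_s] if params[:cursor]

  uri.path = '/subscribe'
  uri.query = URI.encode_www_form(query) unless query.empty?
  uri.to_s
end
```

Calling `jetstream_url('jetstream2.us-east.bsky.network', wanted_collections: 'app.bsky.feed.post')` yields a `wss://` URL with a single `wantedCollections` parameter, while a server string such as `'wss://example.com/some/path'` raises ArgumentError, in line with the "no path" requirement above.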



# File 'lib/skyfall/jetstream.rb', line 82

def initialize(server, params = {})
  require_relative 'jetstream/message'
  super(server)

  @params = check_params(params)
  @cursor = @params.delete(:cursor)
  @root_url = ensure_empty_path(@root_url)
end

Instance Attribute Details

#cursor ⇒ Integer?

Current cursor (time of the last seen message)

Returns:

  • (Integer, nil)


# File 'lib/skyfall/jetstream.rb', line 62

def cursor
  @cursor
end
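
Since the cursor can be passed back through the :cursor option, one common use is to persist it between runs so that a restarted client resumes where it left off. The sketch below shows one way to do that with a plain file. The helpers `save_cursor`, `load_cursor` and `cursor_to_time` are hypothetical and not part of Skyfall, and `cursor_to_time` assumes the cursor is Jetstream's Unix timestamp in microseconds.

```ruby
# Hypothetical helpers, NOT part of Skyfall.

# Writes the cursor to a file; does nothing if no message has been seen yet.
def save_cursor(path, cursor)
  File.write(path, cursor.to_s) if cursor
end

# Reads a previously saved cursor, or returns nil if there is none.
def load_cursor(path)
  return nil unless File.exist?(path)

  text = File.read(path).strip
  text.empty? ? nil : Integer(text)
end

# Converts a cursor to a UTC Time, assuming it is a Unix timestamp
# expressed in microseconds.
def cursor_to_time(cursor)
  Time.at(cursor / 1_000_000, cursor % 1_000_000, :usec).utc
end
```

In a message handler you could call `save_cursor(path, client.cursor)` every so often, and on startup pass `cursor: load_cursor(path)` to the constructor only when the loaded value is not nil.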