Class: Skyfall::Firehose

Inherits:
Stream
  • Object
show all
Defined in:
lib/skyfall/firehose.rb

Defined Under Namespace

Classes: AccountMessage, CommitMessage, HandleMessage, IdentityMessage, InfoMessage, LabelsMessage, Message, Operation, TombstoneMessage, UnknownMessage

Constant Summary collapse

SUBSCRIBE_REPOS =
"com.atproto.sync.subscribeRepos"
SUBSCRIBE_LABELS =
"com.atproto.label.subscribeLabels"
NAMED_ENDPOINTS =
{
  :subscribe_repos => SUBSCRIBE_REPOS,
  :subscribe_labels => SUBSCRIBE_LABELS
}

Constants inherited from Stream

Stream::EVENTS, Stream::MAX_RECONNECT_INTERVAL

Instance Attribute Summary collapse

Attributes inherited from Stream

#auto_reconnect, #check_heartbeat, #heartbeat_interval, #heartbeat_timeout, #last_update, #user_agent

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Stream

#connect, #default_user_agent, #disconnect, #inspect, #inspectable_variables, #reconnect, #start_heartbeat_timer, #stop_heartbeat_timer, #version_string

Constructor Details

#initialize(server, endpoint, cursor = nil) ⇒ Firehose

Returns a new instance of Firehose.



23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/skyfall/firehose.rb', line 23

def initialize(server, endpoint, cursor = nil)
  require_relative 'firehose/message'
  super(server)

  @endpoint = check_endpoint(endpoint)
  @cursor = check_cursor(cursor)
  @root_url = @root_url.chomp('/')

  if URI(@root_url).path != ''
    raise ArgumentError, "Server parameter should not include any path"
  end
end

Instance Attribute Details

#cursorObject

Returns the value of attribute cursor.



14
15
16
# File 'lib/skyfall/firehose.rb', line 14

def cursor
  @cursor
end

Class Method Details

.new(server, endpoint, cursor = nil) ⇒ Object



16
17
18
19
20
21
# File 'lib/skyfall/firehose.rb', line 16

def self.new(server, endpoint, cursor = nil)
  # to be removed in 0.6
  instance = self.allocate
  instance.send(:initialize, server, endpoint, cursor)
  instance
end

Instance Method Details

#handle_message(msg) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/skyfall/firehose.rb', line 36

def handle_message(msg)
  data = msg.data.pack('C*')
  @handlers[:raw_message]&.call(data)

  if @handlers[:message]
    atp_message = Message.new(data)
    @cursor = atp_message.seq
    @handlers[:message].call(atp_message)
  else
    @cursor = nil
  end
end