Class: ActiveMatrix::Client
- Inherits: Object
- Extended by: Extensions, Forwardable
- Includes: Logging
- Defined in: lib/active_matrix/client_pool.rb, lib/active_matrix/client.rb
Overview
Monkey patch Client to support pooling
Instance Attribute Summary
-
#api ⇒ Object
readonly
Returns the value of attribute api.
-
#cache ⇒ :all, ...
The cache level.
-
#next_batch ⇒ String
(also: #sync_token)
The batch token for a running sync.
-
#sync_filter ⇒ Hash, String
The global sync filter.
Class Method Summary
-
.new_for_domain(domain, **params) ⇒ Client
Create a new client instance from only a Matrix HS domain.
Instance Method Summary
-
#account_data ⇒ Object
Retrieve an account data helper.
-
#checkin_to_pool ⇒ Object
Checks this client back in to the pool.
-
#create_room(room_alias = nil, **params) ⇒ Room
Creates a new room.
-
#direct_room(mxid) ⇒ Room?
Gets a direct message room for the given user if one exists.
-
#direct_rooms ⇒ Hash[String,Array[String]]
Gets a list of all direct chat rooms (1:1 chats / direct message chats) for the current user.
-
#ensure_room(room_id) ⇒ Room
Ensures that a room exists in the cache.
-
#find_room(room_id_or_alias, only_canonical: true) ⇒ Room?
Find a room in the locally cached list of rooms that the current user is part of.
-
#get_user(user_id) ⇒ User
Get a User instance from a MXID.
-
#initialize(hs_url, client_cache: :all, **params) ⇒ Client
constructor
A new instance of Client.
-
#join_room(room_id_or_alias, server_name: []) ⇒ Room
Joins an already created room.
- #listen_forever(timeout: 30, bad_sync_timeout: 5, sync_interval: 0, **params) ⇒ Object
-
#listening? ⇒ Boolean
Check if there’s a thread listening for events.
-
#logged_in? ⇒ Boolean
Check if there’s a currently logged in session.
-
#login(username, password, sync_timeout: 15, full_state: false, **params) ⇒ Object
Logs in as a user on the connected HS.
-
#login_with_token(username, token, sync_timeout: 15, full_state: false, **params) ⇒ Object
Logs in as a user on the connected HS using a login token.
-
#logout ⇒ Object
Logs out of the current session.
-
#mxid ⇒ MXID
(also: #user_id)
Gets the currently logged in user’s MXID.
-
#presence ⇒ Response
Gets the current user presence status object.
-
#public_rooms ⇒ Array[Room]
Gets a list of all the public rooms on the connected HS.
-
#register_as_guest ⇒ Object
Register - and log in - on the connected HS as a guest.
-
#register_with_password(username, password, **params) ⇒ Object
Register a new user account on the connected HS.
-
#registered_3pids ⇒ Response
Retrieve a list of all registered third-party IDs for the current user.
-
#reload_rooms! ⇒ Boolean
(also: #refresh_rooms!, #reload_spaces!)
Refresh the list of currently handled rooms, replacing it with the user’s currently joined rooms.
-
#remove_room_alias(room_alias) ⇒ Object
Remove a room alias.
-
#rooms ⇒ Array[Room]
Gets a list of all relevant rooms, either the ones currently handled by the client, or the list of currently joined ones if no rooms are handled.
-
#set_presence(status, message: nil) ⇒ Object
Sets the current user’s presence status.
-
#spaces ⇒ Array[Room]
Get a list of all joined Matrix Spaces.
-
#start_listener_thread(**params) ⇒ Object
Starts a background thread that will listen to new events.
-
#stop_listener_thread ⇒ Object
Stops the running background thread if one is active.
-
#sync(skip_store_batch: false, **params) ⇒ Object
(also: #listen_for_events)
Run a message sync round, triggering events as necessary.
-
#upload(content, content_type) ⇒ URI::MXC
Upload a piece of data to the media repo.
Methods included from Extensions
Methods included from Logging
Constructor Details
#initialize(hs_url, client_cache: :all, **params) ⇒ Client
Returns a new instance of Client.
# File 'lib/active_matrix/client.rb', line 57
def initialize(hs_url, client_cache: :all, **params)
  event_initialize

  params[:user_id] ||= params[:mxid] if params[:mxid]

  if hs_url.is_a? Api
    @api = hs_url
    params.each do |k, v|
      api.instance_variable_set("@#{k}", v) if api.instance_variable_defined? "@#{k}"
    end
  else
    @api = Api.new hs_url, **params
  end

  @cache = client_cache
  @identity_server = nil
  @mxid = nil

  @sync_thread = nil
  @sync_filter = { room: { timeline: { limit: params.fetch(:sync_filter_limit, 20) }, state: { lazy_load_members: true } } }

  @next_batch = nil

  @bad_sync_timeout_limit = 60 * 60

  params.each do |k, v|
    instance_variable_set("@#{k}", v) if instance_variable_defined? "@#{k}"
  end

  @rooms = {}
  @room_handlers = {}
  @users = {}
  @should_listen = false

  raise ArgumentError, 'Cache value must be one of [:all, :some, :none]' unless %i[all some none].include? @cache

  return unless params[:user_id]

  @mxid = params[:user_id]
end
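The keyword handling in the constructor is easy to misread: a key in params only overrides an instance variable that has already been assigned when the override loop runs, and the cache level is validated afterwards. The following is a minimal standalone sketch of that pattern. SketchClient is a hypothetical stand-in, not the real ActiveMatrix::Client, and it involves no API object or network access.

```ruby
# Hypothetical stand-in, not the real ActiveMatrix::Client.
class SketchClient
  attr_reader :cache, :next_batch

  def initialize(client_cache: :all, **params)
    @cache = client_cache
    @next_batch = nil

    # Only keys matching an already-assigned instance variable are applied.
    params.each do |key, value|
      instance_variable_set("@#{key}", value) if instance_variable_defined?("@#{key}")
    end

    raise ArgumentError, 'Cache value must be one of [:all, :some, :none]' unless %i[all some none].include?(@cache)
  end
end

# :next_batch matches an existing variable; :unknown is silently ignored.
client = SketchClient.new(client_cache: :some, next_batch: 's42', unknown: 1)
```

In this sketch an unrecognised key is dropped without an error, which is worth keeping in mind when a misspelled option appears to have no effect.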
Instance Attribute Details
#api ⇒ Object (readonly)
Returns the value of attribute api.
# File 'lib/active_matrix/client.rb', line 21
def api
  @api
end
#cache ⇒ :all, ...
The cache level
# File 'lib/active_matrix/client.rb', line 21
attr_reader :api
#next_batch ⇒ String Also known as: sync_token
The batch token for a running sync
# File 'lib/active_matrix/client.rb', line 21
attr_reader :api
#sync_filter ⇒ Hash, String
The global sync filter
# File 'lib/active_matrix/client.rb', line 21
attr_reader :api
Class Method Details
.new_for_domain(domain, **params) ⇒ Client
Create a new client instance from only a Matrix HS domain.
This will use the well-known delegation lookup to find the correct client URL.
Note: This method will not verify that the created client has a valid connection; it will only perform the lookups necessary to build a connection URL.
# File 'lib/active_matrix/client.rb', line 43
def self.new_for_domain(domain, **params)
  api = ActiveMatrix::Api.new_for_domain(domain, keep_wellknown: true)
  return new(api, **params) unless api.well_known&.key?('m.identity_server')

  identity_server = ActiveMatrix::Api.new(api.well_known['m.identity_server']['base_url'], protocols: %i[IS])
  new(api, **params, identity_server: identity_server)
end
Instance Method Details
#account_data ⇒ Object
Retrieve an account data helper
# File 'lib/active_matrix/client.rb', line 167
def account_data
  return ActiveMatrix::AccountDataCache.new self if cache == :none

  @account_data ||= ActiveMatrix::AccountDataCache.new self
end
#checkin_to_pool ⇒ Object
Checks this client back in to the pool
# File 'lib/active_matrix/client_pool.rb', line 190
def checkin_to_pool
  ClientPool.instance.checkin(self) if defined?(ClientPool)
end
#create_room(room_alias = nil, **params) ⇒ Room
Creates a new room
# File 'lib/active_matrix/client.rb', line 381
def create_room(room_alias = nil, **params)
  data = api.create_room(**params, room_alias: room_alias)
  ensure_room(data.room_id)
end
#direct_room(mxid) ⇒ Room?
Gets a direct message room for the given user if one exists
Note: Will return the oldest room if multiple exist
# File 'lib/active_matrix/client.rb', line 177
def direct_room(mxid)
  mxid = ActiveMatrix::MXID.new mxid.to_s unless mxid.is_a? ActiveMatrix::MXID
  raise ArgumentError, 'Must be a valid user ID' unless mxid.user?

  room_id = direct_rooms[mxid.to_s]&.first
  ensure_room room_id if room_id
end
#direct_rooms ⇒ Hash[String,Array[String]]
Gets a list of all direct chat rooms (1:1 chats / direct message chats) for the current user
# File 'lib/active_matrix/client.rb', line 162
def direct_rooms
  account_data['m.direct'].transform_keys(&:to_s)
end
#ensure_room(room_id) ⇒ Room
Ensures that a room exists in the cache
# File 'lib/active_matrix/client.rb', line 558
def ensure_room(room_id)
  room_id = MXID.new room_id.to_s unless room_id.is_a? MXID
  raise ArgumentError, 'Must be a room ID' unless room_id.room_id?

  room_id = room_id.to_s
  ret = @rooms.fetch(room_id) do
    room = Room.new(self, room_id)
    @rooms[room_id] = room unless cache == :none
    room
  end
  # Need to figure out a way to handle multiple types
  ret = @rooms[room_id] = ret.to_space if ret.instance_variable_get :@room_type
  ret
end
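The lookup in ensure_room relies on Hash#fetch with a block: the block runs only on a cache miss, and the new object is stored only when caching is enabled. The following is a small standalone sketch of that pattern. ensure_entry and the string stand-in for a Room are hypothetical names used for illustration only.

```ruby
# Hypothetical helper mirroring the Hash#fetch-with-block pattern above.
def ensure_entry(store, key, cache:)
  store.fetch(key) do
    entry = "room:#{key}" # stand-in for Room.new(self, room_id)
    store[key] = entry unless cache == :none
    entry
  end
end

cached = {}
uncached = {}
first = ensure_entry(cached, '!abc:example.org', cache: :all)
second = ensure_entry(cached, '!abc:example.org', cache: :all)
ensure_entry(uncached, '!abc:example.org', cache: :none)
```

With caching enabled, both calls return the same object. With cache: :none the store stays empty and every call builds a fresh object.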
#find_room(room_id_or_alias, only_canonical: true) ⇒ Room?
Find a room in the locally cached list of rooms that the current user is part of
# File 'lib/active_matrix/client.rb', line 404
def find_room(room_id_or_alias, only_canonical: true)
  room_id_or_alias = MXID.new(room_id_or_alias.to_s) unless room_id_or_alias.is_a? MXID
  raise ArgumentError, 'Must be a room id or alias' unless room_id_or_alias.room?

  return @rooms.fetch(room_id_or_alias.to_s, nil) if room_id_or_alias.room_id?

  room = @rooms.values.find { |r| r.aliases.include? room_id_or_alias.to_s }
  return room if only_canonical

  room || @rooms.values.find { |r| r.aliases(canonical_only: false).include? room_id_or_alias.to_s }
end
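The effect of only_canonical in find_room is perhaps easiest to see in isolation. The sketch below assumes, based only on the call sites in the listing above, that Room#aliases returns just the canonical alias unless canonical_only: false is passed. SketchRoom and find_by_alias are hypothetical names, not part of the library.

```ruby
# Hypothetical room stand-in; the real Room#aliases behaviour is assumed
# from the call sites above (canonical-only unless told otherwise).
SketchRoom = Struct.new(:id, :canonical_alias, :alt_aliases) do
  def aliases(canonical_only: true)
    canonical_only ? [canonical_alias].compact : [canonical_alias, *alt_aliases].compact
  end
end

def find_by_alias(rooms, room_alias, only_canonical: true)
  room = rooms.find { |r| r.aliases.include?(room_alias) }
  return room if only_canonical

  # Fall back to the full alias list only when explicitly allowed.
  room || rooms.find { |r| r.aliases(canonical_only: false).include?(room_alias) }
end

rooms = [SketchRoom.new('!a:example.org', '#main:example.org', ['#old:example.org'])]
strict = find_by_alias(rooms, '#old:example.org')
relaxed = find_by_alias(rooms, '#old:example.org', only_canonical: false)
```

Under these assumptions the strict lookup finds nothing for a non-canonical alias, while the relaxed lookup returns the room.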
#get_user(user_id) ⇒ User
Get a User instance from a MXID
Note: The method doesn’t perform any existence checking, so the returned User object may point to a non-existent user
# File 'lib/active_matrix/client.rb', line 422
def get_user(user_id)
  user_id = mxid if user_id == :self

  user_id = MXID.new user_id.to_s unless user_id.is_a? MXID
  raise ArgumentError, 'Must be a User ID' unless user_id.user?

  # To still use regular string storage in the hash itself
  user_id = user_id.to_s

  if cache == :all
    @users[user_id] ||= User.new(self, user_id)
  else
    User.new(self, user_id)
  end
end
#join_room(room_id_or_alias, server_name: []) ⇒ Room
Joins an already created room
# File 'lib/active_matrix/client.rb', line 392
def join_room(room_id_or_alias, server_name: [])
  server_name = [server_name] unless server_name.is_a? Array
  data = api.join_room(room_id_or_alias, server_name: server_name)
  ensure_room(data.fetch(:room_id, room_id_or_alias))
end
#listen_forever(timeout: 30, bad_sync_timeout: 5, sync_interval: 0, **params) ⇒ Object
# File 'lib/active_matrix/client.rb', line 573
def listen_forever(timeout: 30, bad_sync_timeout: 5, sync_interval: 0, **params)
  orig_bad_sync_timeout = bad_sync_timeout + 0
  while @should_listen
    begin
      sync(**params, timeout: timeout)
      return unless @should_listen

      bad_sync_timeout = orig_bad_sync_timeout
      sleep(sync_interval) if sync_interval.positive?
    rescue MatrixRequestError => e
      return unless @should_listen

      logger.warn("A #{e.class} occurred during sync")
      if e.httpstatus >= 500
        logger.warn("Serverside error, retrying in #{bad_sync_timeout} seconds...")
        sleep(bad_sync_timeout) if bad_sync_timeout.positive?
        bad_sync_timeout = [bad_sync_timeout * 2, @bad_sync_timeout_limit].min
      end
    end
  end
rescue StandardError => e
  logger.error "Unhandled #{e.class} raised in background listener"
  logger.error [e.message, *e.backtrace].join($RS)
  fire_error(ErrorEvent.new(e, :listener_thread))
end
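The retry delay in listen_forever doubles after each server-side (5xx) failure, is capped at the limit set in the constructor (60 * 60 seconds), and resets after a successful sync. The sketch below only reproduces that arithmetic. backoff_delays is a hypothetical helper, not part of the library.

```ruby
# Hypothetical helper: lists the waits between consecutive failed syncs.
def backoff_delays(initial, limit, failures)
  delay = initial
  Array.new(failures) do
    current = delay
    delay = [delay * 2, limit].min # same doubling-with-cap step as above
    current
  end
end

default_delays = backoff_delays(5, 60 * 60, 5)   # => [5, 10, 20, 40, 80]
capped_delays = backoff_delays(1000, 60 * 60, 3) # => [1000, 2000, 3600]
```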
#listening? ⇒ Boolean
Check if there’s a thread listening for events
# File 'lib/active_matrix/client.rb', line 516
def listening?
  @sync_thread&.alive? == true
end
#logged_in? ⇒ Boolean
Check if there’s a currently logged in session
Note: This will not check if the session is valid, only if it exists
# File 'lib/active_matrix/client.rb', line 334
def logged_in?
  !@api.access_token.nil?
end
#login(username, password, sync_timeout: 15, full_state: false, **params) ⇒ Object
Logs in as a user on the connected HS
This will also trigger an initial sync unless no_sync is set
# File 'lib/active_matrix/client.rb', line 278
def login(username, password, sync_timeout: 15, full_state: false, **params)
  username = username.to_s unless username.is_a?(String)
  password = password.to_s unless password.is_a?(String)

  raise ArgumentError, "Username can't be nil or empty" if username.blank?
  raise ArgumentError, "Password can't be nil or empty" if password.blank?

  data = api.login(user: username, password: password)
  post_authentication(data)

  return if params[:no_sync]

  sync timeout: sync_timeout,
       full_state: full_state,
       allow_sync_retry: params.fetch(:allow_sync_retry, nil)
end
#login_with_token(username, token, sync_timeout: 15, full_state: false, **params) ⇒ Object
Logs in as a user on the connected HS using a login token
This will also trigger an initial sync unless no_sync is set
# File 'lib/active_matrix/client.rb', line 306
def login_with_token(username, token, sync_timeout: 15, full_state: false, **params)
  username = username.to_s unless username.is_a?(String)
  token = token.to_s unless token.is_a?(String)

  raise ArgumentError, "Username can't be nil or empty" if username.blank?
  raise ArgumentError, "Token can't be nil or empty" if token.blank?

  data = api.login(user: username, token: token, type: 'm.login.token')
  post_authentication(data)

  return if params[:no_sync]

  sync timeout: sync_timeout,
       full_state: full_state,
       allow_sync_retry: params.fetch(:allow_sync_retry, nil)
end
#logout ⇒ Object
Logs out of the current session
# File 'lib/active_matrix/client.rb', line 324
def logout
  api.logout
  @api.access_token = nil
  @mxid = nil
end
#mxid ⇒ MXID Also known as: user_id
Gets the currently logged in user’s MXID
# File 'lib/active_matrix/client.rb', line 104
def mxid
  @mxid ||= MXID.new api.whoami?[:user_id] if api&.access_token
  @mxid
end
#presence ⇒ Response
Gets the current user presence status object
# File 'lib/active_matrix/client.rb', line 116
def presence
  api.get_presence_status(mxid).tap { |h| h.delete :user_id }
end
#public_rooms ⇒ Array[Room]
Gets a list of all the public rooms on the connected HS
Note: This will try to list all public rooms on the HS, and may take a while on larger instances
# File 'lib/active_matrix/client.rb', line 136
def public_rooms
  rooms = []
  since = nil
  loop do
    data = api.get_public_rooms since: since

    data[:chunk].each do |chunk|
      rooms << Room.new(self, chunk[:room_id],
                        name: chunk[:name], topic: chunk[:topic], aliases: chunk[:aliases],
                        canonical_alias: chunk[:canonical_alias], avatar_url: chunk[:avatar_url],
                        join_rule: :public, world_readable: chunk[:world_readable]).tap do |r|
        r.instance_variable_set :@guest_access, chunk[:guest_can_join] ? :can_join : :forbidden
      end
    end

    break if data[:next_batch].nil?

    since = data[:next_batch]
  end

  rooms
end
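public_rooms follows a common token-based pagination loop: request a page, collect its chunk, and continue with the returned next_batch token until none is returned. The following is a standalone sketch of that loop against an in-memory stand-in for the paginated endpoint. PAGES and collect_all are hypothetical names, and the data is made up for illustration.

```ruby
# Hypothetical paginated source keyed by the `since` token.
PAGES = {
  nil => { chunk: %w[!a:example.org !b:example.org], next_batch: 'p2' },
  'p2' => { chunk: %w[!c:example.org], next_batch: nil }
}.freeze

def collect_all(pages)
  items = []
  since = nil
  loop do
    data = pages.fetch(since) # stand-in for api.get_public_rooms since: since
    items.concat(data[:chunk])
    break if data[:next_batch].nil?

    since = data[:next_batch]
  end
  items
end

all_ids = collect_all(PAGES)
```

Because every page is fetched before anything is returned, the cost grows with the size of the directory, which is the reason for the note about larger instances.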
#register_as_guest ⇒ Object
Register - and log in - on the connected HS as a guest
Note: This feature is not commonly supported by HSes
# File 'lib/active_matrix/client.rb', line 236
def register_as_guest
  data = api.register(kind: :guest)
  post_authentication(data)
end
#register_with_password(username, password, **params) ⇒ Object
Register a new user account on the connected HS
This will also trigger an initial sync unless no_sync is set
Note: This method will currently always use auth type ‘m.login.dummy’
# File 'lib/active_matrix/client.rb', line 251
def register_with_password(username, password, **params)
  username = username.to_s unless username.is_a?(String)
  password = password.to_s unless password.is_a?(String)

  raise ArgumentError, "Username can't be nil or empty" if username.blank?
  raise ArgumentError, "Password can't be nil or empty" if password.nil? || password.empty?

  data = api.register(auth: { type: 'm.login.dummy' }, username: username, password: password)
  post_authentication(data)

  return if params[:no_sync]

  sync full_state: true,
       allow_sync_retry: params.fetch(:allow_sync_retry, nil)
end
#registered_3pids ⇒ Response
Retrieve a list of all registered third-party IDs for the current user
# File 'lib/active_matrix/client.rb', line 342
def registered_3pids
  data = api.get_3pids
  data.threepids.each do |obj|
    obj.instance_eval do
      def added_at
        Time.zone.at(self[:added_at] / 1000)
      end

      def validated_at
        return unless validated?

        Time.zone.at(self[:validated_at] / 1000)
      end

      def validated?
        key? :validated_at
      end

      def to_s
        "#{self[:medium]}:#{self[:address]}"
      end

      def inspect
        "#<ActiveMatrix::Response 3pid=#{to_s.inspect} added_at=\"#{added_at}\"#{" validated_at=\"#{validated_at}\"" if validated?}>"
      end
    end
  end
  data
end
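The helper methods added to each 3PID entry convert millisecond timestamps to Time objects and build a "medium:address" label. The sketch below shows the same conversions on a plain hash. The entry is made-up example data, and Time.at is swapped in for Time.zone.at, which requires ActiveSupport.

```ruby
# Hypothetical 3PID entry shaped like the hashes handled above.
entry = { medium: 'email', address: 'alice@example.org', added_at: 1_700_000_000_000 }

# Plain Time.at stands in for Time.zone.at (which needs ActiveSupport).
# The timestamp is in milliseconds, so integer-divide by 1000 first.
added_at = Time.at(entry[:added_at] / 1000).utc
label = "#{entry[:medium]}:#{entry[:address]}"
validated = entry.key?(:validated_at)
```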
#reload_rooms! ⇒ Boolean Also known as: refresh_rooms!, reload_spaces!
Refresh the list of currently handled rooms, replacing it with the user’s currently joined rooms.
Note: This will be a no-op if the cache level is set to :none
# File 'lib/active_matrix/client.rb', line 219
def reload_rooms!
  return true if cache == :none

  @rooms.clear
  api.get_joined_rooms.joined_rooms.each do |id|
    r = ensure_room(id)
    r.reload!
  end

  true
end
#remove_room_alias(room_alias) ⇒ Object
Remove a room alias
# File 'lib/active_matrix/client.rb', line 442
def remove_room_alias(room_alias)
  room_alias = MXID.new room_alias.to_s unless room_alias.is_a? MXID
  raise ArgumentError, 'Must be a room alias' unless room_alias.room_alias?

  api.remove_room_alias(room_alias)
end
#rooms ⇒ Array[Room]
Gets a list of all relevant rooms, either the ones currently handled by the client, or the list of currently joined ones if no rooms are handled
Note: This will always return the empty array if the cache level is set to :none
# File 'lib/active_matrix/client.rb', line 191
def rooms
  if @rooms.empty? && cache != :none
    api.get_joined_rooms.joined_rooms.each do |id|
      ensure_room(id)
    end
  end

  @rooms.values
end
#set_presence(status, message: nil) ⇒ Object
Sets the current user’s presence status
# File 'lib/active_matrix/client.rb', line 126
def set_presence(status, message: nil)
  raise ArgumentError, 'Presence must be one of :online, :offline, :unavailable' unless %i[online offline unavailable].include?(status)

  api.set_presence_status(mxid, status, message: message)
end
#spaces ⇒ Array[Room]
Get a list of all joined Matrix Spaces
# File 'lib/active_matrix/client.rb', line 204
def spaces
  rooms = if cache == :none
            api.get_joined_rooms.joined_rooms.map { |id| Room.new(self, id) }
          else
            self.rooms
          end

  rooms.select(&:space?)
end
#start_listener_thread(**params) ⇒ Object
Starts a background thread that will listen to new events
# File 'lib/active_matrix/client.rb', line 465
def start_listener_thread(**params)
  return if listening?

  @should_listen = true
  if api.protocol?(:MSC) && api.msc2108?
    params[:filter] = sync_filter unless params.key? :filter
    params[:filter] = params[:filter].to_json unless params[:filter].nil? || params[:filter].is_a?(String)
    params[:since] = @next_batch if @next_batch

    errors = 0
    thread, cancel_token = api.msc2108_sync_sse(params) do |data, event:, id:|
      @next_batch = id if id
      case event.to_sym
      when :sync
        handle_sync_response(data)
        errors = 0
      when :sync_error
        logger.error "SSE Sync error received; #{data.type}: #{data.message}"
        errors += 1

        # TODO: Allow configuring
        raise 'Aborting due to excessive errors' if errors >= 5
      end
    end

    @should_listen = cancel_token
  else
    thread = Thread.new { listen_forever(**params) }
  end
  @sync_thread = thread
  thread.run
end
#stop_listener_thread ⇒ Object
Stops the running background thread if one is active
# File 'lib/active_matrix/client.rb', line 499
def stop_listener_thread
  return unless @sync_thread

  if @should_listen.is_a? Hash
    @should_listen[:run] = false
  else
    @should_listen = false
  end

  if @sync_thread.alive?
    ret = @sync_thread.join(0.1)
    @sync_thread.kill unless ret
  end
  @sync_thread = nil
end
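The shutdown above gives the listener a short grace period before killing it: Thread#join with a timeout returns nil when the thread has not finished in time. The following is a standalone sketch of that step using only core Ruby. stop_thread and the sleeping worker are hypothetical, for illustration only.

```ruby
# Hypothetical helper: give the thread a short grace period, then kill it.
def stop_thread(thread, grace: 0.1)
  return unless thread.alive?

  finished = thread.join(grace) # nil when the grace period elapses first
  thread.kill unless finished
end

worker = Thread.new { sleep } # never finishes on its own
stop_thread(worker)
worker.join # wait for the kill to take effect
```

Killing a thread skips any remaining work in it, so this approach suits a loop that can safely be interrupted between sync rounds.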
#sync(skip_store_batch: false, **params) ⇒ Object Also known as: listen_for_events
Run a message sync round, triggering events as necessary
# File 'lib/active_matrix/client.rb', line 530
def sync(skip_store_batch: false, **params)
  extra_params = {
    filter: sync_filter,
    timeout: 30
  }
  extra_params[:since] = @next_batch unless @next_batch.nil?
  extra_params.merge!(params)
  extra_params[:filter] = extra_params[:filter].to_json unless extra_params[:filter].is_a? String

  attempts = 0
  data = loop do
    break api.sync(**extra_params)
  rescue ActiveMatrix::MatrixTimeoutError => e
    raise e if (attempts += 1) >= params.fetch(:allow_sync_retry, 0)
  end

  @next_batch = data[:next_batch] unless skip_store_batch

  handle_sync_response(data)
  true
end
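The retry logic in sync is compact: the loop breaks with the result on success, and a timeout is re-raised once the attempt counter reaches allow_sync_retry. The sketch below isolates that control flow. SketchTimeout and with_retries are hypothetical stand-ins for MatrixTimeoutError and the inline loop.

```ruby
class SketchTimeout < StandardError; end # stand-in for MatrixTimeoutError

def with_retries(allowed)
  attempts = 0
  loop do
    break yield # success: leave the loop with the block's value
  rescue SketchTimeout => e
    # Give up once the number of failed attempts reaches the allowance.
    raise e if (attempts += 1) >= allowed
  end
end

calls = 0
result = with_retries(3) do
  calls += 1
  raise SketchTimeout if calls < 3

  :synced
end
```

Read this way, the value behaves as a cap on total attempts rather than on extra retries: in this sketch both 0 and 1 let the first timeout propagate, and 3 permits up to three attempts.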
#upload(content, content_type) ⇒ URI::MXC
Upload a piece of data to the media repo
# File 'lib/active_matrix/client.rb', line 455
def upload(content, content_type)
  data = api.media_upload(content, content_type)
  return URI(data[:content_uri]) if data.key? :content_uri

  raise MatrixUnexpectedResponseError, 'Upload succeeded, but no media URI returned'
end