Class: Vines::Stream
- Inherits: EventMachine::Connection
  - Object
    - EventMachine::Connection
      - Vines::Stream
- Includes: Log
- Defined in:
lib/vines/stream.rb,
lib/vines/stream/http.rb,
lib/vines/stream/sasl.rb,
lib/vines/stream/state.rb,
lib/vines/stream/client.rb,
lib/vines/stream/parser.rb,
lib/vines/stream/server.rb,
lib/vines/stream/component.rb,
lib/vines/stream/http/auth.rb,
lib/vines/stream/http/bind.rb,
lib/vines/stream/client/tls.rb,
lib/vines/stream/http/ready.rb,
lib/vines/stream/http/start.rb,
lib/vines/stream/server/tls.rb,
lib/vines/stream/client/auth.rb,
lib/vines/stream/client/bind.rb,
lib/vines/stream/server/auth.rb,
lib/vines/stream/client/ready.rb,
lib/vines/stream/client/start.rb,
lib/vines/stream/http/request.rb,
lib/vines/stream/http/session.rb,
lib/vines/stream/server/ready.rb,
lib/vines/stream/server/start.rb,
lib/vines/stream/client/closed.rb,
lib/vines/stream/http/sessions.rb,
lib/vines/stream/client/session.rb,
lib/vines/stream/component/ready.rb,
lib/vines/stream/component/start.rb,
lib/vines/stream/http/bind_restart.rb,
lib/vines/stream/client/auth_restart.rb,
lib/vines/stream/client/bind_restart.rb,
lib/vines/stream/component/handshake.rb,
lib/vines/stream/server/auth_restart.rb,
lib/vines/stream/server/outbound/tls.rb,
lib/vines/stream/server/final_restart.rb,
lib/vines/stream/server/outbound/auth.rb,
lib/vines/stream/server/outbound/start.rb,
lib/vines/stream/server/outbound/tls_result.rb,
lib/vines/stream/server/outbound/auth_result.rb,
lib/vines/stream/server/outbound/auth_restart.rb,
lib/vines/stream/server/outbound/final_restart.rb,
lib/vines/stream/server/outbound/final_features.rb
Overview
The base class for various XMPP streams (c2s, s2s, component, http), containing behavior common to all streams like rate limiting, stanza parsing, and stream error handling.
Defined Under Namespace
Classes: Client, Component, Http, Parser, SASL, Server, State
Constant Summary
- ERROR = 'error'.freeze
- PAD = 20
Instance Attribute Summary
- #config ⇒ Object (readonly)
  Returns the value of attribute config.
- #domain ⇒ Object (readonly)
  Returns the value of attribute domain.
- #user ⇒ Object
  Returns the value of attribute user.
Instance Method Summary
- #advance(state) ⇒ Object
  Advance the stream’s state machine to the new state.
- #available_resources(*jid) ⇒ Object
- #cert_domain_matches?(domain) ⇒ Boolean
- #close_connection(after_writing = false) ⇒ Object
  Advance the state machine into the Closed state so any remaining queued nodes are not processed while we’re waiting for EM to actually close the connection.
- #connected_resources(jid) ⇒ Object
- #create_parser ⇒ Object
  Initialize a new XML parser for this connection.
- #encrypt ⇒ Object
- #encrypt? ⇒ Boolean
  Returns true if the TLS certificate and private key files for this domain exist and can be used to encrypt this stream.
- #error(e) ⇒ Object
  Stream level errors close the stream while stanza and SASL errors are written to the client and leave the stream open.
- #initialize(config) ⇒ Stream (constructor)
  A new instance of Stream.
- #interested_resources(*jid) ⇒ Object
- #post_init ⇒ Object
- #receive_data(data) ⇒ Object
- #reset ⇒ Object
  Reset the connection’s XML parser when a new <stream:stream> header is received.
- #router ⇒ Object
- #ssl_verify_peer(pem) ⇒ Object
- #storage(domain = nil) ⇒ Object
  Returns the storage system for the domain.
- #unbind ⇒ Object
- #update_user_streams(user) ⇒ Object
  Reload the user’s information into their active connections.
- #vhost ⇒ Object
  Returns the Vines::Config::Host virtual host for the stream’s domain.
- #write(data) ⇒ Object
  Send the data over the wire to this client.
Methods included from Log
Constructor Details
#initialize(config) ⇒ Stream
Returns a new instance of Stream.
    # File 'lib/vines/stream.rb', line 16
    def initialize(config)
      @config = config
    end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
    # File 'lib/vines/stream.rb', line 13
    def config
      @config
    end
#domain ⇒ Object (readonly)
Returns the value of attribute domain.
    # File 'lib/vines/stream.rb', line 13
    def domain
      @domain
    end
#user ⇒ Object
Returns the value of attribute user.
    # File 'lib/vines/stream.rb', line 14
    def user
      @user
    end
Instance Method Details
#advance(state) ⇒ Object
Advance the stream’s state machine to the new state. XML nodes received by the stream will be passed to this state’s node method.

    # File 'lib/vines/stream.rb', line 143
    def advance(state)
      @state = state
    end
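The state hand-off described above can be sketched in isolation. This is a minimal illustration, not the Vines implementation: `SketchStream`, `StartState`, `ReadyState`, and the `dispatch` helper are hypothetical stand-ins, and the only thing taken from the source is that `advance` stores a state whose `node` method receives later nodes.

```ruby
# Hypothetical stand-ins; none of these classes exist in Vines.
class SketchStream
  attr_reader :log

  def initialize
    @log = []
    @state = nil
  end

  # Same shape as Stream#advance: remember which state handles nodes next.
  def advance(state)
    @state = state
  end

  # Assumed helper: hand a received node to whichever state is current.
  def dispatch(node)
    @state.node(node)
  end
end

class StartState
  def initialize(stream)
    @stream = stream
  end

  # Handle the opening node, then move the stream to the next state.
  def node(node)
    @stream.log << "start saw #{node}"
    @stream.advance(ReadyState.new(@stream))
  end
end

class ReadyState
  def initialize(stream)
    @stream = stream
  end

  def node(node)
    @stream.log << "ready saw #{node}"
  end
end

stream = SketchStream.new
stream.advance(StartState.new(stream))
stream.dispatch('<stream:stream>')
stream.dispatch('<message/>')
puts stream.log
```

Keeping the current state in one instance variable means each state only needs to know which state follows it, not the whole negotiation sequence.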
#available_resources(*jid) ⇒ Object
    # File 'lib/vines/stream.rb', line 93
    def available_resources(*jid)
      router.available_resources(*jid, user.jid)
    end
#cert_domain_matches?(domain) ⇒ Boolean
    # File 'lib/vines/stream.rb', line 110
    def cert_domain_matches?(domain)
      @store.domain?(get_peer_cert, domain)
    end
#close_connection(after_writing = false) ⇒ Object
Advance the state machine into the Closed state so any remaining queued nodes are not processed while we’re waiting for EM to actually close the connection.

    # File 'lib/vines/stream.rb', line 47
    def close_connection(after_writing=false)
      super
      @closed = true
      advance(Client::Closed.new(self))
    end
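To show why switching to a closed state protects queued nodes, here is a small sketch under stated assumptions: `ClosedState`, `EchoState`, `ClosableStream`, and its array-backed queue with a `drain` method are all hypothetical. The real stream uses an `EM::Queue` and `Client::Closed`, whose internals are not shown in this section.

```ruby
# Hypothetical state that silently drops every node it receives.
class ClosedState
  def node(_node)
    # Intentionally empty: nothing is processed after close was requested.
  end
end

# Hypothetical working state that records the nodes it handles.
class EchoState
  def initialize(seen)
    @seen = seen
  end

  def node(node)
    @seen << node
  end
end

class ClosableStream
  attr_reader :seen

  def initialize
    @seen = []
    @queue = []
    @closed = false
    @state = EchoState.new(@seen)
  end

  def closed?
    @closed
  end

  def enqueue(node)
    @queue << node
  end

  # Mirrors the idea of Stream#close_connection: flag the stream as closed
  # and swap in a state that ignores whatever is still queued.
  def close_connection
    @closed = true
    @state = ClosedState.new
  end

  # Assumed helper: process everything currently in the queue.
  def drain
    @state.node(@queue.shift) until @queue.empty?
  end
end

stream = ClosableStream.new
stream.enqueue('first')
stream.drain
stream.enqueue('second')
stream.close_connection
stream.drain
p stream.seen
```

The node queued before the close request is still sitting in the queue when the state changes, so it reaches `ClosedState` and is discarded.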
#connected_resources(jid) ⇒ Object
    # File 'lib/vines/stream.rb', line 89
    def connected_resources(jid)
      router.connected_resources(jid, user.jid)
    end
#create_parser ⇒ Object
Initialize a new XML parser for this connection. This is called when the stream is first connected as well as for stream restarts during negotiation. Subclasses can override this method to provide a different type of parser (e.g. HTTP).
    # File 'lib/vines/stream.rb', line 36
    def create_parser
      @parser = Parser.new.tap do |p|
        p.stream_open {|node| @nodes.push(node) }
        p.stream_close { close_connection }
        p.stanza {|node| @nodes.push(node) }
      end
    end
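The callback-registration style used by `create_parser` can be illustrated without an XML library. In this sketch `CallbackParser` and its `feed` method are hypothetical; the real `Vines::Stream::Parser` consumes raw XML through `<<`. Only the three registration methods (`stream_open`, `stream_close`, `stanza`) mirror names from the source.

```ruby
# Hypothetical parser that stores one block per event type.
class CallbackParser
  def stream_open(&block)
    @on_open = block
  end

  def stream_close(&block)
    @on_close = block
  end

  def stanza(&block)
    @on_stanza = block
  end

  # Assumed driver: accepts already-classified events instead of raw XML.
  def feed(type, node = nil)
    case type
    when :open   then @on_open&.call(node)
    when :stanza then @on_stanza&.call(node)
    when :close  then @on_close&.call
    end
  end
end

nodes = []
closed = false

# Object#tap yields the new parser for configuration and returns it.
parser = CallbackParser.new.tap do |p|
  p.stream_open { |node| nodes.push(node) }
  p.stream_close { closed = true }
  p.stanza { |node| nodes.push(node) }
end

parser.feed(:open, 'stream')
parser.feed(:stanza, 'message')
parser.feed(:close)
p nodes
p closed
```

Because the handlers are plain blocks, a subclass that overrides `create_parser` can register the same handlers on a different parser type.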
#encrypt ⇒ Object
    # File 'lib/vines/stream.rb', line 123
    def encrypt
      cert, key = @store.files_for_domain(domain)
      start_tls(cert_chain_file: cert, private_key_file: key, verify_peer: true)
    end
#encrypt? ⇒ Boolean
Returns true if the TLS certificate and private key files for this domain exist and can be used to encrypt this stream.
    # File 'lib/vines/stream.rb', line 130
    def encrypt?
      !@store.files_for_domain(domain).nil?
    end
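A rough sketch of the check follows. It assumes, without confirmation from this page, that a domain's files are named `<domain>.crt` and `<domain>.key` inside one directory; `files_for_domain_sketch` and `encryptable_sketch?` are hypothetical helpers, not the `Store` API.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical lookup: returns [cert, key] when both files exist, else nil.
# The "<domain>.crt" / "<domain>.key" naming is an assumption.
def files_for_domain_sketch(dir, domain)
  cert = File.join(dir, "#{domain}.crt")
  key  = File.join(dir, "#{domain}.key")
  [cert, key] if File.exist?(cert) && File.exist?(key)
end

# Same shape as Stream#encrypt?: true only when the lookup found both files.
def encryptable_sketch?(dir, domain)
  !files_for_domain_sketch(dir, domain).nil?
end

Dir.mktmpdir do |dir|
  FileUtils.touch(File.join(dir, 'example.com.crt'))
  FileUtils.touch(File.join(dir, 'example.com.key'))
  puts encryptable_sketch?(dir, 'example.com')
  puts encryptable_sketch?(dir, 'other.example')
end
```

Returning `nil` for a missing pair lets the predicate stay a one-line `nil?` check, and lets `encrypt` destructure the same result into `cert, key`.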
#error(e) ⇒ Object
Stream level errors close the stream while stanza and SASL errors are written to the client and leave the stream open. All exceptions should pass through this method for consistent handling.
    # File 'lib/vines/stream.rb', line 150
    def error(e)
      case e
      when SaslError, StanzaError
        write(e.to_xml)
      when StreamError
        send_stream_error(e)
        close_stream
      else
        log.error(e)
        send_stream_error(StreamErrors::InternalServerError.new)
        close_stream
      end
    end
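The three-way dispatch can be sketched with stand-in error classes. Everything here is hypothetical (`SketchStanzaError`, `SketchStreamError`, `ErrorHandlingStream`, and the placeholder XML strings); it only illustrates the rule that recoverable errors are written and leave the stream open, while stream errors and unexpected exceptions close it.

```ruby
# Hypothetical recoverable error: reported to the peer, stream stays open.
class SketchStanzaError < StandardError
  def to_xml
    '<iq type="error"/>'
  end
end

# Hypothetical fatal error: reported to the peer, then the stream closes.
class SketchStreamError < StandardError
  def to_xml
    '<stream:error/>'
  end
end

class ErrorHandlingStream
  attr_reader :written

  def initialize
    @written = []
    @open = true
  end

  def open?
    @open
  end

  def error(e)
    case e
    when SketchStanzaError
      @written << e.to_xml
    when SketchStreamError
      @written << e.to_xml
      @open = false
    else
      # Unknown exceptions are masked behind a generic placeholder error.
      @written << '<stream:error><internal-server-error/></stream:error>'
      @open = false
    end
  end
end

stream = ErrorHandlingStream.new
stream.error(SketchStanzaError.new)
puts stream.open?
stream.error(RuntimeError.new('boom'))
puts stream.open?
```

Routing unknown exceptions through the `else` branch keeps internal details such as exception messages out of what is sent to the peer.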
#interested_resources(*jid) ⇒ Object
    # File 'lib/vines/stream.rb', line 97
    def interested_resources(*jid)
      router.interested_resources(*jid, user.jid)
    end
#post_init ⇒ Object
    # File 'lib/vines/stream.rb', line 20
    def post_init
      @remote_addr, @local_addr = addresses
      @user, @closed, @stanza_size = nil, false, 0
      @bucket = TokenBucket.new(100, 10)
      @store = Store.new(@config.certs)
      @nodes = EM::Queue.new
      process_node_queue
      create_parser
      log.info { "%s %21s -> %s" %
        ['Stream connected:'.ljust(PAD), @remote_addr, @local_addr] }
    end
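The overview lists rate limiting as shared stream behavior, and `post_init` creates a `TokenBucket.new(100, 10)`. The sketch below shows the general token bucket technique only. It assumes the two arguments mean capacity and refill rate in tokens per second, which this page does not confirm; `SketchTokenBucket`, its `take` method, and the injectable `clock:` argument are hypothetical.

```ruby
# Hypothetical token bucket: starts full, refills continuously over time.
class SketchTokenBucket
  def initialize(capacity, rate,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @capacity = capacity
    @rate = rate          # assumed unit: tokens added per second
    @clock = clock        # injectable so the example is deterministic
    @tokens = capacity.to_f
    @stamp = clock.call
  end

  # Remove tokens if enough are available; return whether that succeeded.
  def take(tokens)
    refill
    return false if tokens > @tokens
    @tokens -= tokens
    true
  end

  private

  # Add tokens for the time elapsed since the last call, up to capacity.
  def refill
    now = @clock.call
    @tokens = [@capacity, @tokens + (now - @stamp) * @rate].min
    @stamp = now
  end
end

now = 0.0
bucket = SketchTokenBucket.new(100, 10, clock: -> { now })
puts bucket.take(100)   # drains the full bucket
puts bucket.take(1)     # nothing left yet
now = 0.5               # half a second later, 5 tokens have been refilled
puts bucket.take(5)
```

With these assumed meanings, a client could send a burst of up to 100 units and then sustain 10 per second.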
#receive_data(data) ⇒ Object
    # File 'lib/vines/stream.rb', line 53
    def receive_data(data)
      return if @closed
      @stanza_size += data.bytesize
      if @stanza_size < max_stanza_size
        @parser << data rescue error(StreamErrors::NotWellFormed.new)
      else
        error(StreamErrors::PolicyViolation.new('max stanza size reached'))
      end
    end
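The size guard in `receive_data` can be sketched on its own. `SizeLimitedReceiver` is a hypothetical stand-in: it records chunks instead of parsing them and marks itself closed on a violation. Where the real stream resets its byte counter is not shown in this section, so the sketch never resets it.

```ruby
# Hypothetical receiver that enforces a cumulative byte limit.
class SizeLimitedReceiver
  attr_reader :parsed, :errors

  def initialize(max_stanza_size)
    @max_stanza_size = max_stanza_size
    @stanza_size = 0
    @closed = false
    @parsed = []
    @errors = []
  end

  def receive_data(data)
    return if @closed                    # ignore input after a fatal error
    @stanza_size += data.bytesize        # count bytes, not characters
    if @stanza_size < @max_stanza_size
      @parsed << data                    # stands in for `@parser << data`
    else
      @errors << :policy_violation
      @closed = true
    end
  end
end

receiver = SizeLimitedReceiver.new(10)
receiver.receive_data('12345')
receiver.receive_data('12345')   # running total reaches the limit
receiver.receive_data('x')       # dropped: receiver is closed
p receiver.parsed
p receiver.errors
```

Counting with `bytesize` rather than `length` means multi-byte UTF-8 characters are charged for their real size on the wire.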
#reset ⇒ Object
Reset the connection’s XML parser when a new <stream:stream> header is received.
    # File 'lib/vines/stream.rb', line 65
    def reset
      create_parser
    end
#router ⇒ Object
    # File 'lib/vines/stream.rb', line 164
    def router
      @config.router
    end
#ssl_verify_peer(pem) ⇒ Object
    # File 'lib/vines/stream.rb', line 101
    def ssl_verify_peer(pem)
      # EM is supposed to close the connection when this returns false,
      # but it only does that for inbound connections, not when we
      # make a connection to another server.
      @store.trusted?(pem).tap do |trusted|
        close_connection unless trusted
      end
    end
#storage(domain = nil) ⇒ Object
Returns the storage system for the domain. If no domain is given, the stream’s storage mechanism is returned.
    # File 'lib/vines/stream.rb', line 71
    def storage(domain=nil)
      @config.storage(domain || self.domain)
    end
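The fallback to the stream's own domain is easy to show with stand-ins. `SketchConfig` and `StorageLookup` are hypothetical, and the symbols used as storage backends are placeholders for whatever objects the real configuration returns.

```ruby
# Hypothetical config that maps each domain to a storage backend.
class SketchConfig
  def initialize(backends)
    @backends = backends
  end

  def storage(domain)
    @backends.fetch(domain)
  end
end

class StorageLookup
  attr_reader :domain

  def initialize(config, domain)
    @config = config
    @domain = domain
  end

  # Same shape as Stream#storage: an explicit domain wins, otherwise
  # the stream's own domain is used.
  def storage(domain = nil)
    @config.storage(domain || self.domain)
  end
end

config = SketchConfig.new('wonderland.lit' => :sql, 'verona.lit' => :ldap)
stream = StorageLookup.new(config, 'wonderland.lit')
p stream.storage
p stream.storage('verona.lit')
```

Inside the method the parameter `domain` shadows the attribute reader, which is why the fallback is written as `self.domain`.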
#unbind ⇒ Object
    # File 'lib/vines/stream.rb', line 134
    def unbind
      router.delete(self)
      log.info { "%s %21s -> %s" %
        ['Stream disconnected:'.ljust(PAD), @remote_addr, @local_addr] }
      log.info { "Streams connected: #{router.size}" }
    end
#update_user_streams(user) ⇒ Object
Reload the user’s information into their active connections. Call this after storage.save_user() to sync the new user state with their other connections.
    # File 'lib/vines/stream.rb', line 83
    def update_user_streams(user)
      # Look up by the bare JID so every connected resource of the user matches.
      connected_resources(user.jid.bare).each do |stream|
        stream.user.update_from(user)
      end
    end
#vhost ⇒ Object
Returns the Vines::Config::Host virtual host for the stream’s domain.
    # File 'lib/vines/stream.rb', line 76
    def vhost
      @config.vhost(domain)
    end
#write(data) ⇒ Object
Send the data over the wire to this client.
    # File 'lib/vines/stream.rb', line 115
    def write(data)
      log_node(data, :out)
      if data.respond_to?(:to_xml)
        data = data.to_xml(:indent => 0)
      end
      send_data(data)
    end
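The duck-typed serialization in `write` can be sketched as follows. `SketchNode` and `WritingStream` are hypothetical; the real method also logs the node and calls EventMachine's `send_data`, which is replaced here by appending to an array.

```ruby
# Hypothetical node that knows how to serialize itself.
class SketchNode
  def initialize(xml)
    @xml = xml
  end

  # Accepts and ignores formatting options, like `indent: 0` below.
  def to_xml(_options = {})
    @xml
  end
end

class WritingStream
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Serialize anything that responds to to_xml; pass strings through as-is.
  def write(data)
    data = data.to_xml(indent: 0) if data.respond_to?(:to_xml)
    @sent << data   # stands in for EventMachine's send_data
  end
end

stream = WritingStream.new
stream.write(SketchNode.new('<presence/>'))
stream.write('<stream:stream>')
p stream.sent
```

Checking `respond_to?(:to_xml)` instead of a class lets callers pass either parsed nodes or pre-built strings, such as a stream header that is not a complete XML document.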