Class: Diameter::Stack
- Inherits: Object
- Includes: Internals
- Defined in: lib/diameter/stack.rb
Setup methods
- #add_handler(app_id, opts = {}) {|req, cxn| ... } ⇒ Object
  Adds a handler for a specific Diameter application.
- #initialize(host, realm) ⇒ Stack (constructor)
  Stack constructor.
- #listen_for_tcp(port = 3868) ⇒ Object
  Begins listening for inbound Diameter connections (making this a Diameter server instead of just a client).
- #start ⇒ Object
  Completes the stack initialization and begins reading from the TCP connections.
Peer connections and message sending
- #connect_to_peer(peer_uri, peer_host, realm) ⇒ Object
  Creates a Peer connection to a Diameter agent at the specific network location indicated by peer_uri.
- #peer_state(id) ⇒ Keyword
  Retrieves the current state of a peer, defaulting to :CLOSED if the peer does not exist.
- #send_answer(ans, original_cxn) ⇒ Object
  Sends a Diameter answer.
- #send_request(req) ⇒ Object
  Sends a Diameter request.
Instance Method Summary
- #close(connection) ⇒ Object
  Closes the given connection, blanking out any internal data structures associated with it.
- #handle_message(msg_bytes, cxn) ⇒ Object
  Handles a Diameter request straight from a network connection.
- #shutdown ⇒ Object
  Shuts the stack down, closing all TCP connections and terminating any background threads still waiting for an answer.
Constructor Details
#initialize(host, realm) ⇒ Stack
The stack does not advertise any applications to peers by default, so #add_handler must be called before connecting to or accepting connections from peers.
Stack constructor.
# File 'lib/diameter/stack.rb', line 24
def initialize(host, realm)
  @local_host = host
  @local_realm = realm

  @auth_apps = []
  @acct_apps = []

  @pending_ete = {}

  @tcp_helper = TCPStackHelper.new(self)
  @peer_table = {}
  @handlers = {}

  @threadpool = pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: 5,
    max_threads: 5,
    max_queue: 100,
    overflow_policy: :caller_runs
  )

  Diameter.logger.log(Logger::INFO, 'Stack initialized')
end
Instance Method Details
#add_handler(app_id, opts = {}) {|req, cxn| ... } ⇒ Object
If you expect only to send requests for this application, not receive them, the block can be a no-op (e.g. `{ nil }`).
Adds a handler for a specific Diameter application.
# File 'lib/diameter/stack.rb', line 85
def add_handler(app_id, opts={}, &blk)
  vendor = opts.fetch(:vendor, 0)
  auth = opts.fetch(:auth, false)
  acct = opts.fetch(:acct, false)

  raise ArgumentError.new("Must specify at least one of auth or acct") unless auth or acct

  @acct_apps << [app_id, vendor] if acct
  @auth_apps << [app_id, vendor] if auth

  @handlers[app_id] = blk
end
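The option handling in #add_handler can be tried in isolation. The following is a minimal sketch, not the library's code: HandlerRegistry is a hypothetical stand-in class that copies only the registration logic shown above, so it runs in plain Ruby without the Diameter gem. The application ID 16777216 (3GPP Cx) and vendor ID 10415 (3GPP) are illustrative values only.

```ruby
# Hypothetical stand-in that mirrors only the option handling of
# Stack#add_handler; it is not part of the library.
class HandlerRegistry
  attr_reader :auth_apps, :acct_apps, :handlers

  def initialize
    @auth_apps = []
    @acct_apps = []
    @handlers = {}
  end

  def add_handler(app_id, opts = {}, &blk)
    vendor = opts.fetch(:vendor, 0)   # vendor defaults to 0
    auth = opts.fetch(:auth, false)
    acct = opts.fetch(:acct, false)
    raise ArgumentError, 'Must specify at least one of auth or acct' unless auth || acct

    @acct_apps << [app_id, vendor] if acct
    @auth_apps << [app_id, vendor] if auth
    @handlers[app_id] = blk
  end
end

registry = HandlerRegistry.new
# Send-only use: register the application with a no-op block.
registry.add_handler(16777216, auth: true, vendor: 10415) { |_req, _cxn| nil }
```

Omitting both auth and acct raises ArgumentError, so at least one of them has to be set to true for every registered application.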
#close(connection) ⇒ Object
Closes the given connection, blanking out any internal data structures associated with it.
Likely to be moved to the Peer object in a future release.
# File 'lib/diameter/stack.rb', line 118
def close(connection)
  @tcp_helper.close(connection)
end
#connect_to_peer(peer_uri, peer_host, realm) ⇒ Object
Creates a Peer connection to a Diameter agent at the specific network location indicated by peer_uri.
# File 'lib/diameter/stack.rb', line 132
def connect_to_peer(peer_uri, peer_host, realm)
  uri = URI(peer_uri)
  cxn = @tcp_helper.setup_new_connection(uri.host, uri.port)
  avps = [AVP.create('Origin-Host', @local_host),
          AVP.create('Origin-Realm', @local_realm),
          AVP.create('Host-IP-Address', IPAddr.new('127.0.0.1')),
          AVP.create('Vendor-Id', 100),
          AVP.create('Product-Name', 'ruby-diameter')
         ]
  avps += app_avps
  cer_bytes = Message.new(version: 1, command_code: 257, app_id: 0,
                          request: true, proxyable: false,
                          retransmitted: false, error: false,
                          avps: avps).to_wire
  @tcp_helper.send(cer_bytes, cxn)
  @peer_table[peer_host] = Peer.new(peer_host)
  @peer_table[peer_host].state = :WAITING
  @peer_table[peer_host].cxn = cxn
  @peer_table[peer_host] # Will move to :UP when the CEA is received
end
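Because #connect_to_peer passes peer_uri straight to Ruby's URI() and then reads host and port from the result, it may help to see what the standard library returns for a Diameter-style URI. This is a small sketch using only the uri library; the host name is a made-up example.

```ruby
require 'uri'

# The 'aaa' scheme has no dedicated URI class, so it is parsed generically.
uri = URI('aaa://peer.example.com:3868')
host = uri.host   # the part handed to setup_new_connection as the host
port = uri.port   # the part handed to setup_new_connection as the port

# With no explicit port, a generic URI has no default to fall back on.
missing_port = URI('aaa://peer.example.com').port
```

Since missing_port is nil here, it seems safest to spell out the port in peer_uri rather than rely on a default.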
#handle_message(msg_bytes, cxn) ⇒ Object
Handles a Diameter request straight from a network connection. Intended to be called by TCPStackHelper after it retrieves a message, not directly by users.
# File 'lib/diameter/stack.rb', line 216
def handle_message(msg_bytes, cxn)
  # Common processing - ensure that this message has come in on this
  # peer's expected connection, and update the last time we saw
  # activity on this peer
  msg = Message.from_bytes(msg_bytes)
  Diameter.logger.debug("Handling message #{msg}")
  peer = msg.avp_by_name('Origin-Host').octet_string
  if @peer_table[peer]
    @peer_table[peer].reset_timer
    unless @peer_table[peer].cxn == cxn
      Diameter.logger.log(Logger::WARN, "Ignoring message - claims to be from #{peer} but comes from #{cxn} not #{@peer_table[peer].cxn}")
    end
  end

  if msg.command_code == 257 && msg.answer
    handle_cea(msg)
  elsif msg.command_code == 257 && msg.request
    handle_cer(msg, cxn)
  elsif msg.command_code == 280 && msg.request
    handle_dwr(msg, cxn)
  elsif msg.command_code == 280 && msg.answer
    # No-op - we've already updated our timestamp
  elsif msg.answer
    handle_other_answer(msg, cxn)
  elsif @handlers.has_key? msg.app_id
    @handlers[msg.app_id].call(msg, cxn)
  else
    fail "Received unknown message of type #{msg.command_code}"
  end
end
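The order of the branches in #handle_message matters: base-protocol commands (257, Capabilities-Exchange, and 280, Device-Watchdog) are matched first, then any other answer, and only then application handlers. The sketch below restates that order as a pure function. classify is a hypothetical helper, not a library method, and it assumes every message is either a request or an answer, never both.

```ruby
# Hypothetical helper: names the branch that handle_message would take.
# `request` is true for requests and false for answers.
def classify(command_code, request, app_id, handlers)
  if command_code == 257        # Capabilities-Exchange
    request ? :cer : :cea
  elsif command_code == 280     # Device-Watchdog
    request ? :dwr : :dwa
  elsif !request
    :other_answer               # matched to a pending request elsewhere
  elsif handlers.key?(app_id)
    :application_request        # passed to the block given to add_handler
  else
    :unknown                    # handle_message fails in this case
  end
end

handlers = { 16777216 => ->(_req, _cxn) { nil } }
# 300 stands for an arbitrary application-specific command code.
branch = classify(300, true, 16777216, handlers)
```

One consequence of this order is that a request for an application with no registered handler ends in the failing branch, which is why send-only applications still need a no-op handler.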
#listen_for_tcp(port = 3868) ⇒ Object
Begins listening for inbound Diameter connections (making this a Diameter server instead of just a client).
# File 'lib/diameter/stack.rb', line 57
def listen_for_tcp(port=3868)
  @tcp_helper.setup_new_listen_connection("0.0.0.0", port)
end
#peer_state(id) ⇒ Keyword
Retrieves the current state of a peer, defaulting to :CLOSED if the peer does not exist.
# File 'lib/diameter/stack.rb', line 202
def peer_state(id)
  if !@peer_table.key? id
    :CLOSED
  else
    @peer_table[id].state
  end
end
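A peer created by #connect_to_peer starts in :WAITING and only becomes :UP once the CEA arrives, so callers often need to wait before sending. One way to do that is to poll #peer_state. The sketch below is an assumption about how a caller might do so: wait_for_state is a hypothetical helper, and a small array stands in for the real stack so that the example runs on its own.

```ruby
# Hypothetical helper, not part of the library: polls a state-returning
# block until it yields the wanted state or the timeout (seconds) expires.
def wait_for_state(wanted, timeout: 5, interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    return true if yield == wanted
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep interval
  end
end

# Stand-in for stack.peer_state(host): reports :WAITING twice, then :UP.
states = [:WAITING, :WAITING, :UP]
came_up = wait_for_state(:UP, timeout: 1) do
  states.length > 1 ? states.shift : states.first
end
```

With a real stack, the block would instead call peer_state with the peer_host that was passed to #connect_to_peer.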
#send_answer(ans, original_cxn) ⇒ Object
Sends a Diameter answer. This is sent over the same connection the request was received on (which needs to be passed in to this method).
This adds this stack’s Origin-Host and Origin-Realm AVPs, if those AVPs don’t already exist.
# File 'lib/diameter/stack.rb', line 191
def send_answer(ans, original_cxn)
  fail "Must pass an answer" unless ans.answer
  ans.add_origin_host_and_realm(@local_host, @local_realm)
  @tcp_helper.send(ans.to_wire, original_cxn)
end
#send_request(req) ⇒ Object
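Sends a Diameter request. This is routed to an appropriate peer based on the Destination-Host AVP. If that peer is in state :UP, the request is sent and a Concurrent::Promise is returned that is fulfilled with the answer; otherwise a warning is logged and nothing is sent.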
This adds this stack’s Origin-Host and Origin-Realm AVPs, if those AVPs don’t already exist.
# File 'lib/diameter/stack.rb', line 158
def send_request(req)
  fail "Must pass a request" unless req.request
  req.add_origin_host_and_realm(@local_host, @local_realm)
  peer_name = req.avp_by_name('Destination-Host').octet_string
  state = peer_state(peer_name)
  if state == :UP
    peer = @peer_table[peer_name]
    @tcp_helper.send(req.to_wire, peer.cxn)
    q = Queue.new
    @pending_ete[req.ete] = q
    p = Concurrent::Promise.execute(executor: @threadpool) {
      Diameter.logger.debug("Waiting for answer to message with EtE #{req.ete}, queue #{q}")
      val = q.pop
      Diameter.logger.debug("Promise fulfilled for message with EtE #{req.ete}")
      val
    }
    return p
  else
    Diameter.logger.log(Logger::WARN, "Peer #{peer_name} is in state #{state} - cannot route")
  end
end
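The request/answer correlation in #send_request rests on one Queue per end-to-end ID: the sender blocks on q.pop until something pushes the answer onto that queue. The sketch below shows the pattern with the standard library only. A plain Thread is swapped in for the Concurrent::Promise used by the stack, and the delivery step is an assumption about what the answer-handling side does, since that code is not shown on this page.

```ruby
# Stand-in for @pending_ete: end-to-end ID => Queue awaiting the answer.
pending_ete = {}

ete = 0x1234
q = Queue.new
pending_ete[ete] = q

# A plain Thread stands in for the promise; it blocks until a value arrives.
waiter = Thread.new { q.pop }

# Assumed delivery step: when an answer with the same end-to-end ID is
# received, it is pushed onto the matching queue, unblocking the waiter.
pending_ete.delete(ete).push('answer for 0x1234')

result = waiter.value   # joins the thread and returns what q.pop returned
```

With the real stack, the caller would wait on the returned promise instead of a thread, and should first check that a promise was actually returned, since no promise is created when the peer is not :UP.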
#shutdown ⇒ Object
Shuts the stack down, closing all TCP connections and terminating any background threads still waiting for an answer.
# File 'lib/diameter/stack.rb', line 102
def shutdown
  @tcp_helper.shutdown
  @pending_ete.each do |ete, q|
    Diameter.logger.debug("Shutting down queue #{q} as no answer has been received with EtE #{ete}")
    q.push :shutdown
  end
  @threadpool.kill
  @threadpool.wait_for_termination(5)
end
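The loop in #shutdown uses a sentinel value to release anything still blocked on a pending queue: pushing :shutdown makes each q.pop return. The sketch below shows that technique on its own, again with plain Threads standing in for the thread pool; the end-to-end IDs 1 and 2 are arbitrary.

```ruby
# Two requests that never received an answer: ete => Queue.
pending = { 1 => Queue.new, 2 => Queue.new }

# Each waiter blocks on its queue, as the promise body in send_request does.
waiters = pending.map { |ete, q| Thread.new { [ete, q.pop] } }

# Shutdown: push the sentinel onto every queue so no waiter stays blocked.
pending.each_value { |q| q.push(:shutdown) }

results = waiters.map(&:value)
```

In the real stack the pool is killed immediately afterwards, so whether a caller ever observes :shutdown as a promise value may depend on timing; code that reads answers should probably be prepared for a value that is not a Diameter message.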
#start ⇒ Object
Completes the stack initialization and begins reading from the TCP connections.
# File 'lib/diameter/stack.rb', line 49
def start
  @tcp_helper.start_main_loop
end