Class: Diameter::Stack

Inherits:
Object
Includes:
Internals
Defined in:
lib/diameter/stack.rb

Setup methods

Peer connections and message sending

Instance Method Summary

Constructor Details

#initialize(host, realm) ⇒ Stack

Note:

The stack does not advertise any applications to peers by default. #add_handler must be called before connecting to any peer, because the supported applications are sent in the capabilities exchange.

Stack constructor.

Parameters:

  • host (String)

    The Diameter Identity of this stack (for the Origin-Host AVP).

  • realm (String)

    The Diameter realm of this stack (for the Origin-Realm AVP).



# File 'lib/diameter/stack.rb', line 24

def initialize(host, realm)
  @local_host = host
  @local_realm = realm

  @auth_apps = []
  @acct_apps = []

  @pending_ete = {}

  @tcp_helper = TCPStackHelper.new(self)
  @peer_table = {}
  @handlers = {}

  @threadpool = Concurrent::ThreadPoolExecutor.new(
    min_threads: 5,
    max_threads: 5,
    max_queue: 100,
    overflow_policy: :caller_runs
  )

  
  Diameter.logger.log(Logger::INFO, 'Stack initialized')
end

Instance Method Details

#add_handler(app_id, opts = {}) {|req, cxn| ... } ⇒ Object

Note:

If you expect only to send requests for this application, not receive them, the block can be a no-op (e.g. `{ nil }`).

Adds a handler for a specific Diameter application.

Parameters:

  • app_id (Fixnum)

    The Diameter application ID.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • auth (true, false)

    Whether we should advertise support for this application in the Auth-Application-ID AVP. Note that at least one of auth or acct must be specified.

  • acct (true, false)

    Whether we should advertise support for this application in the Acct-Application-ID AVP. Note that at least one of auth or acct must be specified.

  • vendor (Fixnum)

    If we should advertise support for this application in a Vendor-Specific-Application-Id AVP, this specifies the associated Vendor-Id.

Yields:

  • (req, cxn)

    Passes a Diameter message (and its originating connection) for application-specific handling.

Yield Parameters:

  • req (Message)

    The parsed Diameter message from the peer.

  • cxn (Socket)

    The TCP connection to the peer, to be passed to #send_answer.

Raises:

  • (ArgumentError)

    If neither auth nor acct is set.


# File 'lib/diameter/stack.rb', line 85

def add_handler(app_id, opts={}, &blk)
  vendor = opts.fetch(:vendor, 0)
  auth = opts.fetch(:auth, false)
  acct = opts.fetch(:acct, false)

  raise ArgumentError, "Must specify at least one of auth or acct" unless auth || acct
  
  @acct_apps << [app_id, vendor] if acct
  @auth_apps << [app_id, vendor] if auth
  
  @handlers[app_id] = blk
end
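
The option handling above can be tried in isolation. The following is a minimal sketch, not the library's Stack class: HandlerRegistry is a hypothetical stand-in that copies only the bookkeeping from #add_handler, and the application and vendor IDs are illustrative values.

```ruby
# Hypothetical stand-in for the handler bookkeeping shown above. It is not
# the library's Stack class; it only mirrors the add_handler rules.
class HandlerRegistry
  attr_reader :auth_apps, :acct_apps, :handlers

  def initialize
    @auth_apps = []
    @acct_apps = []
    @handlers = {}
  end

  def add_handler(app_id, opts = {}, &blk)
    vendor = opts.fetch(:vendor, 0)   # 0 when no vendor is given
    auth = opts.fetch(:auth, false)
    acct = opts.fetch(:acct, false)
    raise ArgumentError, 'Must specify at least one of auth or acct' unless auth || acct

    @acct_apps << [app_id, vendor] if acct
    @auth_apps << [app_id, vendor] if auth
    @handlers[app_id] = blk
  end
end

registry = HandlerRegistry.new

# Illustrative IDs: a vendor-specific auth application and an accounting one.
registry.add_handler(16_777_216, auth: true, vendor: 10_415) { |req, _cxn| "handled #{req}" }
registry.add_handler(3, acct: true) { nil }

# Omitting both :auth and :acct is rejected before anything is stored.
rejected = begin
  registry.add_handler(4) { nil }
  false
rescue ArgumentError
  true
end
```

Because the check runs before any state is modified, a rejected call leaves the registry unchanged.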

#close(connection) ⇒ Object

Closes the given connection, blanking out any internal data structures associated with it.

Likely to be moved to the Peer object in a future release.

Parameters:

  • connection (Socket)

    The connection to close.



# File 'lib/diameter/stack.rb', line 118

def close(connection)
  @tcp_helper.close(connection)
end

#connect_to_peer(peer_uri, peer_host, realm) ⇒ Object

Creates a Peer connection to a Diameter agent at the specific network location indicated by peer_uri.

Parameters:

  • peer_uri (URI)

    The aaa:// URI identifying the peer. Should contain a hostname/IP; may contain a port (default 3868).

  • peer_host (String)

    The DiameterIdentity of this peer, which will uniquely identify it in the peer table.

  • realm (String)

    The Diameter realm of this peer.



# File 'lib/diameter/stack.rb', line 132

def connect_to_peer(peer_uri, peer_host, realm)
  uri = URI(peer_uri)
  cxn = @tcp_helper.setup_new_connection(uri.host, uri.port)
  avps = [AVP.create('Origin-Host', @local_host),
          AVP.create('Origin-Realm', @local_realm),
          AVP.create('Host-IP-Address', IPAddr.new('127.0.0.1')),
          AVP.create('Vendor-Id', 100),
          AVP.create('Product-Name', 'ruby-diameter')
         ]
  avps += app_avps
  cer = Message.new(version: 1, command_code: 257, app_id: 0,
                    request: true, proxyable: false,
                    retransmitted: false, error: false, avps: avps)
  cer_bytes = cer.to_wire
  @tcp_helper.send(cer_bytes, cxn)
  @peer_table[peer_host] = Peer.new(peer_host)
  @peer_table[peer_host].state = :WAITING
  @peer_table[peer_host].cxn = cxn
  @peer_table[peer_host]
  # Will move to :UP when the CEA is received
end
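
One detail worth checking when building peer_uri: Ruby's standard URI library has no built-in default port for the aaa scheme, so uri.port is nil when the URI omits it (assuming no aaa scheme class has been registered elsewhere). Whether the library fills in 3868 further down is not visible in this listing. The sketch below shows one way a caller could apply the default explicitly; peer_endpoint and DEFAULT_DIAMETER_PORT are hypothetical names, not part of the library.

```ruby
require 'uri'

# Hypothetical constant and helper for illustration only.
DEFAULT_DIAMETER_PORT = 3868

# Splits an aaa:// URI into host and port, falling back to the
# default Diameter port when the URI does not carry one.
def peer_endpoint(peer_uri)
  uri = URI(peer_uri)
  [uri.host, uri.port || DEFAULT_DIAMETER_PORT]
end

explicit = peer_endpoint('aaa://peer.example.com:3869')
implicit = peer_endpoint('aaa://peer.example.com')
```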

#handle_message(msg_bytes, cxn) ⇒ Object

Handles a Diameter message (request or answer) straight from a network connection. Intended to be called by TCPStackHelper after it retrieves a message, not directly by users.



# File 'lib/diameter/stack.rb', line 216

def handle_message(msg_bytes, cxn)
  # Common processing - ensure that this message has come in on this
  # peer's expected connection, and update the last time we saw
  # activity on this peer
  msg = Message.from_bytes(msg_bytes)
  Diameter.logger.debug("Handling message #{msg}")
  peer = msg.avp_by_name('Origin-Host').octet_string
  if @peer_table[peer]
    @peer_table[peer].reset_timer
    unless @peer_table[peer].cxn == cxn
      Diameter.logger.log(Logger::WARN, "Ignoring message - claims to be from #{peer} but comes from #{cxn} not #{@peer_table[peer].cxn}")
      return # actually drop the message, as the log line states
    end
  end

  if msg.command_code == 257 && msg.answer
    handle_cea(msg)
  elsif msg.command_code == 257 && msg.request
    handle_cer(msg, cxn)
  elsif msg.command_code == 280 && msg.request
    handle_dwr(msg, cxn)
  elsif msg.command_code == 280 && msg.answer
    # No-op - we've already updated our timestamp
  elsif msg.answer
    handle_other_answer(msg, cxn)
  elsif @handlers.has_key? msg.app_id
    @handlers[msg.app_id].call(msg, cxn)
  else
    fail "Received unknown message of type #{msg.command_code}"
  end
end
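
The dispatch order in #handle_message can be summarised as a small decision function. This is a sketch under stated assumptions: FakeMessage and classify are hypothetical, the real Message class carries more fields, and answer is assumed to be the negation of request.

```ruby
# Hypothetical stand-in for a parsed message.
FakeMessage = Struct.new(:command_code, :request, :app_id) do
  # Assumption: a message is an answer exactly when it is not a request.
  def answer
    !request
  end
end

CAPABILITIES_EXCHANGE = 257 # CER/CEA command code
DEVICE_WATCHDOG = 280       # DWR/DWA command code

# Returns a symbol naming the branch that the dispatch above would take.
def classify(msg, handlers)
  if msg.command_code == CAPABILITIES_EXCHANGE
    msg.answer ? :cea : :cer
  elsif msg.command_code == DEVICE_WATCHDOG
    msg.answer ? :dwa_noop : :dwr
  elsif msg.answer
    :other_answer
  elsif handlers.key?(msg.app_id)
    :application_handler
  else
    :unknown
  end
end

handlers = { 4 => proc { nil } } # one registered application, ID 4

branches = [
  classify(FakeMessage.new(257, true, 0), handlers),   # inbound CER
  classify(FakeMessage.new(257, false, 0), handlers),  # inbound CEA
  classify(FakeMessage.new(280, true, 0), handlers),   # watchdog request
  classify(FakeMessage.new(280, false, 0), handlers),  # watchdog answer
  classify(FakeMessage.new(272, false, 4), handlers),  # answer to our request
  classify(FakeMessage.new(272, true, 4), handlers),   # request for a known app
  classify(FakeMessage.new(272, true, 99), handlers)   # request for an unknown app
]
```

Base-protocol messages are matched first, so an application handler never sees capabilities-exchange or watchdog traffic.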

#listen_for_tcp(port = 3868) ⇒ Object

Begins listening for inbound Diameter connections (making this a Diameter server instead of just a client).

Parameters:

  • port (Fixnum) (defaults to: 3868)

    The TCP port to listen on (default 3868)



# File 'lib/diameter/stack.rb', line 57

def listen_for_tcp(port=3868)
  @tcp_helper.setup_new_listen_connection("0.0.0.0", port)
end

#peer_state(id) ⇒ Symbol

Retrieves the current state of a peer, defaulting to :CLOSED if the peer does not exist.

Parameters:

  • id (String)

    The Diameter identity of the peer.

Returns:

  • (Symbol)

    The state of the peer (:UP, :WAITING or :CLOSED).



# File 'lib/diameter/stack.rb', line 202

def peer_state(id)
  if !@peer_table.key? id
    :CLOSED
  else
    @peer_table[id].state
  end
end
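
As a rough illustration of the three states, the sketch below walks a peer through the lifecycle described in #connect_to_peer: unknown peers read as :CLOSED, a peer is :WAITING after the CER is sent, and it becomes :UP once the CEA arrives. FakePeer and the lookup lambda are hypothetical stand-ins for the Peer class and the peer table.

```ruby
# Hypothetical stand-in for the library's Peer class.
FakePeer = Struct.new(:name, :state)

peer_table = {}

# Same lookup rule as peer_state: absent peers are reported as :CLOSED.
lookup = ->(id) { peer_table.key?(id) ? peer_table[id].state : :CLOSED }

name = 'hss.example.com'

before_connect = lookup.call(name)               # never connected

peer_table[name] = FakePeer.new(name, :WAITING)  # CER sent, CEA pending
while_waiting = lookup.call(name)

peer_table[name].state = :UP                     # CEA received
after_cea = lookup.call(name)
```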

#send_answer(ans, original_cxn) ⇒ Object

Sends a Diameter answer. This is sent over the same connection the request was received on (which needs to be passed in to this method).

This adds this stack’s Origin-Host and Origin-Realm AVPs, if those AVPs don’t already exist.

Parameters:

  • ans (Message)

    The Diameter answer

  • original_cxn (Socket)

    The connection which the request came in on. This will have been passed to the block registered with #add_handler.



# File 'lib/diameter/stack.rb', line 191

def send_answer(ans, original_cxn)
  fail "Must pass an answer" unless ans.answer
  ans.add_origin_host_and_realm(@local_host, @local_realm) 
  @tcp_helper.send(ans.to_wire, original_cxn)
end

#send_request(req) ⇒ Object

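Sends a Diameter request. This is routed to an appropriate peer based on the Destination-Host AVP. If that peer is in the :UP state, the request is sent and a Concurrent::Promise is returned, which resolves once a value is pushed onto the queue registered for the request's End-to-End identifier. Otherwise a warning is logged and nothing is sent.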

This adds this stack’s Origin-Host and Origin-Realm AVPs, if those AVPs don’t already exist.

Parameters:

  • req (Message)

    The request to send.



# File 'lib/diameter/stack.rb', line 158

def send_request(req)
  fail "Must pass a request" unless req.request
  req.add_origin_host_and_realm(@local_host, @local_realm) 
  peer_name = req.avp_by_name('Destination-Host').octet_string
  state = peer_state(peer_name)
  if state == :UP
    peer = @peer_table[peer_name]
    @tcp_helper.send(req.to_wire, peer.cxn)
    q = Queue.new
    @pending_ete[req.ete] = q
    p = Concurrent::Promise.execute(executor: @threadpool) {
      Diameter.logger.debug("Waiting for answer to message with EtE #{req.ete}, queue #{q}")
      val = q.pop
      Diameter.logger.debug("Promise fulfilled for message with EtE #{req.ete}")
      val
    }
    return p
  else
    Diameter.logger.log(Logger::WARN, "Peer #{peer_name} is in state #{state} - cannot route")
  end
end
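
The request/answer correlation above relies on a blocking Queue per End-to-End identifier. The following sketch reproduces that pattern with Ruby's core Queue and a plain Thread, where the library uses Concurrent::Promise on its thread pool. The answer value is a placeholder, and removing the entry from the table on delivery is this sketch's choice, not something the listing confirms.

```ruby
# End-to-End identifier => Queue on which the answer will be delivered.
pending = {}

ete = 0x1234
queue = Queue.new
pending[ete] = queue

# The waiter blocks in pop until something is pushed onto the queue.
waiter = Thread.new { queue.pop }

# Later, the receive path finds the queue by the answer's End-to-End
# identifier and pushes the answer, which wakes the waiter.
pending.delete(ete).push('answer for 0x1234')

# Thread#value joins the thread and returns the block's result.
result = waiter.value
```

The same mechanism explains #shutdown: pushing :shutdown onto every pending queue unblocks each waiter even though no answer arrived.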

#shutdown ⇒ Object

This shuts the stack down, closing all TCP connections and terminating any background threads still waiting for an answer.



# File 'lib/diameter/stack.rb', line 102

def shutdown
  @tcp_helper.shutdown
  @pending_ete.each do |ete, q|
    Diameter.logger.debug("Shutting down queue #{q} as no answer has been received with EtE #{ete}")
    q.push :shutdown
  end
  @threadpool.kill
  @threadpool.wait_for_termination(5)
end

#start ⇒ Object

Completes the stack initialization and begins reading from the TCP connections.



# File 'lib/diameter/stack.rb', line 49

def start
  @tcp_helper.start_main_loop
end