Class: EventMachine::RTMP::ResponseRouter
- Inherits: Object > ConnectionDelegate > EventMachine::RTMP::ResponseRouter
- Defined in: lib/em-rtmp/response_router.rb
Instance Attribute Summary
- #active_response ⇒ Object
  Returns the value of attribute active_response.
Attributes inherited from ConnectionDelegate
Instance Method Summary
- #buffer_changed ⇒ Object
  Called by the connection when the buffer changes and it’s appropriate to delegate to the response router.
- #initialize(connection) ⇒ ResponseRouter (constructor)
  Create a new response router object to delegate to.
- #receive_chunk(response) ⇒ Object
  Receive a chunk of data for a given response.
- #receive_header(header) ⇒ Object
  Receive a fresh header, add it to the appropriate response and receive a chunk of data for that response.
- #route_amf(version, response) ⇒ Object
  Route an AMF response to its pending request.
- #route_response(response) ⇒ Object
  Route any response to its proper destination.
Methods inherited from ConnectionDelegate
#bytes_waiting, #change_state, #read, #write
Methods included from IOHelpers
#read_bitfield, #read_double_be, #read_int29, #read_safe, #read_uint16_be, #read_uint24_be, #read_uint32_be, #read_uint32_le, #read_uint8, #write_bitfield, #write_double_be, #write_int29, #write_uint16_be, #write_uint24_be, #write_uint32_be, #write_uint32_le, #write_uint8
Constructor Details
#initialize(connection) ⇒ ResponseRouter
Create a new response router object to delegate to. Start with a state of looking for a fresh header.
Returns nothing
    # File 'lib/em-rtmp/response_router.rb', line 11
    def initialize(connection)
      super connection
      @state = :wait_header
    end
Instance Attribute Details
#active_response ⇒ Object
Returns the value of attribute active_response.
    # File 'lib/em-rtmp/response_router.rb', line 5
    def active_response
      @active_response
    end
Instance Method Details
#buffer_changed ⇒ Object
Called by the connection when the buffer changes and it’s appropriate to delegate to the response router. Take action depending on our state.
Returns nothing
    # File 'lib/em-rtmp/response_router.rb', line 20
    def buffer_changed
      case state
      when :wait_header
        header = Header.read_from_connection(@connection)
        Logger.debug "routing new header channel=#{header.channel_id}, type=#{header.} length=#{header.body_length}"
        receive_header header
      when :wait_chunk
        receive_chunk active_response
      end
    end
#receive_chunk(response) ⇒ Object
Receive a chunk of data for a given response. Change our state depending on the result of the chunk read. If it was read in full, we’ll look for a header next time around. Otherwise, we will continue to read into that chunk until it is satisfied.
If the response is completely received, we’ll clone it and route that to the appropriate action, then reset that response so that it can receive something else in the future.
response - the Response object to act on
Returns nothing
    # File 'lib/em-rtmp/response_router.rb', line 55
    def receive_chunk(response)
      response.read_next_chunk

      if response.waiting_in_chunk?
        self.active_response = response
        change_state :wait_chunk
      else
        self.active_response = nil
        change_state :wait_header
      end

      if response.complete?
        Logger.debug "response is complete, routing it!"
        route_response response.dup
        response.reset
      end
    end
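The alternation between waiting for a header and waiting for the rest of a chunk can be sketched in isolation. This is a minimal, hypothetical model and not the library's code: `FakeResponse` and `TinyRouter` are invented names, the buffer is passed in explicitly instead of being read from a connection, and the 4-byte chunk size is chosen only to keep the example short.

```ruby
# Hypothetical stand-in for a response whose body arrives in fixed-size chunks.
class FakeResponse
  attr_reader :body

  def initialize(body_length, chunk_size)
    @body_length = body_length
    @chunk_size = chunk_size
    @body = String.new
    @chunk_remaining = 0
  end

  # Read as much of the current chunk as the buffer can supply.
  def read_next_chunk(buffer)
    if @chunk_remaining.zero?
      # Start a new chunk; the last chunk may be shorter than chunk_size.
      @chunk_remaining = [@chunk_size, @body_length - @body.bytesize].min
    end
    data = buffer.slice!(0, @chunk_remaining)
    @body << data
    @chunk_remaining -= data.bytesize
  end

  def waiting_in_chunk?
    @chunk_remaining > 0
  end

  def complete?
    @body.bytesize >= @body_length
  end
end

# Hypothetical router that mirrors the two states described above.
class TinyRouter
  attr_reader :state, :routed

  def initialize
    @state = :wait_header
    @routed = []
  end

  def receive_chunk(response, buffer)
    response.read_next_chunk(buffer)
    @state = response.waiting_in_chunk? ? :wait_chunk : :wait_header
    @routed << response.body.dup if response.complete?
  end
end

router = TinyRouter.new
response = FakeResponse.new(6, 4)

router.receive_chunk(response, "ab".dup) # only half of the 4-byte chunk arrived
state_after_partial = router.state       # :wait_chunk
router.receive_chunk(response, "cd".dup) # chunk finished, body still incomplete
state_after_chunk = router.state         # :wait_header
router.receive_chunk(response, "ef".dup) # final 2-byte chunk completes the body
```

The sketch keeps the same ordering as the method above: the state is updated first, and a completed body is handed off as a copy so the original can be reused.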
#receive_header(header) ⇒ Object
Receive a fresh header, add it to the appropriate response and receive a chunk of data for that response.
header - Header to receive and act on
Returns nothing
    # File 'lib/em-rtmp/response_router.rb', line 37
    def receive_header(header)
      response = Response.find_or_create(header.channel_id, @connection)
      response.add_header header
      receive_chunk response
    end
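The source does not show how `Response.find_or_create` is implemented. As a rough illustration of the idea of keeping one in-progress response per channel, here is a sketch that uses a hypothetical `ChannelRegistry` backed by a plain hash; the hash-based entry stands in for a real response object.

```ruby
# Hypothetical per-channel registry; not the library's Response class.
class ChannelRegistry
  def initialize
    @responses = {}
  end

  # Return the existing entry for the channel, or create an empty one.
  def find_or_create(channel_id)
    @responses[channel_id] ||= { channel_id: channel_id, headers: [] }
  end
end

registry = ChannelRegistry.new

first = registry.find_or_create(3)
first[:headers] << :header_a

again = registry.find_or_create(3) # same object as `first`
other = registry.find_or_create(4) # a fresh entry for a different channel
```

Reusing the same entry for a channel is what would let a later header on that channel continue a response that is still being assembled.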
#route_amf(version, response) ⇒ Object
Route an AMF response to its pending request.
version - AMF version (:amf0 or :amf3)
response - Response object
Returns nothing
    # File 'lib/em-rtmp/response_router.rb', line 114
    def route_amf(version, response)
      Logger.debug "routing #{version} response for tid #{response..transaction_id}"
      if pending_request = PendingRequest.find(version, response..transaction_id, @connection)
        if response..success?
          pending_request.request.succeed(response)
        else
          pending_request.request.fail(response)
        end
        pending_request.delete
      else
        Logger.error "unable to find a matching transaction"
      end
    end
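The calls to `succeed` and `fail` suggest a deferrable-style request, similar in spirit to EventMachine's Deferrable. The following sketch shows the matching step on its own, under stated assumptions: `SketchRequest`, the `pending` hash keyed by version and transaction id, and the `route` helper are all hypothetical and do not reflect how `PendingRequest` is actually implemented.

```ruby
# Hypothetical request with success and failure callbacks.
class SketchRequest
  def initialize
    @on_success = []
    @on_failure = []
  end

  def callback(&block)
    @on_success << block
    self
  end

  def errback(&block)
    @on_failure << block
    self
  end

  def succeed(result)
    @on_success.each { |cb| cb.call(result) }
  end

  def fail(result)
    @on_failure.each { |cb| cb.call(result) }
  end
end

log = []

# Assumed storage: pending requests keyed by [version, transaction id].
pending = { [:amf0, 1] => SketchRequest.new }
pending[[:amf0, 1]]
  .callback { |result| log << [:ok, result] }
  .errback  { |result| log << [:failed, result] }

# Look up the pending request, fire the matching callback, and forget it.
def route(pending, version, transaction_id, success, payload, log)
  request = pending.delete([version, transaction_id])
  if request.nil?
    log << [:unmatched, transaction_id]
  elsif success
    request.succeed(payload)
  else
    request.fail(payload)
  end
end

route(pending, :amf0, 1, true, "connected", log)
route(pending, :amf0, 1, true, "duplicate", log) # already removed, so unmatched
```

Removing the entry once it has been answered means a second response with the same transaction id is reported as unmatched instead of firing the callbacks twice.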
#route_response(response) ⇒ Object
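Route any response to its proper destination. AMF0 and AMF3 responses are decoded and routed to their pending request. A chunk size message updates the chunk size of the connection. Ack size and bandwidth messages are only logged for now, and any other type is logged as unroutable.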
response - Response object to route or act on.
Returns nothing.
    # File 'lib/em-rtmp/response_router.rb', line 79
    def route_response(response)
      case response.header.
      when :amf0
        response. = Message.new version: 0
        response..decode response.body
        Logger.info "head: #{response.header.inspect}"
        Logger.info "amf0: #{response..inspect}"
        route_amf :amf0, response
      when :amf3
        response. = Message.new version: 3
        response..decode response.body
        Logger.info "head: #{response.header.inspect}"
        Logger.info "amf3: #{response..inspect}"
        route_amf :amf3, response
      when :chunk_size
        connection.chunk_size = response.body.unpack('N')[0]
        Logger.info "setting chunk_size to #{chunk_size}"
      when :ack_size
        ack_size = response.body.unpack('N')[0]
        Logger.info "setting ack_size to #{ack_size}"
      when :bandwidth
        bandwidth = response.body[0..3].unpack('N')[0]
        bandwidth_type = response.body[4].unpack('c')[0]
        Logger.info "setting bandwidth to #{bandwidth} (#{bandwidth_type})"
      else
        Logger.info "cannot route unknown response: #{response.inspect}"
      end
    end
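The control-message branches above decode their bodies with `String#unpack`: `'N'` reads a 32-bit unsigned big-endian integer and `'c'` reads a signed 8-bit integer. A small round-trip sketch shows the byte layout; the values 4096, 2_500_000 and 2 are sample data chosen for illustration, not taken from a real session.

```ruby
# Build a 4-byte big-endian body, as a chunk size message would carry.
chunk_size_body = [4096].pack('N')
chunk_size = chunk_size_body.unpack('N')[0]

# Build a 5-byte body: a 4-byte size followed by a 1-byte type.
bandwidth_body = [2_500_000, 2].pack('Nc')
bandwidth = bandwidth_body[0..3].unpack('N')[0]
bandwidth_type = bandwidth_body[4].unpack('c')[0]

puts "chunk_size=#{chunk_size} bytes=#{chunk_size_body.bytes.inspect}"
puts "bandwidth=#{bandwidth} type=#{bandwidth_type}"
```

Slicing the first four bytes before unpacking, as the bandwidth branch does, keeps the size and the trailing type byte separate.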