Class: EventMachine::RTMP::ResponseRouter

Inherits:
ConnectionDelegate show all
Defined in:
lib/em-rtmp/response_router.rb

Instance Attribute Summary collapse

Attributes inherited from ConnectionDelegate

#state

Instance Method Summary collapse

Methods inherited from ConnectionDelegate

#bytes_waiting, #change_state, #read, #write

Methods included from IOHelpers

#read_bitfield, #read_double_be, #read_int29, #read_safe, #read_uint16_be, #read_uint24_be, #read_uint32_be, #read_uint32_le, #read_uint8, #write_bitfield, #write_double_be, #write_int29, #write_uint16_be, #write_uint24_be, #write_uint32_be, #write_uint32_le, #write_uint8

Constructor Details

#initialize(connection) ⇒ ResponseRouter

Create a new response router object to delegate to. Start with a state of looking for a fresh header.

Returns nothing



11
12
13
14
# File 'lib/em-rtmp/response_router.rb', line 11

# Build a response router that delegates to the given connection.
#
# connection - the connection object passed up to the superclass
#
# Returns nothing
def initialize(connection)
  super(connection)

  # Nothing has been read yet, so the first thing to look for is a header.
  @state = :wait_header
end

Instance Attribute Details

#active_response ⇒ Object

Returns the value of attribute active_response.



5
6
7
# File 'lib/em-rtmp/response_router.rb', line 5

# Reader for the response that is currently being filled in, if any.
# receive_chunk stores a response here while that response is still
# waiting on more chunk data, and resets it to nil otherwise.
#
# Returns the current value of @active_response.
def active_response
  @active_response
end

Instance Method Details

#buffer_changed ⇒ Object

Called by the connection when the buffer changes and it’s appropriate to delegate to the response router. Take action depending on our state.

Returns nothing



20
21
22
23
24
25
26
27
28
29
# File 'lib/em-rtmp/response_router.rb', line 20

# Dispatch on the router's current state when the connection reports that
# its buffer changed.
#
# :wait_header - read a header from the connection, log it and hand it to
#                receive_header
# :wait_chunk  - keep feeding the response stored in active_response
#
# Any other state is left untouched.
#
# Returns nothing
def buffer_changed
  current_state = state

  if current_state == :wait_header
    header = Header.read_from_connection(@connection)
    summary = "routing new header channel=#{header.channel_id}, type=#{header.message_type_id} length=#{header.body_length}"
    Logger.debug(summary)
    receive_header(header)
  elsif current_state == :wait_chunk
    receive_chunk(active_response)
  end
end

#receive_chunk(response) ⇒ Object

Receive a chunk of data for a given response. Change our state depending on the result of the chunk read. If it was read in full, we’ll look for a header next time around. Otherwise, we will continue to read into that chunk until it is satisfied.

If the response is completely received, we’ll clone it and route that to the appropriate action, then reset that response so that it can receive something else in the future.

response - the Response object to act on

Returns nothing



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/em-rtmp/response_router.rb', line 55

# Read the next chunk into a response and update the router state to match.
#
# While the response is still waiting inside a chunk it is remembered as the
# active response and the router stays in :wait_chunk. Otherwise the active
# response is cleared and the router goes back to :wait_header.
#
# Once the response reports itself complete, a dup of it is routed and the
# original is reset.
#
# response - the Response object to act on
#
# Returns nothing
def receive_chunk(response)
  response.read_next_chunk

  mid_chunk = response.waiting_in_chunk?
  self.active_response = mid_chunk ? response : nil
  change_state(mid_chunk ? :wait_chunk : :wait_header)

  return unless response.complete?

  Logger.debug "response is complete, routing it!"
  # Route a copy so that resetting the original does not affect the routed one.
  route_response(response.dup)
  response.reset
end

#receive_header(header) ⇒ Object

Receive a fresh header, add it to the appropriate response and receive a chunk of data for that response.

header - Header to receive and act on

Returns nothing



37
38
39
40
41
# File 'lib/em-rtmp/response_router.rb', line 37

# Attach a freshly read header to the response for its channel, then start
# reading chunk data for that response.
#
# header - Header to receive and act on
#
# Returns nothing
def receive_header(header)
  response = Response.find_or_create(header.channel_id, @connection).tap do |pending|
    pending.add_header(header)
  end

  receive_chunk(response)
end

#route_amf(version, response) ⇒ Object

Route an AMF response to its pending request

version - AMF version (:amf0 or :amf3)
response - Response object

Returns nothing



114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/em-rtmp/response_router.rb', line 114

# Deliver a decoded AMF response to the request that is waiting on it.
#
# The pending request is looked up by AMF version, transaction id and
# connection. Its request is succeeded or failed depending on the message's
# success? flag, and the pending request is deleted afterwards. A response
# with no matching pending request is only logged as an error.
#
# version  - AMF version (:amf0 or :amf3)
# response - Response object whose message has already been decoded
#
# Returns nothing
def route_amf(version, response)
  message = response.message
  transaction_id = message.transaction_id
  Logger.debug "routing #{version} response for tid #{transaction_id}"

  pending_request = PendingRequest.find(version, transaction_id, @connection)
  return Logger.error("unable to find a matching transaction") unless pending_request

  if message.success?
    pending_request.request.succeed(response)
  else
    pending_request.request.fail(response)
  end

  pending_request.delete
end

#route_response(response) ⇒ Object

Route any response to its proper destination. AMF responses are routed to their pending request. Chunk size updates the connection, others are ignored for now.

response - Response object to route or act on.

Returns nothing.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/em-rtmp/response_router.rb', line 79

# Route a completed response to its destination based on the message type
# in its header.
#
# :amf0 / :amf3 - decode the body into a Message of the matching version,
#                 log it, and hand the response to route_amf
# :chunk_size   - read a 32-bit big-endian integer from the body and store it
#                 as the connection's chunk size
# :ack_size     - read a 32-bit big-endian integer from the body and log it
# :bandwidth    - read a 32-bit big-endian integer plus one signed byte from
#                 the body and log them
#
# Any other message type is logged as unroutable.
#
# response - Response object to route or act on.
#
# Returns nothing.
def route_response(response)
  message_type = response.header.message_type

  case message_type
  when :amf0, :amf3
    amf_version = message_type == :amf0 ? 0 : 3
    response.message = Message.new version: amf_version
    response.message.decode response.body
    Logger.info "head: #{response.header.inspect}"
    Logger.info "#{message_type}: #{response.message.inspect}"
    route_amf message_type, response
  when :chunk_size
    # Keep the parsed value in a local so the log line can use it, and use
    # @connection like the other methods in this class do.
    chunk_size = response.body.unpack('N').first
    @connection.chunk_size = chunk_size
    Logger.info "setting chunk_size to #{chunk_size}"
  when :ack_size
    ack_size = response.body.unpack('N').first
    Logger.info "setting ack_size to #{ack_size}"
  when :bandwidth
    # A single unpack yields nil for fields missing from a short body,
    # rather than raising on body[4] being nil.
    bandwidth, bandwidth_type = response.body.unpack('Nc')
    Logger.info "setting bandwidth to #{bandwidth} (#{bandwidth_type})"
  else
    Logger.info "cannot route unknown response: #{response.inspect}"
  end
end