Class: Async::HTTP::Protocol::HTTP2::Stream
- Inherits:
-
Protocol::HTTP2::Stream
- Object
- Protocol::HTTP2::Stream
- Async::HTTP::Protocol::HTTP2::Stream
- Defined in:
- lib/async/http/protocol/http2/stream.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#headers ⇒ Object
Returns the value of attribute headers.
-
#input ⇒ Object
readonly
Returns the value of attribute input.
-
#pool ⇒ Object
Returns the value of attribute pool.
Instance Method Summary collapse
- #add_header(key, value, trailer: false) ⇒ Object
-
#closed(error) ⇒ Object
When the stream transitions to the closed state, this method is called.
-
#finish_output(error = nil) ⇒ Object
Called when the output terminates normally.
-
#initialize ⇒ Stream
constructor
A new instance of Stream.
-
#prepare_input(length) ⇒ Input
Prepare the input stream which will be used for incoming data frames.
- #process_data(frame) ⇒ Object
- #process_headers(frame) ⇒ Object
- #receive_trailing_headers(headers, end_stream) ⇒ Object
-
#send_body(body, trailer = nil) ⇒ Object
Set the body and begin sending it.
- #update_local_window(frame) ⇒ Object
- #wait_for_input ⇒ Object
- #window_updated(size) ⇒ Object
Constructor Details
#initialize ⇒ Stream
Returns a new instance of Stream.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/async/http/protocol/http2/stream.rb', line 18

# Build a stream with nothing attached yet: no headers, no pool, no input
# and no output. Every argument is forwarded untouched to the superclass.
def initialize(*)
  super

  @headers = @pool = nil

  # Input buffer, reading request body, or response body (receive_data):
  @length = @input = nil

  # Output buffer, writing request body or response body (window_updated):
  @output = nil
end
Instance Attribute Details
#headers ⇒ Object
Returns the value of attribute headers.
33 34 35 |
# File 'lib/async/http/protocol/http2/stream.rb', line 33

# Reader for the +headers+ attribute.
#
# @return [Object] the current value of +@headers+.
def headers
  return @headers
end
#input ⇒ Object (readonly)
Returns the value of attribute input.
37 38 39 |
# File 'lib/async/http/protocol/http2/stream.rb', line 37

# Reader for the read-only +input+ attribute.
#
# @return [Object] the current value of +@input+.
def input
  return @input
end
#pool ⇒ Object
Returns the value of attribute pool.
35 36 37 |
# File 'lib/async/http/protocol/http2/stream.rb', line 35

# Reader for the +pool+ attribute.
#
# @return [Object] the current value of +@pool+.
def pool
  return @pool
end
Instance Method Details
#add_header(key, value, trailer: false) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/async/http/protocol/http2/stream.rb', line 39

# Validate a single header field and, if acceptable, append it to +@headers+.
#
# A field is rejected when its name equals +CONNECTION+, starts with ":"
# (pseudo-header), or contains any ASCII upper-case letter.
#
# @param key [String] the header field name.
# @param value [Object] the header field value, passed through unchanged.
# @param trailer [Boolean] forwarded to +@headers.add+ as the +trailer:+ keyword.
# @return [Object] whatever +@headers.add+ returns.
# @raise [::Protocol::HTTP2::HeaderError] if the field name is rejected.
def add_header(key, value, trailer: false)
  if key == CONNECTION
    raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!"
  elsif key.start_with?(":")
    raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!"
  elsif key.match?(/[A-Z]/)
    # `match?` is a pure predicate: no MatchData is built and `$~` is left alone.
    raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!"
  end

  @headers.add(key, value, trailer: trailer)
end
#closed(error) ⇒ Object
When the stream transitions to the closed state, this method is called. There are roughly two ways this can happen:
-
A frame is received which causes this stream to enter the closed state. This method will be invoked from the background reader task.
-
A frame is sent which causes this stream to enter the closed state. This method will be invoked from that task.
While the input stream is relatively straightforward, the output stream can trigger the second case above.
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/async/http/protocol/http2/stream.rb', line 157

# Hook invoked when the stream transitions to the closed state.
#
# After delegating to the superclass, this detaches and shuts down whatever
# is still attached, in order: the input (write side closed with +error+),
# then the output (stopped with +error+). Finally, when both a pool and a
# connection are present, the connection is released back to the pool.
#
# @param error [Object] passed to +close_write+ and +stop+.
# @return [self]
def closed(error)
  super

  # Clear each instance variable before notifying the detached object.
  reader = @input
  if reader
    @input = nil
    reader.close_write(error)
  end

  writer = @output
  if writer
    @output = nil
    writer.stop(error)
  end

  owner = @pool
  owner.release(@connection) if owner && @connection

  return self
end
#finish_output(error = nil) ⇒ Object
Called when the output terminates normally.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/async/http/protocol/http2/stream.rb', line 126

# Called when the output has finished. Does nothing if the stream is
# already closed.
#
# The output is detached first. Then exactly one frame is sent:
# - with an +error+: a stream reset carrying INTERNAL_ERROR;
# - with a non-empty trailer: the trailer as headers, flagged END_STREAM;
# - otherwise: an empty data frame flagged END_STREAM.
#
# @param error [Object, nil] truthy when the output failed.
def finish_output(error = nil)
  return if self.closed?

  # Read the trailer before dropping the reference to the output.
  pending_trailer = @output&.trailer
  @output = nil

  if error
    send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
  elsif pending_trailer&.any?
    # Write trailer:
    send_headers(pending_trailer, ::Protocol::HTTP2::END_STREAM)
  else
    send_data(nil, ::Protocol::HTTP2::END_STREAM)
  end
end
#prepare_input(length) ⇒ Input
Prepare the input stream which will be used for incoming data frames.
83 84 85 86 87 88 89 |
# File 'lib/async/http/protocol/http2/stream.rb', line 83

# Prepare the input stream which will be used for incoming data frames.
#
# @param length [Object] forwarded to +Input.new+ together with this stream.
# @return [Input] the newly created input, also stored in +@input+.
# @raise [ArgumentError] if an input has already been prepared.
def prepare_input(length)
  raise ArgumentError, "Input body already prepared!" unless @input.nil?

  @input = Input.new(self, length)
end
#process_data(frame) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/async/http/protocol/http2/stream.rb', line 98

# Handle an incoming data frame.
#
# The frame is unpacked; when an input is attached, a non-empty payload is
# written to it and the input's write side is closed if the frame ends the
# stream.
#
# @param frame [Object] must respond to +unpack+ and +end_stream?+.
# @return [Object] the unpacked payload on success; on an unexpected error,
#   whatever +send_reset_stream+ returns.
# @raise [::Protocol::HTTP2::ProtocolError] re-raised untouched.
def process_data(frame)
  payload = frame.unpack

  if @input
    @input.write(payload) unless payload.empty?
    @input.close_write if frame.end_stream?
  end

  return payload
rescue ::Protocol::HTTP2::ProtocolError
  # Protocol violations are left for the caller to handle.
  raise
rescue StandardError
  # Anything else is answered by resetting the stream.
  send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
end
#process_headers(frame) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/async/http/protocol/http2/stream.rb', line 57

# Handle an incoming headers frame.
#
# The decoded headers come from the superclass implementation. They are
# treated as trailing headers when headers already exist and the frame ends
# the stream, and as initial headers otherwise. If an input is attached and
# the frame ends the stream, the input's write side is closed.
#
# Invalid trailers are logged as a warning and answered with a
# PROTOCOL_ERROR reset. Header errors are logged at debug level and answered
# with a reset carrying the error's own code.
#
# @param frame [Object] must respond to +end_stream?+.
def process_headers(frame)
  if @headers && frame.end_stream?
    receive_trailing_headers(super, frame.end_stream?)
  else
    receive_initial_headers(super, frame.end_stream?)
  end

  @input.close_write if @input && frame.end_stream?
rescue ::Protocol::HTTP::InvalidTrailerError => error
  Console.warn(self, error)

  send_reset_stream(::Protocol::HTTP2::Error::PROTOCOL_ERROR)
rescue ::Protocol::HTTP2::HeaderError => error
  Console.debug(self, "Error while processing headers!", error)

  send_reset_stream(error.code)
end
#receive_trailing_headers(headers, end_stream) ⇒ Object
51 52 53 54 55 |
# File 'lib/async/http/protocol/http2/stream.rb', line 51

# Add every received pair through +add_header+, marked as a trailer.
#
# @param headers [#each] yields +key, value+ pairs.
# @param end_stream [Object] accepted but not used by this method.
# @return [Object] the result of +headers.each+.
def receive_trailing_headers(headers, end_stream)
  headers.each { |name, content| add_header(name, content, trailer: true) }
end
#send_body(body, trailer = nil) ⇒ Object
Set the body and begin sending it.
119 120 121 122 123 |
# File 'lib/async/http/protocol/http2/stream.rb', line 119

# Set the body and begin sending it.
#
# Wraps the body (and optional trailer) in a new +Output+ bound to this
# stream, stores it in +@output+, and starts it.
#
# @param body [Object] forwarded to +Output.new+.
# @param trailer [Object, nil] forwarded to +Output.new+.
# @return [Object] whatever +Output#start+ returns.
def send_body(body, trailer = nil)
  writer = Output.new(self, body, trailer)
  @output = writer

  writer.start
end
#update_local_window(frame) ⇒ Object
91 92 93 94 95 96 |
# File 'lib/async/http/protocol/http2/stream.rb', line 91

# Account for a received frame against the local flow-control window.
#
# No window update is requested here; that is done on demand in
# `Input#read` (i.e. `request_window_update` is deliberately not called).
#
# @param frame [Object] forwarded to +consume_local_window+.
# @return [Object] whatever +consume_local_window+ returns.
def update_local_window(frame)
  consume_local_window(frame)
end
#wait_for_input ⇒ Object
77 78 79 |
# File 'lib/async/http/protocol/http2/stream.rb', line 77

# Hand back the current input without doing any further work.
#
# @return [Object, nil] the value of +@input+.
def wait_for_input
  @input
end
#window_updated(size) ⇒ Object
145 146 147 148 149 150 151 |
# File 'lib/async/http/protocol/http2/stream.rb', line 145

# Propagate a window update: first to the superclass, then to the attached
# output, if there is one.
#
# @param size [Object] forwarded to the output's +window_updated+.
# @return [true] always.
def window_updated(size)
  super

  writer = @output
  writer.window_updated(size) unless writer.nil?

  true
end