Class: Tipi::HTTP2StreamHandler
- Inherits:
-
Object
- Object
- Tipi::HTTP2StreamHandler
- Defined in:
- lib/tipi/http2_stream.rb
Overview
Manages an HTTP 2 stream
Instance Attribute Summary collapse
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
Instance Method Summary collapse
- #complete?(request) ⇒ Boolean
- #finish(request) ⇒ Object
- #get_body(request) ⇒ Object
- #get_body_chunk(request, buffered_only = false) ⇒ Object
-
#initialize(adapter, stream, conn, first, &block) ⇒ HTTP2StreamHandler
constructor
A new instance of HTTP2StreamHandler.
- #on_data(data) ⇒ Object
- #on_half_close ⇒ Object
- #on_headers(headers) ⇒ Object
- #protocol ⇒ Object
-
#respond(request, body, headers) ⇒ Object
response API.
- #respond_from_io(request, io, headers, chunk_size = 2**16) ⇒ Object
- #run(&block) ⇒ Object
- #send_chunk(request, chunk, done: false) ⇒ Object
- #send_headers(request, headers, empty_response: false) ⇒ Object
- #stop ⇒ Object
- #transform_headers(headers) ⇒ Object
- #with_transfer_count(request) ⇒ Object
Constructor Details
#initialize(adapter, stream, conn, first, &block) ⇒ HTTP2StreamHandler
Returns a new instance of HTTP2StreamHandler.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/tipi/http2_stream.rb', line 11 def initialize(adapter, stream, conn, first, &block) @adapter = adapter @stream = stream @conn = conn @first = first @connection_fiber = Fiber.current @stream_fiber = spin { run(&block) } # Stream callbacks occur on the connection fiber (see HTTP2Adapter#each). # The request handler is run on a separate fiber for each stream, allowing # concurrent handling of incoming requests on the same HTTP/2 connection. # # The different stream adapter APIs suspend the stream fiber, waiting for # stream callbacks to be called. The callbacks, in turn, transfer control to # the stream fiber, effectively causing the return of the adapter API calls. # # Note: the request handler is run once headers are received. Reading the # request body, if present, is at the discretion of the request handler. # This mirrors the behaviour of the HTTP/1 adapter. stream.on(:headers, &method(:on_headers)) stream.on(:data, &method(:on_data)) stream.on(:half_close, &method(:on_half_close)) end |
Instance Attribute Details
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
9 10 11 |
# File 'lib/tipi/http2_stream.rb', line 9 def conn @conn end |
Instance Method Details
#complete?(request) ⇒ Boolean
111 112 113 |
# File 'lib/tipi/http2_stream.rb', line 111 def complete?(request) @complete end |
#finish(request) ⇒ Object
177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/tipi/http2_stream.rb', line 177 def finish(request) if @headers_sent @stream.close else headers[':status'] ||= Qeweney::Status::NO_CONTENT with_transfer_count(request) do @stream.headers(transform_headers(headers), end_stream: true) end end rescue HTTP2::Error::StreamClosed # ignore end |
#get_body(request) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/tipi/http2_stream.rb', line 96 def get_body(request) @buffered_chunks ||= [] return @buffered_chunks.join if @complete while !@complete begin @get_body_chunk_fiber = Fiber.current suspend ensure @get_body_chunk_fiber = nil end end @buffered_chunks.join end |
#get_body_chunk(request, buffered_only = false) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/tipi/http2_stream.rb', line 82 def get_body_chunk(request, buffered_only = false) @buffered_chunks ||= [] return @buffered_chunks.shift unless @buffered_chunks.empty? return nil if @complete begin @get_body_chunk_fiber = Fiber.current suspend ensure @get_body_chunk_fiber = nil end @buffered_chunks.shift end |
#on_data(data) ⇒ Object
59 60 61 62 63 64 |
# File 'lib/tipi/http2_stream.rb', line 59 def on_data(data) data = data.to_s # chunks might be wrapped in a HTTP2::Buffer (@buffered_chunks ||= []) << data @get_body_chunk_fiber&.schedule end |
#on_half_close ⇒ Object
66 67 68 69 |
# File 'lib/tipi/http2_stream.rb', line 66 def on_half_close @get_body_chunk_fiber&.schedule @complete = true end |
#on_headers(headers) ⇒ Object
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/tipi/http2_stream.rb', line 48 def on_headers(headers) @request = Qeweney::Request.new(headers.to_h, self) @request.rx_incr(@adapter.get_rx_count) @request.tx_incr(@adapter.get_tx_count) if @first @request.headers[':first'] = true @first = false end @stream_fiber << @request end |
#protocol ⇒ Object
71 72 73 |
# File 'lib/tipi/http2_stream.rb', line 71 def protocol 'h2' end |
#respond(request, body, headers) ⇒ Object
response API
116 117 118 119 120 121 122 123 124 125 |
# File 'lib/tipi/http2_stream.rb', line 116 def respond(request, body, headers) headers = normalize_status_header(headers) with_transfer_count(request) do @stream.headers(transform_headers(headers)) @headers_sent = true @stream.data(body || '') end rescue HTTP2::Error::StreamClosed # ignore end |
#respond_from_io(request, io, headers, chunk_size = 2**16) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/tipi/http2_stream.rb', line 127 def respond_from_io(request, io, headers, chunk_size = 2**16) headers = normalize_status_header(headers) with_transfer_count(request) do @stream.headers(transform_headers(headers)) @headers_sent = true while (chunk = io.read(chunk_size)) @stream.data(chunk) end end rescue HTTP2::Error::StreamClosed # ignore end |
#run(&block) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/tipi/http2_stream.rb', line 35 def run(&block) request = receive error = nil block.(request) @connection_fiber.schedule rescue Polyphony::BaseException raise rescue Exception => e error = e ensure @connection_fiber.schedule error end |
#send_chunk(request, chunk, done: false) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/tipi/http2_stream.rb', line 163 def send_chunk(request, chunk, done: false) send_headers({}, false) unless @headers_sent if chunk with_transfer_count(request) do @stream.data(chunk, end_stream: done) end elsif done @stream.close end rescue HTTP2::Error::StreamClosed # ignore end |
#send_headers(request, headers, empty_response: false) ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/tipi/http2_stream.rb', line 150 def send_headers(request, headers, empty_response: false) return if @headers_sent status = empty_response ? Qeweney::Status::NO_CONTENT : Qeweney::Status::OK headers = normalize_status_header(headers, status) with_transfer_count(request) do @stream.headers(transform_headers(headers), end_stream: false) end @headers_sent = true rescue HTTP2::Error::StreamClosed # ignore end |
#stop ⇒ Object
190 191 192 193 194 195 |
# File 'lib/tipi/http2_stream.rb', line 190 def stop return if @complete @stream.close @stream_fiber.schedule(Polyphony::MoveOn.new) end |
#transform_headers(headers) ⇒ Object
140 141 142 143 144 145 146 147 148 |
# File 'lib/tipi/http2_stream.rb', line 140 def transform_headers(headers) headers.each_with_object([]) do |(k, v), a| if v.is_a?(Array) v.each { |vv| a << [k, vv.to_s] } else a << [k, v.to_s] end end end |
#with_transfer_count(request) ⇒ Object
75 76 77 78 79 80 |
# File 'lib/tipi/http2_stream.rb', line 75 def with_transfer_count(request) @adapter.set_request_for_transfer_count(request) yield ensure @adapter.unset_request_for_transfer_count(request) end |