Class: Tipi::HTTP2StreamHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/tipi/http2_stream.rb

Overview

Manages an HTTP 2 stream

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(adapter, stream, conn, first, &block) ⇒ HTTP2StreamHandler

Returns a new instance of HTTP2StreamHandler.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/tipi/http2_stream.rb', line 11

def initialize(adapter, stream, conn, first, &block)
  @adapter = adapter
  @stream = stream
  @conn = conn
  @first = first
  @connection_fiber = Fiber.current
  @stream_fiber = spin { run(&block) }

  # Stream callbacks occur on the connection fiber (see HTTP2Adapter#each).
  # The request handler is run on a separate fiber for each stream, allowing
  # concurrent handling of incoming requests on the same HTTP/2 connection.
  #
  # The different stream adapter APIs suspend the stream fiber, waiting for
  # stream callbacks to be called. The callbacks, in turn, transfer control to
  # the stream fiber, effectively causing the return of the adapter API calls.
  #
  # Note: the request handler is run once headers are received. Reading the
  # request body, if present, is at the discretion of the request handler.
  # This mirrors the behaviour of the HTTP/1 adapter.
  stream.on(:headers, &method(:on_headers))
  stream.on(:data, &method(:on_data))
  stream.on(:half_close, &method(:on_half_close))
end

Instance Attribute Details

#connObject (readonly)

Returns the value of attribute conn.



9
10
11
# File 'lib/tipi/http2_stream.rb', line 9

def conn
  @conn
end

Instance Method Details

#complete?(request) ⇒ Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/tipi/http2_stream.rb', line 111

def complete?(request)
  @complete
end

#finish(request) ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/tipi/http2_stream.rb', line 177

def finish(request)
  if @headers_sent
    @stream.close
  else
    headers[':status'] ||= Qeweney::Status::NO_CONTENT
    with_transfer_count(request) do
      @stream.headers(transform_headers(headers), end_stream: true)
    end
  end
rescue HTTP2::Error::StreamClosed
  # ignore
end

#get_body(request) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/tipi/http2_stream.rb', line 96

def get_body(request)
  @buffered_chunks ||= []
  return @buffered_chunks.join if @complete

  while !@complete
    begin
      @get_body_chunk_fiber = Fiber.current
      suspend
    ensure
      @get_body_chunk_fiber = nil
    end
  end
  @buffered_chunks.join
end

#get_body_chunk(request, buffered_only = false) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/tipi/http2_stream.rb', line 82

def get_body_chunk(request, buffered_only = false)
  @buffered_chunks ||= []
  return @buffered_chunks.shift unless @buffered_chunks.empty?
  return nil if @complete

  begin
    @get_body_chunk_fiber = Fiber.current
    suspend
  ensure
    @get_body_chunk_fiber = nil
  end
  @buffered_chunks.shift
end

#on_data(data) ⇒ Object



59
60
61
62
63
64
# File 'lib/tipi/http2_stream.rb', line 59

def on_data(data)
  data = data.to_s # chunks might be wrapped in a HTTP2::Buffer

  (@buffered_chunks ||= []) << data
  @get_body_chunk_fiber&.schedule
end

#on_half_closeObject



66
67
68
69
# File 'lib/tipi/http2_stream.rb', line 66

def on_half_close
  @get_body_chunk_fiber&.schedule
  @complete = true
end

#on_headers(headers) ⇒ Object



48
49
50
51
52
53
54
55
56
57
# File 'lib/tipi/http2_stream.rb', line 48

def on_headers(headers)
  @request = Qeweney::Request.new(headers.to_h, self)
  @request.rx_incr(@adapter.get_rx_count)
  @request.tx_incr(@adapter.get_tx_count)
  if @first
    @request.headers[':first'] = true
    @first = false
  end
  @stream_fiber << @request
end

#protocolObject



71
72
73
# File 'lib/tipi/http2_stream.rb', line 71

def protocol
  'h2'
end

#respond(request, body, headers) ⇒ Object

response API



116
117
118
119
120
121
122
123
124
125
# File 'lib/tipi/http2_stream.rb', line 116

def respond(request, body, headers)
  headers = normalize_status_header(headers)
  with_transfer_count(request) do
    @stream.headers(transform_headers(headers))
    @headers_sent = true
    @stream.data(body || '')
  end
rescue HTTP2::Error::StreamClosed
  # ignore
end

#respond_from_io(request, io, headers, chunk_size = 2**16) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/tipi/http2_stream.rb', line 127

def respond_from_io(request, io, headers, chunk_size = 2**16)
  headers = normalize_status_header(headers)
  with_transfer_count(request) do
    @stream.headers(transform_headers(headers))
    @headers_sent = true
    while (chunk = io.read(chunk_size))
      @stream.data(chunk)
    end
  end
rescue HTTP2::Error::StreamClosed
  # ignore
end

#run(&block) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/tipi/http2_stream.rb', line 35

def run(&block)
  request = receive
  error = nil
  block.(request)
  @connection_fiber.schedule
rescue Polyphony::BaseException
  raise
rescue Exception => e
  error = e
ensure
  @connection_fiber.schedule error
end

#send_chunk(request, chunk, done: false) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/tipi/http2_stream.rb', line 163

def send_chunk(request, chunk, done: false)
  send_headers({}, false) unless @headers_sent

  if chunk
    with_transfer_count(request) do
      @stream.data(chunk, end_stream: done)
    end
  elsif done
    @stream.close
  end
rescue HTTP2::Error::StreamClosed
  # ignore
end

#send_headers(request, headers, empty_response: false) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/tipi/http2_stream.rb', line 150

def send_headers(request, headers, empty_response: false)
  return if @headers_sent

  status = empty_response ? Qeweney::Status::NO_CONTENT : Qeweney::Status::OK
  headers = normalize_status_header(headers, status)
  with_transfer_count(request) do
    @stream.headers(transform_headers(headers), end_stream: false)
  end
  @headers_sent = true
rescue HTTP2::Error::StreamClosed
  # ignore
end

#stopObject



190
191
192
193
194
195
# File 'lib/tipi/http2_stream.rb', line 190

def stop
  return if @complete

  @stream.close
  @stream_fiber.schedule(Polyphony::MoveOn.new)
end

#transform_headers(headers) ⇒ Object



140
141
142
143
144
145
146
147
148
# File 'lib/tipi/http2_stream.rb', line 140

def transform_headers(headers)
  headers.each_with_object([]) do |(k, v), a|
    if v.is_a?(Array)
      v.each { |vv| a << [k, vv.to_s] }
    else
      a << [k, v.to_s]
    end
  end
end

#with_transfer_count(request) ⇒ Object



75
76
77
78
79
80
# File 'lib/tipi/http2_stream.rb', line 75

def with_transfer_count(request)
  @adapter.set_request_for_transfer_count(request)
  yield
ensure
  @adapter.unset_request_for_transfer_count(request)
end