Class: Polyphony::HTTP::Server::HTTP2StreamHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/polyphony/http/server/http2_stream.rb

Overview

Manages an HTTP 2 stream

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(stream, &block) ⇒ HTTP2StreamHandler

Returns a new instance of HTTP2StreamHandler.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/polyphony/http/server/http2_stream.rb', line 13

def initialize(stream, &block)
  @stream = stream
  @calling_fiber = Fiber.current
  @stream_fiber = Fiber.new { |req| handle_request(req, &block) }

  # Stream callbacks occur on the connection fiber (see HTTP2::Protocol#each).
  # The request handler is run on a separate fiber for each stream, allowing
  # concurrent handling of incoming requests on the same HTTP/2 connection.
  #
  # The different stream adapter APIs suspend the stream fiber, waiting for
  # stream callbacks to be called. The callbacks, in turn, transfer control to
  # the stream fiber, effectively causing the return of the adapter API calls.
  #
  # Note: the request handler is run once headers are received. Reading the
  # request body, if present, is at the discretion of the request handler.
  # This mirrors the behaviour of the HTTP/1 adapter.
  stream.on(:headers, &method(:on_headers))
  stream.on(:data, &method(:on_data))
  stream.on(:half_close, &method(:on_half_close))
end

Instance Attribute Details

#__next__Object

Returns the value of attribute __next__.



11
12
13
# File 'lib/polyphony/http/server/http2_stream.rb', line 11

def __next__
  @__next__
end

Instance Method Details

#consume_requestObject

Wait for request to finish



90
91
92
93
94
95
96
97
# File 'lib/polyphony/http/server/http2_stream.rb', line 90

def consume_request
  return if @request.complete?

  @waiting_for_half_close = true
  suspend
ensure
  @waiting_for_half_close = nil
end

#finishObject



120
121
122
123
124
125
126
127
# File 'lib/polyphony/http/server/http2_stream.rb', line 120

def finish
  if @headers_sent
    @stream.close
  else
    headers[':status'] ||= '204'
    @stream.headers(headers, end_stream: true)
  end
end

#get_body_chunkObject



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/polyphony/http/server/http2_stream.rb', line 77

def get_body_chunk
  # called in the context of the stream fiber
  return nil if @request.complete?

  @waiting_for_body_chunk = true
  # the chunk (or an exception) will be returned once the stream fiber is
  # resumed
  suspend
ensure
  @waiting_for_body_chunk = nil
end

#handle_request(request, &block) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/polyphony/http/server/http2_stream.rb', line 34

def handle_request(request, &block)
  error = nil
  block.(request)
  @calling_fiber.transfer
rescue Polyphony::MoveOn
  # ignore
rescue Exception => e
  error = e
ensure
  @done = true
  @calling_fiber.transfer error
end

#on_data(data) ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/polyphony/http/server/http2_stream.rb', line 52

def on_data(data)
  if @waiting_for_body_chunk
    @waiting_for_body_chunk = nil
    @stream_fiber.transfer(data)
  else
    @request.buffer_body_chunk(data)
  end
end

#on_half_closeObject



61
62
63
64
65
66
67
68
69
70
71
# File 'lib/polyphony/http/server/http2_stream.rb', line 61

def on_half_close
  if @waiting_for_body_chunk
    @waiting_for_body_chunk = nil
    @stream_fiber.transfer(nil)
  elsif @waiting_for_half_close
    @waiting_for_half_close = nil
    @stream_fiber.transfer(nil)
  else
    @request.complete!
  end
end

#on_headers(headers) ⇒ Object



47
48
49
50
# File 'lib/polyphony/http/server/http2_stream.rb', line 47

def on_headers(headers)
  @request = Request.new(headers.to_h, self)
  @stream_fiber.transfer(@request)
end

#protocolObject



73
74
75
# File 'lib/polyphony/http/server/http2_stream.rb', line 73

def protocol
  'h2'
end

#respond(chunk, headers) ⇒ Object

response API



100
101
102
103
104
105
# File 'lib/polyphony/http/server/http2_stream.rb', line 100

def respond(chunk, headers)
  headers[':status'] ||= '200'
  @stream.headers(headers, end_stream: false)
  @stream.data(chunk, end_stream: true)
  @headers_sent = true
end

#send_chunk(chunk, done: false) ⇒ Object



115
116
117
118
# File 'lib/polyphony/http/server/http2_stream.rb', line 115

def send_chunk(chunk, done: false)
  send_headers({}, false) unless @headers_sent
  @stream.data(chunk, end_stream: done)
end

#send_headers(headers, empty_response = false) ⇒ Object



107
108
109
110
111
112
113
# File 'lib/polyphony/http/server/http2_stream.rb', line 107

def send_headers(headers, empty_response = false)
  return if @headers_sent

  headers[':status'] ||= (empty_response ? 204 : 200).to_s
  @stream.headers(headers, end_stream: false)
  @headers_sent = true
end

#stopObject



129
130
131
132
133
134
# File 'lib/polyphony/http/server/http2_stream.rb', line 129

def stop
  return if @done

  @stream.close
  @stream_fiber.schedule(Polyphony::MoveOn.new)
end