Class: HTTP2::Stream

Inherits:
Object
Includes:
Emitter, Error, FlowBuffer
Defined in:
lib/http/2/stream.rb

Overview

A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and error management defined by the HTTP 2.0 specification. All you have to do is subscribe to the appropriate events (marked with a “:” prefix in the diagram below) and provide your application logic to handle request and response processing.

                      +--------+
                 PP   |        |   PP
             ,--------|  idle  |--------.
            /         |        |         \
           v          +--------+          v
    +----------+          |           +----------+
    |          |          | H         |          |
,---|:reserved |          |           |:reserved |---.
|   | (local)  |          v           | (remote) |   |
|   +----------+      +--------+      +----------+   |
|      | :active      |        |      :active |      |
|      |      ,-------|:active |-------.      |      |
|      | H   /   ES   |        |   ES   \   H |      |
|      v    v         +--------+         v    v      |
|   +-----------+          |          +-----------+  |
|   |:half_close|          |          |:half_close|  |
|   |  (remote) |          |          |  (local)  |  |
|   +-----------+          |          +-----------+  |
|        |                 v                |        |
|        |    ES/R    +--------+    ES/R    |        |
|        `----------->|        |<-----------'        |
| R                   | :close |                   R |
`-------------------->|        |<--------------------'
                      +--------+

Legend: PP = PUSH_PROMISE frame, H = HEADERS frame, ES = END_STREAM flag, R = RST_STREAM frame.
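
The diagram can also be read as a transition table. The following is a minimal sketch of that reading, not the library's implementation: TRANSITIONS and next_state are hypothetical names, the state symbols follow the specification's state names rather than the event names in the diagram, and anything not drawn in the diagram simply leaves the state unchanged (the real class also validates frames and raises errors).

```ruby
# Hypothetical transition table keyed by [state, direction, trigger].
# direction is :send or :recv from the point of view of the local endpoint.
TRANSITIONS = {
  %i[idle send push_promise]           => :reserved_local,
  %i[idle recv push_promise]           => :reserved_remote,
  %i[idle send headers]                => :open,
  %i[idle recv headers]                => :open,
  %i[reserved_local send headers]      => :half_closed_remote,
  %i[reserved_remote recv headers]     => :half_closed_local,
  %i[open send end_stream]             => :half_closed_local,
  %i[open recv end_stream]             => :half_closed_remote,
  %i[half_closed_local recv end_stream]  => :closed,
  %i[half_closed_remote send end_stream] => :closed
}.freeze

# Returns the state that follows `state` after `trigger` is sent or received.
def next_state(state, direction, trigger)
  # RST_STREAM closes the stream from every state drawn with an "R" edge.
  return :closed if trigger == :rst_stream && state != :idle

  # Simplification: triggers not drawn in the diagram keep the current state.
  TRANSITIONS.fetch([state, direction, trigger]) { state }
end

# A simple request/response exchange as seen by the client.
state = :idle
[%i[send headers], %i[send end_stream], %i[recv end_stream]].each do |direction, trigger|
  state = next_state(state, direction, trigger)
end
# state is now :closed
```

In this model END_STREAM is treated as a separate trigger, although on the wire it is a flag carried by a HEADERS or DATA frame.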

Instance Attribute Summary

Instance Method Summary

Methods included from Emitter

#add_listener, #emit, #once

Methods included from FlowBuffer

#buffered_amount

Constructor Details

#initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle) ⇒ Stream

Initializes new stream.

Note that you should never have to call this directly. To create a new client-initiated stream, use Connection#new_stream. Similarly, Connection will emit new stream objects when new stream frames are received.

Parameters:

  • connection (Connection)
  • id (Integer)
  • weight (Integer) (defaults to: 16)
  • dependency (Integer) (defaults to: 0)
  • exclusive (Boolean) (defaults to: false)
  • parent (Stream) (defaults to: nil)
  • state (Symbol) (defaults to: :idle)


# File 'lib/http/2/stream.rb', line 75

def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle)
  @connection = connection
  @id = id
  @weight = weight
  @dependency = dependency
  process_priority(weight: weight, stream_dependency: dependency, exclusive: exclusive)
  @local_window_max_size = connection.local_settings[:settings_initial_window_size]
  @local_window  = connection.local_settings[:settings_initial_window_size]
  @remote_window = connection.remote_settings[:settings_initial_window_size]
  @parent = parent
  @state  = state
  @error  = false
  @closed = false
  @send_buffer = []

  on(:window) { |v| @remote_window = v }
  on(:local_window) { |v| @local_window_max_size = @local_window = v }
end

Instance Attribute Details

#closed ⇒ Object (readonly)

Reason why the stream was closed.



# File 'lib/http/2/stream.rb', line 60

def closed
  @closed
end

#dependency ⇒ Object (readonly)

Returns the value of attribute dependency.



# File 'lib/http/2/stream.rb', line 52

def dependency
  @dependency
end

#id ⇒ Object (readonly)

Stream ID (odd for client-initiated streams, even otherwise).



# File 'lib/http/2/stream.rb', line 42

def id
  @id
end

#local_window ⇒ Object (readonly) Also known as: window

Size of current stream flow control window.



# File 'lib/http/2/stream.rb', line 55

def local_window
  @local_window
end

#parent ⇒ Object (readonly)

Request parent stream of push stream.



# File 'lib/http/2/stream.rb', line 48

def parent
  @parent
end

#remote_window ⇒ Object (readonly)

Returns the value of attribute remote_window.



# File 'lib/http/2/stream.rb', line 56

def remote_window
  @remote_window
end

#state ⇒ Object (readonly)

Stream state as defined by HTTP 2.0.



# File 'lib/http/2/stream.rb', line 45

def state
  @state
end

#weight ⇒ Object (readonly)

Stream priority as set by initiator.



# File 'lib/http/2/stream.rb', line 51

def weight
  @weight
end

Instance Method Details

#cancel ⇒ Object

Sends a RST_STREAM indicating that the stream is no longer needed.



# File 'lib/http/2/stream.rb', line 230

def cancel
  send(type: :rst_stream, error: :cancel)
end

#chunk_data(payload, max_size) ⇒ Object

Splits the payload into chunks of at most max_size bytes, yields every chunk except the last one, and returns the final chunk.



# File 'lib/http/2/stream.rb', line 211

def chunk_data(payload, max_size)
  total = payload.bytesize
  cursor = 0
  while (total - cursor) > max_size
    yield payload.byteslice(cursor, max_size)
    cursor += max_size
  end
  payload.byteslice(cursor, total - cursor)
end
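
To make the yield/return split concrete, here is a small sketch that copies the method above into a standalone script and runs it on a short string. The sample payload and the max_size of 4 are arbitrary values chosen for illustration.

```ruby
# Standalone copy of the chunking logic shown above, so it runs outside the class.
def chunk_data(payload, max_size)
  total = payload.bytesize
  cursor = 0
  # Yield full-size chunks while more than max_size bytes remain.
  while (total - cursor) > max_size
    yield payload.byteslice(cursor, max_size)
    cursor += max_size
  end
  # The remainder (at most max_size bytes) is returned, not yielded.
  payload.byteslice(cursor, total - cursor)
end

yielded = []
last = chunk_data('abcdefghij', 4) { |chunk| yielded << chunk }
# yielded == ["abcd", "efgh"], last == "ij"
```

Because the loop condition is a strict comparison, a payload whose size is an exact multiple of max_size returns a full-size final chunk rather than an empty one.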

#close(error = :stream_closed) ⇒ Object

Sends a RST_STREAM frame that closes the current stream. This does not close the underlying connection.

Parameters:

  • error (Symbol) (defaults to: :stream_closed)

    optional reason why stream was closed



# File 'lib/http/2/stream.rb', line 225

def close(error = :stream_closed)
  send(type: :rst_stream, error: error)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/http/2/stream.rb', line 94

def closed?
  @state == :closed
end

#data(payload, end_stream: true) ⇒ Object

Sends a DATA frame containing the response payload. Payloads larger than the peer's maximum frame size are split across several DATA frames.

Parameters:

  • payload (String)
  • end_stream (Boolean) (defaults to: true)

    indicates last response DATA frame



# File 'lib/http/2/stream.rb', line 193

def data(payload, end_stream: true)
  # Split the payload so that each DATA frame fits within the peer's maximum frame size
  # TODO: consider padding?
  max_size = @connection.remote_settings[:settings_max_frame_size]

  if payload.bytesize > max_size
    payload = chunk_data(payload, max_size) do |chunk|
      send(type: :data, flags: [], payload: chunk)
    end
  end

  flags = []
  flags << :end_stream if end_stream
  send(type: :data, flags: flags, payload: payload)
end
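
The framing behaviour of #data can be sketched without a connection. The helper below is a hypothetical stand-in (data_frames is not part of the library) that returns the frame hashes #data would pass to #send, assuming a maximum frame size of 16384 bytes, the protocol's initial value for that setting.

```ruby
# Hypothetical helper: builds the DATA frame hashes for one payload.
# Only the last frame carries :end_stream, mirroring the method above.
def data_frames(payload, max_size, end_stream: true)
  frames = []
  cursor = 0
  # Intermediate frames are full-size and carry no flags.
  while payload.bytesize - cursor > max_size
    frames << { type: :data, flags: [], payload: payload.byteslice(cursor, max_size) }
    cursor += max_size
  end
  # The final frame holds the remainder and, optionally, END_STREAM.
  flags = end_stream ? [:end_stream] : []
  frames << { type: :data, flags: flags, payload: payload.byteslice(cursor, payload.bytesize - cursor) }
end

frames = data_frames('a' * 40_000, 16_384)
sizes  = frames.map { |f| f[:payload].bytesize }   # [16384, 16384, 7232]
flags  = frames.map { |f| f[:flags] }              # [[], [], [:end_stream]]
```

Passing end_stream: false leaves every frame unflagged, which is how a response body can be streamed over several #data calls before the last one ends the stream.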

#headers(headers, end_headers: true, end_stream: false) ⇒ Object

Sends a HEADERS frame containing HTTP response headers. All pseudo-header fields MUST appear in the header block before regular header fields.

Parameters:

  • headers (Array or Hash)

    Array of key-value pairs or Hash

  • end_headers (Boolean) (defaults to: true)

    indicates that no more headers will be sent

  • end_stream (Boolean) (defaults to: false)

    indicates that no payload will be sent



# File 'lib/http/2/stream.rb', line 164

def headers(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream

  send(type: :headers, flags: flags, payload: headers)
end

#promise(headers, end_headers: true, &block) ⇒ Object

Emits a :promise event carrying this stream, the given headers, and the flags (:end_headers when end_headers is true). A block is required; ArgumentError is raised if none is given.

Parameters:

  • headers (Array or Hash)
  • end_headers (Boolean) (defaults to: true)



# File 'lib/http/2/stream.rb', line 172

def promise(headers, end_headers: true, &block)
  fail ArgumentError, 'must provide callback' unless block_given?

  flags = end_headers ? [:end_headers] : []
  emit(:promise, self, headers, flags, &block)
end

#receive(frame) ⇒ Object Also known as: <<

Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.

Parameters:

  • frame (Hash)


# File 'lib/http/2/stream.rb', line 101

def receive(frame)
  transition(frame, false)

  case frame[:type]
  when :data
    update_local_window(frame)
    # Emit DATA frame
    emit(:data, frame[:payload]) unless frame[:ignore]
    calculate_window_update(@local_window_max_size)
  when :headers
    emit(:headers, frame[:payload]) unless frame[:ignore]
  when :push_promise
    emit(:promise_headers, frame[:payload]) unless frame[:ignore]
  when :priority
    process_priority(frame)
  when :window_update
    process_window_update(frame)
  when :altsvc
    # 4.  The ALTSVC HTTP/2 Frame
    # An ALTSVC frame on a
    # stream other than stream 0 containing non-empty "Origin" information
    # is invalid and MUST be ignored.
    if !frame[:origin] || frame[:origin].empty?
      emit(frame[:type], frame)
    end
  when :blocked
    emit(frame[:type], frame)
  end

  complete_transition(frame)
end
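
The event that each incoming frame type produces can be summarised with a small sketch. event_for is a hypothetical helper, not part of the library; it mirrors only the emit calls in the case statement above and leaves out state transitions and flow-control bookkeeping.

```ruby
# Hypothetical helper: returns [event_name, argument] for the event that
# #receive would emit for this frame, or nil when nothing is emitted.
def event_for(frame)
  case frame[:type]
  when :data, :headers
    [frame[:type], frame[:payload]] unless frame[:ignore]
  when :push_promise
    [:promise_headers, frame[:payload]] unless frame[:ignore]
  when :altsvc
    # ALTSVC on a non-zero stream is only surfaced when Origin is empty.
    origin = frame[:origin]
    [:altsvc, frame] if !origin || origin.empty?
  when :blocked
    [:blocked, frame]
  end
  # :priority and :window_update are handled internally and emit nothing here.
end

event_for(type: :headers, payload: [[':status', '200']])
event_for(type: :data, payload: 'body', ignore: true)
event_for(type: :altsvc, origin: 'example.com')
```

The first call yields a :headers event with the header list; the other two return nil because the frame is marked :ignore or carries a non-empty origin.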

#refuse ⇒ Object

Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.



# File 'lib/http/2/stream.rb', line 236

def refuse
  send(type: :rst_stream, error: :refused_stream)
end

#reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object

Sends a PRIORITY frame with new stream priority value (can only be performed by the client).

Parameters:

  • weight (Integer) (defaults to: 16)

    new stream weight value

  • dependency (Integer) (defaults to: 0)

    ID of the stream that this stream depends on

  • exclusive (Boolean) (defaults to: false)



# File 'lib/http/2/stream.rb', line 184

def reprioritize(weight: 16, dependency: 0, exclusive: false)
  stream_error if @id.even?
  send(type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive)
end

#send(frame) ⇒ Object

Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.

Parameters:

  • frame (Hash)


# File 'lib/http/2/stream.rb', line 139

def send(frame)
  process_priority(frame) if frame[:type] == :priority

  case frame[:type]
  when :data
    # @remote_window is maintained in send_data
    send_data(frame)
  when :window_update
    manage_state(frame) do
      @local_window += frame[:increment]
      emit(:frame, frame)
    end
  else
    manage_state(frame) do
      emit(:frame, frame)
    end
  end
end
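
The buffering mentioned in the description happens in send_data, which is not shown on this page. As a rough mental model only, the sketch below uses a hypothetical ToyFlowBuffer class (not the library's FlowBuffer) in which data is written while window credit remains, the rest is queued, and a window update drains the queue. Frame-size splitting is left out.

```ruby
# Hypothetical, simplified model of window-limited sending.
class ToyFlowBuffer
  attr_reader :window, :sent

  def initialize(window)
    @window = window # remaining send credit, in bytes
    @queue  = []     # payloads waiting for credit
    @sent   = []     # payloads that were "written to the wire"
  end

  # Number of bytes still waiting for window credit.
  def buffered_amount
    @queue.sum(&:bytesize)
  end

  def send_data(payload)
    @queue << payload
    drain
  end

  # The peer granted more credit: add it and flush what now fits.
  def window_update(increment)
    @window += increment
    drain
  end

  private

  def drain
    while @window.positive? && !@queue.empty?
      payload = @queue.shift
      if payload.bytesize > @window
        # Send only what fits and put the remainder back at the front.
        @queue.unshift(payload.byteslice(@window, payload.bytesize - @window))
        payload = payload.byteslice(0, @window)
      end
      @sent << payload
      @window -= payload.bytesize
    end
  end
end

buf = ToyFlowBuffer.new(5)
buf.send_data('hello world') # sends "hello", buffers " world" (6 bytes)
buf.window_update(10)        # sends " world", leaving 4 bytes of credit
```

The point of the model is that a sender never exceeds the credit the peer has advertised; how the real class orders, splits, and flags the buffered frames may differ.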

#window_update(increment) ⇒ Object

Sends a WINDOW_UPDATE frame to the peer.

Parameters:

  • increment (Integer)


# File 'lib/http/2/stream.rb', line 243

def window_update(increment)
  # emit stream-level WINDOW_UPDATE unless stream is closed
  return if @state == :closed || @state == :remote_closed
  send(type: :window_update, increment: increment)
end