Class: HTTP2::Connection

Inherits:
Object
  • Object
show all
Includes:
BufferUtils, Emitter, Error, FlowBuffer
Defined in:
lib/http/2/connection.rb

Overview

Connection encapsulates all of the connection, stream, flow-control, error management, and other processing logic required for a well-behaved HTTP 2.0 endpoint.

Note that this class should not be used directly. Instead, you want to use either Client or Server class to drive the HTTP 2.0 exchange.

rubocop:disable Metrics/ClassLength

Direct Known Subclasses

Client, Server

Constant Summary

Constants included from FlowBuffer

FlowBuffer::MAX_WINDOW_SIZE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from BufferUtils

#read_str, #read_uint32, #shift_byte

Methods included from Emitter

#emit, #on, #once

Methods included from FlowBuffer

#buffered_amount, #flush

Constructor Details

#initialize(settings = {}) ⇒ Connection

Initializes new connection object.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/http/2/connection.rb', line 79

# Sets up the state of a new connection object.
#
# @param settings [Hash] overrides merged over DEFAULT_CONNECTION_SETTINGS;
#   the same hash is also handed to the header compressor and decompressor.
def initialize(settings = {})
  # Local settings are the library defaults plus the caller's overrides;
  # remote settings start as a copy of the spec defaults.
  @local_settings  = DEFAULT_CONNECTION_SETTINGS.merge(settings)
  @remote_settings = SPEC_DEFAULT_CONNECTION_SETTINGS.dup

  # Header (de)compression contexts and the frame codec.
  @compressor   = Header::Compressor.new(settings)
  @decompressor = Header::Decompressor.new(settings)
  @framer       = Framer.new(@local_settings[:settings_max_frame_size])

  # Stream bookkeeping.
  @streams = {}
  @streams_recently_closed = {}
  @active_stream_count = 0
  @last_activated_stream = 0
  @last_stream_id = 0

  # SETTINGS payloads queued by #settings.
  @pending_settings = []

  # Flow-control windows: each window starts at its limit, which in turn
  # comes from the matching :settings_initial_window_size value.
  @local_window = @local_window_limit = @local_settings[:settings_initial_window_size]
  @remote_window = @remote_window_limit = @remote_settings[:settings_initial_window_size]

  # Receive-side state: raw byte buffer and the frames of a header block
  # that has not yet seen END_HEADERS.
  @recv_buffer = "".b
  @continuation = []
  @received_frame = false

  @error = nil
  @h2c_upgrade = nil
  @closed_since = nil
end

Instance Attribute Details

#active_stream_count ⇒ Object

Number of active streams between client and server (reserved streams are not counted towards the stream limit).



75
76
77
# File 'lib/http/2/connection.rb', line 75

# Reader for @active_stream_count. #initialize sets it to 0 and #new_stream
# compares it against the peer's :settings_max_concurrent_streams value.
def active_stream_count
  @active_stream_count
end

#local_settings ⇒ Object (readonly)

Current settings value for local and peer



67
68
69
# File 'lib/http/2/connection.rb', line 67

# Reader for @local_settings, which #initialize builds by merging the
# caller-supplied settings over DEFAULT_CONNECTION_SETTINGS.
def local_settings
  @local_settings
end

#local_window ⇒ Object (readonly) Also known as: window

Size of current connection flow control window (by default, set to infinity, but is automatically updated on receipt of peer settings).



62
63
64
# File 'lib/http/2/connection.rb', line 62

# Reader for @local_window. #initialize seeds it from
# @local_settings[:settings_initial_window_size] and #window_update adds the
# sent increment to it.
def local_window
  @local_window
end

#pending_settings ⇒ Object (readonly)

Pending settings value

Sent but not ack'ed settings


71
72
73
# File 'lib/http/2/connection.rb', line 71

# Reader for @pending_settings: the array that #initialize creates empty and
# that #settings appends outgoing SETTINGS payloads to.
def pending_settings
  @pending_settings
end

#remote_settings ⇒ Object (readonly)

Returns the value of attribute remote_settings.



63
64
65
# File 'lib/http/2/connection.rb', line 63

# Reader for @remote_settings, which #initialize seeds with a dup of
# SPEC_DEFAULT_CONNECTION_SETTINGS.
def remote_settings
  @remote_settings
end

#remote_window ⇒ Object (readonly)

Returns the value of attribute remote_window.



63
64
65
# File 'lib/http/2/connection.rb', line 63

# Reader for @remote_window, which #initialize seeds from
# @remote_settings[:settings_initial_window_size].
def remote_window
  @remote_window
end

#state ⇒ Object (readonly)

Connection state (:new, :closed).



58
59
60
# File 'lib/http/2/connection.rb', line 58

# Reader for @state. In the code visible here, #goaway assigns :closed and
# #receive moves :waiting_magic to :waiting_connection_preface; #initialize
# does not assign it (presumably subclasses set the initial state — confirm).
def state
  @state
end

Instance Method Details

#<<(data) ⇒ Object



415
416
417
# File 'lib/http/2/connection.rb', line 415

# Shorthand for #receive: forwards +data+ unchanged and returns whatever
# #receive returns.
def <<(data)
  receive(data)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/http/2/connection.rb', line 109

# True when @state is :closed (the value #goaway assigns after sending GOAWAY).
#
# @return [Boolean]
def closed?
  @state == :closed
end

#goaway(error = :no_error, payload = nil) ⇒ Object

Sends a GOAWAY frame indicating that the peer should stop creating new streams for current connection.

Endpoints MAY append opaque data to the payload of any GOAWAY frame. Additional debug data is intended for diagnostic purposes only and carries no semantic value. Debug data MUST NOT be persistently stored, since it could contain sensitive information.

Parameters:

  • error (Symbol) (defaults to: :no_error)
  • payload (String) (defaults to: nil)


151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/http/2/connection.rb', line 151

# Sends a GOAWAY frame and marks the connection as closed.
#
# The frame advertises the highest stream id currently present in @streams
# (0 when there are none). After sending, @state becomes :closed and the
# monotonic clock reading is stored in @closed_since.
#
# @param error [Symbol] error code carried by the frame
# @param payload [String, nil] optional opaque debug data
# @return [Float] the monotonic timestamp stored in @closed_since
def goaway(error = :no_error, payload = nil)
  highest_stream_id = @streams.keys.max || 0

  send(type: :goaway, last_stream: highest_stream_id, error: error, payload: payload)

  @state = :closed
  @closed_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#new_stream(**args) ⇒ Object

Allocates new stream for current connection.

Parameters:

  • priority (Integer)
  • window (Integer)
  • parent (Stream)

Raises:

  • (ConnectionClosed)
  • (StreamLimitExceeded)



118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/http/2/connection.rb', line 118

# Allocates the next locally-initiated stream on this connection.
#
# The stream is created through activate_stream using the current @stream_id;
# afterwards @last_activated_stream records its id and @stream_id advances by 2.
#
# @param args [Hash] keyword options forwarded untouched to activate_stream
# @return [Object] the stream returned by activate_stream
# @raise [ConnectionClosed] if @state is :closed
# @raise [StreamLimitExceeded] if @active_stream_count has reached the peer's
#   :settings_max_concurrent_streams
def new_stream(**args)
  raise ConnectionClosed if @state == :closed

  stream_limit = @remote_settings[:settings_max_concurrent_streams]
  raise StreamLimitExceeded unless @active_stream_count < stream_limit

  # Ids handed out locally must never go backwards.
  if @stream_id < @last_activated_stream
    connection_error(:protocol_error, msg: "id is smaller than previous")
  end

  activate_stream(id: @stream_id, **args).tap do |created|
    @last_activated_stream = created.id
    @stream_id += 2
  end
end

#ping(payload, &blk) ⇒ Object

Sends PING frame to the peer.

Parameters:

  • payload (String)

    optional payload must be 8 bytes long

  • blk (Proc)

    callback to execute when PONG is received



136
137
138
139
# File 'lib/http/2/connection.rb', line 136

# Sends a PING frame on stream 0 and, when a block is given, registers it as
# a one-shot :ack listener.
#
# @param payload [String] PING payload placed in the frame
# @param blk [Proc] optional callback registered through once(:ack)
# @return [Object, nil] the result of once(:ack, ...) when a block is given,
#   otherwise nil
def ping(payload, &blk)
  send(type: :ping, stream: 0, payload: payload)
  return unless blk

  once(:ack, &blk)
end

#receive(data) ⇒ Object

Decodes incoming bytes into HTTP 2.0 frames and routes them to appropriate receivers: connection frames are handled directly, and stream frames are passed to appropriate stream objects.

Parameters:

  • data (String)

    Binary encoded string



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# File 'lib/http/2/connection.rb', line 189

# Appends +data+ to the receive buffer, then parses and dispatches every
# complete frame currently buffered: connection frames are handled by
# connection_management, stream frames are handed to the matching stream.
#
# Any StandardError raised while processing that is not an Error::Error is
# passed to connection_error(e: ...); Error::Error instances are re-raised.
#
# @param data [String] binary encoded bytes received from the peer
# @raise [HandshakeError] while in the :waiting_magic state, if the buffered
#   bytes do not match CONNECTION_PREFACE_MAGIC
def receive(data)
  @recv_buffer << data

  # Upon establishment of a TCP connection and determination that
  # HTTP/2.0 will be used by both peers, each endpoint MUST send a
  # connection header as a final confirmation and to establish the
  # initial settings for the HTTP/2.0 connection.
  #
  # Client connection header is 24 byte connection header followed by
  # SETTINGS frame. Server connection header is SETTINGS frame only.
  if @state == :waiting_magic
    if @recv_buffer.size < 24
      raise HandshakeError unless CONNECTION_PREFACE_MAGIC.start_with? @recv_buffer

      return # maybe next time
    elsif read_str(@recv_buffer, 24) == CONNECTION_PREFACE_MAGIC
      # MAGIC is OK.  Send our settings
      @state = :waiting_connection_preface
      payload = @local_settings.reject { |k, v| v == SPEC_DEFAULT_CONNECTION_SETTINGS[k] }
      settings(payload)
    else
      raise HandshakeError
    end
  end

  while (frame = @framer.parse(@recv_buffer))
    # A client requires the very first frame it parses to be SETTINGS.
    if is_a?(Client) && !@received_frame
      connection_error(:protocol_error, msg: "didn't receive settings") if frame[:type] != :settings
      @received_frame = true
    end

    # Implementations MUST discard frames
    # that have unknown or unsupported types.
    if frame[:type].nil?
      # However, extension frames that appear in
      # the middle of a header block (Section 4.3) are not permitted; these
      # MUST be treated as a connection error (Section 5.4.1) of type
      # PROTOCOL_ERROR.
      connection_error(:protocol_error) unless @continuation.empty?
      next
    end

    emit(:frame_received, frame)

    # Header blocks MUST be transmitted as a contiguous sequence of frames
    # with no interleaved frames of any other type, or from any other stream.
    unless @continuation.empty?
      connection_error unless frame[:type] == :continuation && frame[:stream] == @continuation.first[:stream]

      @continuation << frame
      unless frame[:flags].include? :end_headers
        buffered_payload = @continuation.sum { |f| f[:payload].bytesize }
        # prevent HTTP/2 CONTINUATION FLOOD
        # same heuristic as the one from HAProxy: https://www.haproxy.com/blog/haproxy-is-resilient-to-the-http-2-continuation-flood
        # different mitigation (connection closed, instead of 400 response)
        unless buffered_payload < @local_settings[:settings_max_frame_size]
          connection_error(:protocol_error,
                           msg: "too many continuations received")
        end

        next
      end

      # END_HEADERS seen: fold the collected payloads back into the frame
      # that opened the header block and process it as a single frame.
      payload = @continuation.map { |f| f[:payload] }.join

      frame = @continuation.shift
      @continuation.clear

      frame.delete(:length)
      frame[:payload] = payload
      frame[:flags] << :end_headers
    end

    # SETTINGS frames always apply to a connection, never a single stream.
    # The stream identifier for a settings frame MUST be zero.  If an
    # endpoint receives a SETTINGS frame whose stream identifier field is
    # anything other than 0x0, the endpoint MUST respond with a connection
    # error (Section 5.4.1) of type PROTOCOL_ERROR.
    if connection_frame?(frame)
      connection_error(:protocol_error) unless frame[:stream].zero?
      connection_management(frame)
    else
      case frame[:type]
      when :headers
        # When server receives even-numbered stream identifier,
        # the endpoint MUST respond with a connection error of type PROTOCOL_ERROR.
        connection_error if frame[:stream].even? && is_a?(Server)

        # The last frame in a sequence of HEADERS/CONTINUATION
        # frames MUST have the END_HEADERS flag set.
        unless frame[:flags].include? :end_headers
          @continuation << frame
          next
        end

        # After sending a GOAWAY frame, the sender can discard frames
        # for new streams.  However, any frames that alter connection
        # state cannot be completely ignored.  For instance, HEADERS,
        # PUSH_PROMISE and CONTINUATION frames MUST be minimally
        # processed to ensure a consistent compression state
        decode_headers(frame)
        return if @state == :closed

        stream = @streams[frame[:stream]]
        if stream.nil?
          verify_pseudo_headers(frame)

          verify_stream_order(frame[:stream])
          stream = activate_stream(
            id: frame[:stream],
            weight: frame[:weight] || DEFAULT_WEIGHT,
            dependency: frame[:dependency] || 0,
            exclusive: frame[:exclusive] || false
          )
          emit(:stream, stream)
        end

        stream << frame

      when :push_promise
        # The last frame in a sequence of PUSH_PROMISE/CONTINUATION
        # frames MUST have the END_HEADERS flag set
        unless frame[:flags].include? :end_headers
          @continuation << frame
          # Keep parsing (as the HEADERS branch does): the CONTINUATION frames
          # completing this block may already be sitting in the buffer.
          next
        end

        decode_headers(frame)
        return if @state == :closed

        # PUSH_PROMISE frames MUST be associated with an existing, peer-
        # initiated stream... A receiver MUST treat the receipt of a
        # PUSH_PROMISE on a stream that is neither "open" nor
        # "half-closed (local)" as a connection error (Section 5.4.1) of
        # type PROTOCOL_ERROR. Similarly, a receiver MUST treat the
        # receipt of a PUSH_PROMISE that promises an illegal stream
        # identifier (Section 5.1.1) (that is, an identifier for a stream
        # that is not currently in the "idle" state) as a connection error
        # (Section 5.4.1) of type PROTOCOL_ERROR, unless the receiver
        # recently sent a RST_STREAM frame to cancel the associated stream.
        parent = @streams[frame[:stream]]
        pid = frame[:promise_stream]

        # if PUSH parent is recently closed, RST_STREAM the push
        if @streams_recently_closed[frame[:stream]]
          send(type: :rst_stream, stream: pid, error: :refused_stream)
          # Only this frame is refused; frames buffered behind it are still
          # processed in this call.
          next
        end

        connection_error(msg: "missing parent ID") if parent.nil?

        unless parent.state == :open || parent.state == :half_closed_local
          # An endpoint might receive a PUSH_PROMISE frame after it sends
          # RST_STREAM.  PUSH_PROMISE causes a stream to become "reserved".
          # The RST_STREAM does not cancel any promised stream.  Therefore, if
          # promised streams are not desired, a RST_STREAM can be used to
          # close any of those streams.
          if parent.closed == :local_rst
            # We can either (a) 'resurrect' the parent, or (b) RST_STREAM
            # ... sticking with (b), might need to revisit later.
            send(type: :rst_stream, stream: pid, error: :refused_stream)
          else
            connection_error
          end
        end

        _verify_pseudo_headers(frame, REQUEST_MANDATORY_HEADERS)
        verify_stream_order(pid)
        stream = activate_stream(id: pid, parent: parent)
        emit(:promise, stream)
        stream << frame
      else
        if (stream = @streams[frame[:stream]])
          stream << frame
          if frame[:type] == :data
            update_local_window(frame)
            calculate_window_update(@local_window_limit)
          end
        else
          case frame[:type]
          # The PRIORITY frame can be sent for a stream in the "idle" or
          # "closed" state. This allows for the reprioritization of a
          # group of dependent streams by altering the priority of an
          # unused or closed parent stream.
          when :priority
            stream = activate_stream(
              id: frame[:stream],
              weight: frame[:weight] || DEFAULT_WEIGHT,
              dependency: frame[:dependency] || 0,
              exclusive: frame[:exclusive] || false
            )

            emit(:stream, stream)
            stream << frame

          # WINDOW_UPDATE can be sent by a peer that has sent a frame
          # bearing the END_STREAM flag. This means that a receiver could
          # receive a WINDOW_UPDATE frame on a "half-closed (remote)" or
          # "closed" stream. A receiver MUST NOT treat this as an error
          # (see Section 5.1).
          when :window_update
            stream = @streams_recently_closed[frame[:stream]]
            connection_error(:protocol_error, msg: "sent window update on idle stream") unless stream
            process_window_update(frame: frame, encode: true)
          # Endpoints MUST ignore
          # WINDOW_UPDATE or RST_STREAM frames received in this state (closed), though
          # endpoints MAY choose to treat frames that arrive a significant
          # time after sending END_STREAM as a connection error.
          when :rst_stream
            stream = @streams_recently_closed[frame[:stream]]
            connection_error(:protocol_error, msg: "sent rst stream on idle stream") unless stream
          else
            # An endpoint that receives an unexpected stream identifier
            # MUST respond with a connection error of type PROTOCOL_ERROR.
            connection_error(msg: "stream does not exist")
          end
        end
      end
    end
  end
rescue StandardError => e
  # Library errors propagate unchanged; anything else is reported through
  # connection_error with the original exception attached.
  raise if e.is_a?(Error::Error)

  connection_error(e: e)
end

#settings(payload) ⇒ Object

Sends a connection SETTINGS frame to the peer. The values are reflected when the corresponding ACK is received.

Parameters:

  • payload (Array or Hash)


176
177
178
179
180
181
182
# File 'lib/http/2/connection.rb', line 176

# Sends a connection SETTINGS frame (stream 0) to the peer and records the
# payload as pending.
#
# @param payload [Array, Hash] settings to send; normalised with #to_a before
#   validation and sending
# @return [Array] @pending_settings, the queue of sent payloads
def settings(payload)
  payload = payload.to_a
  validate_settings(@local_role, payload)

  # Queue the payload exactly once per frame, and before the frame is handed
  # to send, so it is already pending when the frame goes out.
  # NOTE(review): the ACK handler is outside this view; presumably it dequeues
  # one entry per received ACK, which a duplicated entry would desynchronise —
  # confirm.
  @pending_settings << payload
  send(type: :settings, stream: 0, payload: payload)

  @pending_settings
end

#window_update(increment) ⇒ Object

Sends a WINDOW_UPDATE frame to the peer.

Parameters:

  • increment (Integer)


167
168
169
170
# File 'lib/http/2/connection.rb', line 167

# Adds +increment+ to @local_window, then sends a WINDOW_UPDATE frame for
# stream 0 carrying the same increment.
#
# @param increment [Integer] amount added to the local window and advertised
#   to the peer
# @return [Object] whatever the frame-sending call returns
def window_update(increment)
  @local_window += increment
  send(type: :window_update, stream: 0, increment: increment)
end