Class: AMQP::Client::Connection

Inherits:
Object
Defined in:
lib/amqp/client/connection.rb,
lib/amqp/client/channel.rb

Overview

Represents a single established AMQP connection

Defined Under Namespace

Classes: Channel


Constructor Details

#initialize(uri = "", read_loop_thread: true, **options) ⇒ Connection

Establish a connection to an AMQP broker

Parameters:

  • uri (String) (defaults to: "")

    URL in the format amqp://username:password@hostname/vhost; use amqps:// for an encrypted connection

  • read_loop_thread (Boolean) (defaults to: true)

    If true, run #read_loop in a background thread; otherwise the user has to run it explicitly. Without #read_loop the connection won’t function.

  • options (Hash)

    a customizable set of options

Options Hash (**options):

  • connection_name (String) — default: PROGRAM_NAME

    Set a name for the connection to be able to identify the client from the broker

  • verify_peer (Boolean) — default: true

    Verify broker’s TLS certificate, set to false for self-signed certs

  • connect_timeout (Float) — default: 30

    TCP connection timeout

  • heartbeat (Integer) — default: 0

    Heartbeat timeout in seconds; the default of 0 disables heartbeats and relies on TCP keepalive instead

  • frame_max (Integer) — default: 131_072

    Maximum frame size in bytes; the smaller of the client’s and the broker’s values will be used

  • channel_max (Integer) — default: 2048

    Maximum number of channels the client will be allowed to have open. The maximum allowed is 65_536. The smaller of the client’s and the broker’s values will be used.

  • keepalive (String) — default: 60:10:3

    TCP keepalive setting in the form idle:interval:count; the default means 60s idle, 10s between probes, 3 probes
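The keepalive default 60:10:3 reads as three colon-separated integer fields. A minimal sketch of splitting such a string into named values follows; parse_keepalive is a hypothetical helper written for illustration, not the library's own parser, and the strict validation is an assumption.

```ruby
# Hypothetical helper, not part of the library: split an "idle:interval:count"
# keepalive string into named integer fields.
def parse_keepalive(setting)
  idle, interval, count = setting.split(":", 3).map { |part| Integer(part) }
  if [idle, interval, count].any?(&:nil?)
    raise ArgumentError, "expected idle:interval:count, got #{setting.inspect}"
  end

  { idle: idle, interval: interval, count: count }
end

keepalive = parse_keepalive("60:10:3")
keepalive[:idle]     # seconds of idle time before the first probe
keepalive[:interval] # seconds between probes
keepalive[:count]    # unanswered probes before the connection is dropped
```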



# File 'lib/amqp/client/connection.rb', line 29

def initialize(uri = "", read_loop_thread: true, **options)
  uri = URI.parse(uri)
  tls = uri.scheme == "amqps"
  port = port_from_env || uri.port || (tls ? 5671 : 5672)
  host = uri.host || "localhost"
  user = uri.user || "guest"
  password = uri.password || "guest"
  vhost = URI.decode_www_form_component(uri.path[1..] || "/")
  options = URI.decode_www_form(uri.query || "").map! { |k, v| [k.to_sym, v] }.to_h.merge(options)

  socket = open_socket(host, port, tls, options)
  channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, options)

  @socket = socket
  @channel_max = channel_max.zero? ? 65_536 : channel_max
  @frame_max = frame_max
  @heartbeat = heartbeat
  @channels = {}
  @closed = nil
  @replies = ::Queue.new
  @write_lock = Mutex.new
  @blocked = nil
  @on_blocked = ->(reason) { warn "AMQP-Client blocked by broker: #{reason}" }
  @on_unblocked = -> { warn "AMQP-Client unblocked by broker" }

  Thread.new { read_loop } if read_loop_thread
end
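The defaulting rules in the constructor can be tried without a broker. The sketch below copies only the URI handling into a hypothetical helper, amqp_connection_params, and leaves out the private port_from_env lookup; the host name and credentials are made up for the example.

```ruby
require "uri"

# Hypothetical helper that mirrors the defaulting rules in the constructor above.
# It skips the port_from_env lookup, which is private to the library.
def amqp_connection_params(uri_string)
  uri = URI.parse(uri_string)
  tls = uri.scheme == "amqps"
  {
    tls: tls,
    host: uri.host || "localhost",
    port: uri.port || (tls ? 5671 : 5672),
    user: uri.user || "guest",
    password: uri.password || "guest",
    vhost: URI.decode_www_form_component(uri.path[1..] || "/"),
    options: URI.decode_www_form(uri.query || "").to_h { |k, v| [k.to_sym, v] }
  }
end

params = amqp_connection_params("amqps://alice:secret@broker.example.com/prod%2Fapp?heartbeat=30")
params[:port]    # => 5671, the TLS default because no port was given
params[:vhost]   # => "prod/app", percent-decoded
params[:options] # query values arrive as strings, here "30" rather than 30
```

Note that options given in the query string are strings, while options passed as keyword arguments keep their Ruby types and take precedence in the merge.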

Instance Attribute Details

#frame_max ⇒ Integer (readonly)

The max frame size negotiated between the client and the broker

Returns:

  • (Integer)


# File 'lib/amqp/client/connection.rb', line 76

def frame_max
  @frame_max
end
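For frame_max and channel_max the smaller of the client's and the broker's proposals wins. A minimal sketch of that rule follows, assuming (as in AMQP 0-9-1 tuning) that 0 means "no limit"; negotiate_limit is a hypothetical name, and the library's private establish method may do this differently.

```ruby
# Hypothetical helper: pick the effective limit from two proposals.
# Assumes that 0 means "no limit", so a zero side defers to the other side.
def negotiate_limit(client_value, broker_value)
  return broker_value if client_value.zero?
  return client_value if broker_value.zero?

  [client_value, broker_value].min
end

negotiate_limit(131_072, 4096) # => 4096
negotiate_limit(0, 4096)       # => 4096
```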

Class Method Details

.connect(uri, read_loop_thread: true, **options) ⇒ Object

Deprecated.

Alias for .new; takes the same arguments as #initialize

# File 'lib/amqp/client/connection.rb', line 70

def self.connect(uri, read_loop_thread: true, **options)
  new(uri, read_loop_thread: read_loop_thread, **options)
end

Instance Method Details

#blocked? ⇒ Bool

Indicates that the server is blocking publishes. If the client keeps publishing the server will stop reading from the socket. Use the #on_blocked callback to get notified when the server is resource constrained.

Returns:

  • (Bool)

# File 'lib/amqp/client/connection.rb', line 63

def blocked?
  !@blocked.nil?
end

#channel(id = nil) ⇒ Channel

Open an AMQP channel

Parameters:

  • id (Integer, nil) (defaults to: nil)

    If nil, a new channel is opened on the lowest free ID; otherwise the channel with the given ID is reused if it is already open

Returns:

  • (Channel)

Raises:

  • (ArgumentError)


# File 'lib/amqp/client/connection.rb', line 88

def channel(id = nil)
  raise ArgumentError, "Channel ID cannot be 0" if id&.zero?
  raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max

  if id
    ch = @channels[id] ||= Channel.new(self, id)
  else
    1.upto(@channel_max) do |i|
      break id = i unless @channels.key? i
    end
    raise Error, "Max channels reached" if id.nil?

    ch = @channels[id] = Channel.new(self, id)
  end
  ch.open
end
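When no ID is given, #channel picks the lowest ID that is not yet in use. The search can be isolated as below; next_free_channel_id is a hypothetical stand-alone version, and the hash of symbols merely stands in for the connection's channel table.

```ruby
# Hypothetical stand-alone version of the ID search in #channel:
# return the lowest ID in 1..channel_max that is not a key of `channels`.
def next_free_channel_id(channels, channel_max)
  1.upto(channel_max) { |i| return i unless channels.key?(i) }
  nil # every ID is taken
end

open_channels = { 1 => :ch1, 2 => :ch2, 4 => :ch4 }
next_free_channel_id(open_channels, 2048) # => 3
```

Because the search always starts at 1, IDs freed by closed channels are handed out again before higher IDs are used.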

#close(reason: "", code: 200) ⇒ nil

Gracefully close a connection

Parameters:

  • reason (String) (defaults to: "")

    The reason for closing the connection; the broker can log it

  • code (Integer) (defaults to: 200)

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 121

def close(reason: "", code: 200)
  return if @closed

  @closed = [code, reason]
  @channels.each_value { |ch| ch.closed!(:connection, code, reason, 0, 0) }
  if @blocked
    @socket.close
  else
    write_bytes FrameBytes.connection_close(code, reason)
    expect(:close_ok)
  end
  nil
end

#closed? ⇒ Boolean

True if the connection is closed

Returns:

  • (Boolean)


# File 'lib/amqp/client/connection.rb', line 147

def closed?
  !@closed.nil?
end

#on_blocked {|String| ... } ⇒ nil

Callback called when client is blocked by the broker

Yields:

  • (String)

    reason why the connection is being blocked

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 156

def on_blocked(&blk)
  @on_blocked = blk
  nil
end

#on_unblocked { ... } ⇒ nil

Callback called when client is unblocked by the broker

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 164

def on_unblocked(&blk)
  @on_unblocked = blk
  nil
end
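One way to use the two callbacks together is to let them drive a gate that publishing code waits on. The sketch below is an assumption about how an application might do this; PublishGate is a hypothetical class and not part of the library. Registering a block replaces the default handler, which only prints a warning.

```ruby
# Hypothetical helper, not part of the library: a gate that publishers can wait on.
class PublishGate
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @reason = nil
  end

  # Intended to be called from an on_blocked block
  def block(reason)
    @lock.synchronize { @reason = reason }
  end

  # Intended to be called from an on_unblocked block
  def unblock
    @lock.synchronize do
      @reason = nil
      @cond.broadcast
    end
  end

  # Wait until the gate is open; returns false if still blocked after `timeout` seconds
  def wait_open(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      while @reason
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0

        @cond.wait(@lock, remaining)
      end
      true
    end
  end
end

gate = PublishGate.new
# With a real connection (assumed to be held in `conn`) the wiring would be:
#   conn.on_blocked { |reason| gate.block(reason) }
#   conn.on_unblocked { gate.unblock }
```

A publisher would then call gate.wait_open with a timeout before each publish and back off when it returns false.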

#read_loop ⇒ nil

Reads from the socket, required for any kind of progress. Blocks until the connection is closed. Normally run as a background thread automatically.

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 188

def read_loop
  # read more often than write so that channel errors crop up early
  Thread.current.priority += 1
  socket = @socket
  frame_max = @frame_max
  frame_start = String.new(capacity: 7)
  frame_buffer = String.new(capacity: frame_max)
  loop do
    socket.read(7, frame_start) || raise(IOError)
    type, channel_id, frame_size = frame_start.unpack("C S> L>")
    frame_max >= frame_size || raise(Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}")

    # read the frame content
    socket.read(frame_size, frame_buffer) || raise(IOError)

    # make sure that the frame end is correct
    frame_end = socket.readchar.ord
    raise Error::UnexpectedFrameEnd, frame_end if frame_end != 206

    # parse the frame, will return false if a close frame was received
    parse_frame(type, channel_id, frame_buffer) || return
  end
  nil
rescue *READ_EXCEPTIONS => e
  @closed ||= [400, "read error: #{e.message}"]
  nil # ignore read errors
ensure
  @closed ||= [400, "unknown"]
  @replies.close
  begin
    if @write_lock.owned? # if connection is blocked
      @socket.close
    else
      @write_lock.synchronize do
        @socket.close
      end
    end
  rescue *READ_EXCEPTIONS
    nil
  end
end
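The framing that the loop above relies on is a 7-byte header (type, channel ID, payload size), the payload, and a frame-end byte of 206. The sketch below reads one such frame from any IO-like object so the layout can be tried with a StringIO; read_frame and FRAME_END are hypothetical names, and the error classes are plain Ruby ones rather than the library's.

```ruby
require "stringio"

FRAME_END = 206

# Hypothetical helper: read one frame from an IO-like object and return
# [type, channel_id, payload].
def read_frame(io, frame_max)
  header = io.read(7) || raise(IOError, "connection closed")
  # unsigned byte, big-endian 16-bit channel ID, big-endian 32-bit payload size
  type, channel_id, size = header.unpack("C S> L>")
  raise ArgumentError, "frame size #{size} exceeds frame_max #{frame_max}" if size > frame_max

  payload = io.read(size) || raise(IOError, "truncated frame")
  frame_end = io.readbyte
  raise ArgumentError, "unexpected frame end #{frame_end}" unless frame_end == FRAME_END

  [type, channel_id, payload]
end

# Build a heartbeat frame (type 8, channel 0, empty payload) and parse it back
bytes = [8, 0, 0].pack("C S> L>") + FRAME_END.chr
read_frame(StringIO.new(bytes), 131_072) # => [8, 0, ""]
```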

#update_secret(secret, reason) ⇒ nil

Update authentication secret, for example when an OAuth backend is used

Parameters:

  • secret (String)

    The new secret

  • reason (String)

    A reason to update it

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 139

def update_secret(secret, reason)
  write_bytes FrameBytes.update_secret(secret, reason)
  expect(:update_secret_ok)
  nil
end

#with_channel {|Channel| ... } ⇒ Object

Open a new channel, yield it to the block, and then close the channel

Yields:

  • (Channel)

Returns:

  • (Object)

    Whatever was returned by the block



# File 'lib/amqp/client/connection.rb', line 108

def with_channel
  ch = channel
  begin
    yield ch
  ensure
    ch.close
  end
end
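The ensure clause is what guarantees that the channel is closed even when the block raises. The behaviour can be checked without a broker using a stand-in object; FakeChannel and with_resource below are hypothetical names used only for this illustration.

```ruby
# Hypothetical stand-in for a channel; it only records whether close was called.
class FakeChannel
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Same shape as #with_channel above, with the resource passed in for illustration.
def with_resource(resource)
  yield resource
ensure
  resource.close
end

ch = FakeChannel.new
begin
  with_resource(ch) { raise "publish failed" }
rescue RuntimeError
  # the error still propagates to the caller
end
ch.closed # => true, the channel was closed before the error propagated
```

The block's return value passes through unchanged, since the value of an ensure clause is discarded.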