Class: MarchHare::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Defined Under Namespace

Classes: SSLContextException

Constant Summary collapse

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object


355
356
357
# File 'lib/march_hare/session.rb', line 355

def method_missing(selector, *args)
  @connection.__send__(selector, *args)
end

Instance Attribute Details

#channelsArray<MarchHare::Channel> (readonly)

Returns Channels opened on this connection

Returns:


118
119
120
# File 'lib/march_hare/session.rb', line 118

def channels
  @channels
end

Class Method Details

.connect(options = {}) ⇒ Object

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :executor_shutdown_timeout (Numeric) — default: 30.0

    when recovering from a network failure how long should we wait for the current threadpool to finish handling its messages

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :requested_heartbeat (Integer) — default: 580

    Heartbeat timeout used. 0 means no heartbeat.

  • :tls (Boolean, String) — default: false

    Set to true to use TLS/SSL connection or TLS version name as a string, e.g. "TLSv1.1". This will switch port to 5671 by default.

  • :tls_certificate_path (String)

    Path to a PKCS12 certificate.

  • :thread_factory (java.util.concurrent.ThreadFactory)

    Thread factory RabbitMQ Java client will use (useful in restricted PaaS platforms such as GAE)

See Also:


52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/march_hare/session.rb', line 52

def self.connect(options = {})
  cf = ConnectionFactory.new

  if options[:uri]
    cf.uri          = options[:uri]          if options[:uri]
  else
    cf.host         = hostname_from(options) if include_host?(options)
    cf.port         = options[:port].to_i    if options[:port]
    cf.virtual_host = vhost_from(options)    if include_vhost?(options)
    cf.username     = username_from(options) if include_username?(options)
    cf.password     = password_from(options) if include_password?(options)
  end

  cf.connection_timeout  = timeout_from(options)  if include_timeout?(options)

  cf.requested_heartbeat = heartbeat_from(options)
  cf.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

  cf.thread_factory      = thread_factory_from(options)    if include_thread_factory?(options)
  cf.exception_handler   = exception_handler_from(options) if include_exception_handler?(options)

  tls = (options[:ssl] || options[:tls])
  case tls
  when true then
    cf.use_ssl_protocol
  when String then
    # TODO: logging
    $stdout.puts "Using TLS/SSL version #{tls}"
    if options[:trust_manager]
      cf.use_ssl_protocol(tls, options[:trust_manager])
    elsif (cert_path = tls_certificate_path_from(options)) && (password = tls_certificate_password_from(options))
      ctx = SSLContext.get_instance(tls)
      pwd = password.to_java.to_char_array
      begin
        is = File.new(cert_path).to_inputstream
        ks = KeyStore.get_instance('PKCS12')
        ks.load(is, pwd)

        kmf = KeyManagerFactory.get_instance("SunX509")
        kmf.init(ks, pwd)

        ctx.init(kmf.get_key_managers, [NullTrustManager.new].to_java('javax.net.ssl.TrustManager'), nil)

        cf.use_ssl_protocol(ctx)
      rescue Java::JavaLang::Throwable => e
        message = e.message
        message << "\n"
        message << e.backtrace.join("\n")

        raise SSLContextException.new(message)
      ensure
        is.close if is
      end
    else
      cf.use_ssl_protocol(tls)
    end
  end

  new(cf, options)
end

.tls_certificate_password_from(opts) ⇒ Object (protected)


571
572
573
# File 'lib/march_hare/session.rb', line 571

def self.tls_certificate_password_from(opts)
  opts[:tls_certificate_password] || opts[:ssl_certificate_password] || opts[:cert_password] || opts[:certificate_password]
end

.tls_certificate_path_from(opts) ⇒ Object (protected)


562
563
564
# File 'lib/march_hare/session.rb', line 562

def self.tls_certificate_path_from(opts)
  opts[:tls_cert] || opts[:ssl_cert] || opts[:tls_cert_path] || opts[:ssl_cert_path] || opts[:tls_certificate_path] || opts[:ssl_certificate_path]
end

Instance Method Details

#automatically_recoverObject

Begins automatic connection recovery (typically only used internally to recover from network failures)


257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/march_hare/session.rb', line 257

def automatically_recover
  ms = @network_recovery_interval * 1000
  # recovering immediately makes little sense. Wait a bit first. MK.
  java.lang.Thread.sleep(ms)

  new_connection = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(ms) do
      if @uses_uri
        self.new_uri_connection_impl(@uri)
      else
        self.new_connection_impl(@hosts, @host_selection_strategy)
      end
    end
  end
  self.recover_shutdown_hooks(new_connection)

  # sorting channels by id means that the cases like the following:
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # will recover correctly because exchanges and queues will be recovered
  # in the order the user expects and before bindings.
  @channels.sort_by {|id, _| id}.each do |id, ch|
    begin
      ch.automatically_recover(self, new_connection)
    rescue Exception, java.io.IOException => e
      # TODO: logging
      $stderr.puts e
    end
  end

  @connection = new_connection
end

#clear_blocked_connection_callbacksObject

Clears all callbacks defined with #on_blocked and #on_unblocked.


234
235
236
# File 'lib/march_hare/session.rb', line 234

def clear_blocked_connection_callbacks
  @connection.clear_blocked_listeners
end

#closeObject

Closes connection gracefully.

This includes shutting down consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.


191
192
193
194
195
196
197
198
# File 'lib/march_hare/session.rb', line 191

def close
  @channels.select { |_, ch| ch.open? }.each do |_, ch|
    ch.close
  end

  maybe_shut_down_executor
  @connection.close
end

#closed?Boolean

Returns true if this channel is closed

Returns:

  • (Boolean)

    true if this channel is closed


207
208
209
# File 'lib/march_hare/session.rb', line 207

def closed?
  !@connection.open?
end

#create_channel(n = nil) ⇒ MarchHare::Channel

Opens a new channel.

Parameters:

  • (nil): (Integer)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

See Also:


165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/march_hare/session.rb', line 165

def create_channel(n = nil)
  jc = if n
         @connection.create_channel(n)
       else
         @connection.create_channel
       end
  if jc.nil?
    error_message = <<-MSG
      Unable to create a channel. This is likely due to having a channel_max setting
      on the rabbitmq broker (see https://www.rabbitmq.com/configure.html).
      There are currently #{@channels.size} channels on this connection.
    MSG
    raise ::MarchHare::ChannelError.new(error_message, false)
  end

  ch = Channel.new(self, jc)
  register_channel(ch)

  ch
end

#flushObject

Flushes the socket used by this connection.


305
306
307
# File 'lib/march_hare/session.rb', line 305

def flush
  @connection.flush
end

#hostnameObject Also known as: host


335
336
337
# File 'lib/march_hare/session.rb', line 335

def hostname
  @cf.host
end

#on_blocked(&block) ⇒ Object

Defines a connection.blocked handler


224
225
226
# File 'lib/march_hare/session.rb', line 224

def on_blocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_blocked(block))
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcasted when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.


214
215
216
217
218
219
220
221
# File 'lib/march_hare/session.rb', line 214

def on_shutdown(&block)
  sh = ShutdownListener.new(self, &block)
  @shutdown_hooks << sh

  @connection.add_shutdown_listener(sh)

  sh
end

#on_unblocked(&block) ⇒ Object

Defines a connection.unblocked handler


229
230
231
# File 'lib/march_hare/session.rb', line 229

def on_unblocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_unblocked(block))
end

#open?Boolean Also known as: connected?

Returns true if connection is open, false otherwise

Returns:

  • (Boolean)

    true if connection is open, false otherwise


201
202
203
# File 'lib/march_hare/session.rb', line 201

def open?
  @connection.open?
end

#portObject


340
341
342
# File 'lib/march_hare/session.rb', line 340

def port
  @cf.port
end

#shut_down_executor_pool_and_await_timeoutObject (protected)


540
541
542
543
544
545
546
547
548
549
# File 'lib/march_hare/session.rb', line 540

def shut_down_executor_pool_and_await_timeout
  return unless defined?(@executor) && @executor
  ms_to_wait = (@executor_shutdown_timeout * 1000).to_i
  @executor.shutdown()
  unless @executor.awaitTermination(ms_to_wait, java.util.concurrent.TimeUnit::MILLISECONDS)
    @executor.shutdownNow()
  end
rescue java.lang.InterruptedException
  #no op, just means we got a forced shutdown
end

#startObject

No-op, exists for better API compatibility with Bunny.


315
316
317
318
319
320
321
322
323
324
# File 'lib/march_hare/session.rb', line 315

def start
  # no-op
  #
  # This method mimics Bunny::Session#start in Bunny 0.9.
  # Without it, #method_missing will proxy the call to com.rabbitmq.client.AMQConnection,
  # which happens to have a #start method which is not idempotent.
  #
  # So we stub out #start in case someone migrating from Bunny forgets to remove
  # the call to #start. MK.
end

#tls?Boolean Also known as: ssl?

Returns:

  • (Boolean)

344
345
346
347
348
349
350
351
352
# File 'lib/march_hare/session.rb', line 344

def tls?
  if @uses_uri
    u = java.net.URI.new(@uri.to_java_string)

    u.scheme == "amqps"
  else
    self.port == ConnectionFactory.DEFAULT_AMQP_OVER_SSL_PORT
  end
end

#to_sString

Returns:

  • (String)

360
361
362
# File 'lib/march_hare/session.rb', line 360

def to_s
  "#<#{self.class.name}:#{object_id} #{@cf.username}@#{@cf.host}:#{@cf.port}, vhost=#{@cf.virtual_host}>"
end

#usernameObject Also known as: user


326
327
328
# File 'lib/march_hare/session.rb', line 326

def username
  @cf.username
end

#vhostObject


331
332
333
# File 'lib/march_hare/session.rb', line 331

def vhost
  @cf.virtual_host
end