Class: MarchHare::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Defined Under Namespace

Classes: BlockBlockedUnblockedListener

Constant Summary collapse

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_factory, opts = {}) ⇒ Session

Returns a new instance of Session.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/march_hare/session.rb', line 81

def initialize(connection_factory, opts = {})
  @cf               = connection_factory
  # executors cannot be restarted after shutdown,
  # so we really need a factory here. MK.
  @executor_factory = opts[:executor_factory] || build_executor_factory_from(opts)

  @hosts            = self.class.hosts_from(opts)
  @default_host_selection_strategy = lambda { |hosts| hosts.sample }
  @host_selection_strategy         = opts[:host_selection_strategy] || @default_host_selection_strategy

  @connection       = self.new_connection_impl(@hosts, @host_selection_strategy)
  @channels         = JavaConcurrent::ConcurrentHashMap.new

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] || opts[:automatic_recovery]
                           end
  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @shutdown_hooks            = Array.new

  if @automatically_recover
    self.add_automatic_recovery_hook
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



266
267
268
# File 'lib/march_hare/session.rb', line 266

def method_missing(selector, *args)
  @connection.__send__(selector, *args)
end

Instance Attribute Details

#channelsArray<MarchHare::Channel> (readonly)

Returns Channels opened on this connection.

Returns:



77
78
79
# File 'lib/march_hare/session.rb', line 77

def channels
  @channels
end

Class Method Details

.connect(options = {}) ⇒ Object

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :requested_heartbeat (Integer) — default: 580

    Heartbeat timeout used. 0 means no heartbeat.

  • :tls (Boolean) — default: false

    Set to true to use TLS/SSL connection. This will switch port to 5671 by default.

  • :thread_factory (java.util.concurrent.ThreadFactory)

    Thread factory RabbitMQ Java client will use (useful in restricted PaaS platforms such as GAE)

See Also:



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/march_hare/session.rb', line 43

def self.connect(options={})
  cf = ConnectionFactory.new

  cf.uri                = options[:uri]          if options[:uri]
  cf.host               = hostname_from(options) if include_host?(options)
  cf.port               = options[:port].to_i    if options[:port]
  cf.virtual_host       = vhost_from(options)    if include_vhost?(options)
  cf.connection_timeout = timeout_from(options)  if include_timeout?(options)
  cf.username           = username_from(options) if include_username?(options)
  cf.password           = password_from(options) if include_password?(options)

  cf.requested_heartbeat = heartbeat_from(options)
  cf.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

  cf.thread_factory      = thread_factory_from(options)    if include_thread_factory?(options)
  cf.exception_handler   = exception_handler_from(options) if include_exception_handler?(options)

  tls = (options[:ssl] || options[:tls])
  case tls
  when true then
    cf.use_ssl_protocol
  when String then
    if options[:trust_manager]
      cf.use_ssl_protocol(tls, options[:trust_manager])
    else
      cf.use_ssl_protocol(tls)
    end
  end


  new(cf, options)
end

Instance Method Details

#add_automatic_recovery_hookObject



183
184
185
186
187
188
189
190
191
# File 'lib/march_hare/session.rb', line 183

def add_automatic_recovery_hook
  fn = Proc.new do |_, signal|
    if !signal.initiated_by_application
      self.automatically_recover
    end
  end

  @automatic_recovery_hook = self.on_shutdown(&fn)
end

#automatically_recoverObject

Begins automatic connection recovery (typically only used internally to recover from network failures)



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/march_hare/session.rb', line 200

def automatically_recover
  ms = @network_recovery_interval * 1000
  # recovering immediately makes little sense. Wait a bit first. MK.
  java.lang.Thread.sleep(ms)

  new_connection = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(ms) do
      self.new_connection_impl(@hosts, @host_selection_strategy)
    end
  end
  @thread_pool = ThreadPools.dynamically_growing
  self.recover_shutdown_hooks(new_connection)

  # sorting channels by id means that the cases like the following:
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # will recover correctly because exchanges and queues will be recovered
  # in the order the user expects and before bindings.
  @channels.sort_by {|id, _| id}.each do |id, ch|
    begin
      ch.automatically_recover(self, new_connection)
    rescue Exception, java.io.IOException => e
      # TODO: logging
      $stderr.puts e
    end
  end

  @connection = new_connection
end

#clear_blocked_connection_callbacksObject

Clears all callbacks defined with #on_blocked and #on_unblocked.



177
178
179
# File 'lib/march_hare/session.rb', line 177

def clear_blocked_connection_callbacks
  @connection.clear_blocked_listeners
end

#closeObject

Closes connection gracefully.

This includes shutting down consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.



134
135
136
137
138
139
140
141
# File 'lib/march_hare/session.rb', line 134

def close
  @channels.select { |_, ch| ch.open? }.each do |_, ch|
    ch.close
  end

  maybe_shut_down_executor
  @connection.close
end

#closed?Boolean

Returns true if this channel is closed.

Returns:

  • (Boolean)

    true if this channel is closed



150
151
152
# File 'lib/march_hare/session.rb', line 150

def closed?
  !@connection.open?
end

#create_channel(n = nil) ⇒ MarchHare::Channel

Opens a new channel.

Parameters:

  • (nil): (Integer)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

See Also:



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/march_hare/session.rb', line 116

def create_channel(n = nil)
  jc = if n
         @connection.create_channel(n)
       else
         @connection.create_channel
       end

  ch = Channel.new(self, jc)
  register_channel(ch)

  ch
end

#disable_automatic_recoveryObject



194
195
196
# File 'lib/march_hare/session.rb', line 194

def disable_automatic_recovery
  @connetion.remove_shutdown_listener(@automatic_recovery_hook) if @automatic_recovery_hook
end

#flushObject

Flushes the socket used by this connection.



245
246
247
# File 'lib/march_hare/session.rb', line 245

def flush
  @connection.flush
end

#heartbeat=(n) ⇒ Object



250
251
252
# File 'lib/march_hare/session.rb', line 250

def heartbeat=(n)
  @connection.heartbeat = n
end

#on_blocked(&block) ⇒ Object

Defines a connection.blocked handler



167
168
169
# File 'lib/march_hare/session.rb', line 167

def on_blocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_blocked(block))
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcasted when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.



157
158
159
160
161
162
163
164
# File 'lib/march_hare/session.rb', line 157

def on_shutdown(&block)
  sh = ShutdownListener.new(self, &block)
  @shutdown_hooks << sh

  @connection.add_shutdown_listener(sh)

  sh
end

#on_unblocked(&block) ⇒ Object

Defines a connection.unblocked handler



172
173
174
# File 'lib/march_hare/session.rb', line 172

def on_unblocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_unblocked(block))
end

#open?Boolean Also known as: connected?

Returns true if connection is open, false otherwise.

Returns:

  • (Boolean)

    true if connection is open, false otherwise



144
145
146
# File 'lib/march_hare/session.rb', line 144

def open?
  @connection.open?
end

#recover_shutdown_hooks(connection) ⇒ Object



238
239
240
241
242
# File 'lib/march_hare/session.rb', line 238

def recover_shutdown_hooks(connection)
  @shutdown_hooks.each do |sh|
    connection.add_shutdown_listener(sh)
  end
end

#register_channel(ch) ⇒ Object



281
282
283
# File 'lib/march_hare/session.rb', line 281

def register_channel(ch)
  @channels[ch.channel_number] = ch
end

#startObject

No-op, exists for better API compatibility with Bunny.



255
256
257
258
259
260
261
262
263
264
# File 'lib/march_hare/session.rb', line 255

def start
  # no-op
  #
  # This method mimics Bunny::Session#start in Bunny 0.9.
  # Without it, #method_missing will proxy the call to com.rabbitmq.client.AMQConnection,
  # which happens to have a #start method which is not idempotent.
  #
  # So we stub out #start in case someone migrating from Bunny forgets to remove
  # the call to #start. MK.
end

#to_sString

Returns:

  • (String)


271
272
273
# File 'lib/march_hare/session.rb', line 271

def to_s
  "#<#{self.class.name}:#{object_id} #{@cf.username}@#{@cf.host}:#{@cf.port}, vhost=#{@cf.virtual_host}>"
end

#unregister_channel(ch) ⇒ Object



286
287
288
# File 'lib/march_hare/session.rb', line 286

def unregister_channel(ch)
  @channels.delete(ch.channel_number)
end