Class: Qpid::Proton::Reactor::Container

Inherits:
Reactor
  • Object
show all
Includes:
Util::Reactor, Util::UUID
Defined in:
lib/reactor/container.rb

Overview

A representation of the AMQP concept of a container which, loosely speaking, is something that establishes links to or from another container on which messages are transferred.

This is an extension to the Reactor classthat adds convenience methods for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender and Qpid::Proton::Receiver.

Instance Attribute Summary collapse

Attributes inherited from Reactor

#errors

Instance Method Summary collapse

Methods included from Util::UUID

#generate_uuid

Methods included from Util::Reactor

#create_session

Methods inherited from Reactor

#acceptor, #connection, #handler, #handler=, #on_error, #process, #push_event, #quiesced?, #run, #schedule, #selectable, #timeout, #timeout=, #update, #wakeup, wrap

Constructor Details

#initialize(handlers, options = {}) ⇒ Container

Returns a new instance of Container.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/reactor/container.rb', line 58

def initialize(handlers, options = {})
  super(handlers, options)

  # only do the following if we're creating a new instance
  if !options.has_key?(:impl)
    @ssl = SSLConfig.new
    if options[:global_handler]
      self.global_handler = GlobalOverrides.new(options[:global_handler])
    else
      # very ugly, but using self.global_handler doesn't work in the constructor
      ghandler = Reactor.instance_method(:global_handler).bind(self).call
      ghandler = GlobalOverrides.new(ghandler)
      Reactor.instance_method(:global_handler=).bind(self).call(ghandler)
    end
    @trigger = nil
    @container_id = generate_uuid
  end
end

Instance Attribute Details

#container_idObject

Returns the value of attribute container_id.



55
56
57
# File 'lib/reactor/container.rb', line 55

def container_id
  @container_id
end

#global_handlerObject

Returns the value of attribute global_handler.



56
57
58
# File 'lib/reactor/container.rb', line 56

def global_handler
  @global_handler
end

Instance Method Details



256
257
258
259
260
261
262
263
264
# File 'lib/reactor/container.rb', line 256

def _apply_link_options(options, link)
  if !options.nil? && !options.empty?
    if !options.is_a?(::List)
      options = [Options].flatten
    end

    options.each {|option| o.apply(link) if o.test(link)}
  end
end

#_session(context) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/reactor/container.rb', line 110

def _session(context)
  if context.is_a?(Qpid::Proton::URL)
    return self._session(self.connect(:url => context))
  elsif context.is_a?(Qpid::Proton::Session)
    return context
  elsif context.is_a?(Qpid::Proton::Connection)
    if context.session_policy?
      return context.session_policy.session(context)
    else
      return self.create_session(context)
    end
  else
    return context.session
  end
end

#connect(options = {}) ⇒ Object

Initiates the establishment of an AMQP connection.

Parameters:

  • options (Hash) (defaults to: {})

    A hash of named arguments.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/reactor/container.rb', line 81

def connect(options = {})
  conn = self.connection(options[:handler])
  conn.container = self.container_id || generate_uuid
  connector = Connector.new(conn)
  conn.overrides = connector
  if !options[:url].nil?
    connector.address = URLs.new([options[:url]])
  elsif !options[:urls].nil?
    connector.address = URLs.new(options[:urls])
  elsif !options[:address].nil?
    connector.address = URLs.new([Qpid::Proton::URL.new(options[:address])])
  else
    raise ::ArgumentError.new("either :url or :urls or :address required")
  end

  connector.heartbeat = options[:heartbeat] if !options[:heartbeat].nil?
  if !options[:reconnect].nil?
    connector.reconnect = options[:reconnect]
  else
    connector.reconnect = Backoff.new()
  end

  connector.ssl_domain = SessionPerConnection.new # TODO seems this should be configurable

  conn.open

  return conn
end

#create_receiver(context, opts = {}) ⇒ Receiver

Initiates the establishment of a link over which messages can be received.

There are two accepted arguments for the context

1. If a Connection is supplied then the link is established using that

object. The source, and optionally the target, address can be supplied

2. If it is a String or a URL then a new Connection is created on which

the link will be attached. If a path is specified, but not the source address, then the path of the URL is used as the target address.

The name will be generated for the link if one is not specified.

Parameters:

  • context (Connection, URL, String)

    The connection or the address.

  • opts (Hash) (defaults to: {})

    Additional otpions.

Options Hash (opts):

  • The (String, Qpid::Proton::URL)

    source address.

  • :target (String)

    The target address

  • :name (String)

    The link name.

  • :dynamic (Boolean)
  • :handler (Object)
  • :options (Hash)

    Additional link options.

Returns:



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/reactor/container.rb', line 185

def create_receiver(context, opts = {})
  if context.is_a?(::String)
    context = Qpid::Proton::URL.new(context)
  end

  source = opts[:source]
  if context.is_a?(Qpid::Proton::URL) && source.nil?
    source = context.path
  end

  session = self._session(context)

  receiver = session.receiver(opts[:name] ||
                              id(session.connection.container,
                                  source, opts[:target]))
  receiver.source.address = source if source
  receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic]
  receiver.target.address = opts[:target] if !opts[:target].nil?
  receiver.handler = opts[:handler] if !opts[:handler].nil?
  self._apply_link_options(opts[:options], receiver)
  receiver.open
  return receiver
end

#create_sender(context, opts = {}) ⇒ Sender

Initiates the establishment of a link over which messages can be sent.

Parameters:

  • context (String, URL)

    The context.

  • opts (Hash) (defaults to: {})

    Additional options.

  • opts (String, Qpid::Proton::URL) (defaults to: {})

    The target address.

  • opts (String) (defaults to: {})

    :source The source address.

  • opts (Boolean) (defaults to: {})

    :dynamic

  • opts (Object) (defaults to: {})

    :handler

  • opts (Object) (defaults to: {})

    :tag_generator The tag generator.

  • opts (Hash) (defaults to: {})

    :options Addtional link options

Returns:



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/reactor/container.rb', line 139

def create_sender(context, opts = {})
  if context.is_a?(::String)
    context = Qpid::Proton::URL.new(context)
  end

  target = opts[:target]
  if context.is_a?(Qpid::Proton::URL) && target.nil?
    target = context.path
  end

  session = self._session(context)

  sender = session.sender(opts[:name] ||
                          id(session.connection.container,
                            target, opts[:source]))
    sender.source.address = opts[:source] if !opts[:source].nil?
    sender.target.address = target if target
    sender.handler = opts[:handler] if !opts[:handler].nil?
    sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil?
    self._apply_link_options(opts[:options], sender)
    sender.open
    return sender
end

#declare_transaction(context, handler = nil, settle_before_discharge = false) ⇒ Object



209
210
211
212
213
214
215
216
217
218
# File 'lib/reactor/container.rb', line 209

def declare_transaction(context, handler = nil, settle_before_discharge = false)
  if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil?
    class << context
      attr_accessor :txn_ctl
    end
    context.txn_ctl = self.create_sender(context, nil, "txn-ctl",
    InternalTransactionHandler.new())
  end
  return Transaction.new(context.txn_ctl, handler, settle_before_discharge)
end

#do_work(timeout = nil) ⇒ Object



239
240
241
242
# File 'lib/reactor/container.rb', line 239

def do_work(timeout = nil)
  self.timeout = timeout unless timeout.nil?
  self.process
end

#id(container, remote, local) ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
# File 'lib/reactor/container.rb', line 244

def id(container, remote, local)
  if !local.nil? && !remote.nil?
    "#{container}-#{remote}-#{local}"
  elsif !local.nil?
    "#{container}-#{local}"
  elsif !remote.nil?
    "#{container}-#{remote}"
  else
    "#{container}-#{generate_uuid}"
  end
end

#listen(url, ssl_domain = nil) ⇒ Object

Initiates a server socket, accepting incoming AMQP connections on the interface and port specified.

Parameters:

  • url
  • ssl_domain (defaults to: nil)


226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/reactor/container.rb', line 226

def listen(url, ssl_domain = nil)
  url = Qpid::Proton::URL.new(url)
  acceptor = self.acceptor(url.host, url.port)
  ssl_config = ssl_domain
  if ssl_config.nil? && (url.scheme == 'amqps') && @ssl
    ssl_config = @ssl.server
  end
  if !ssl_config.nil?
    acceptor.ssl_domain(ssl_config)
  end
  return acceptor
end

#to_sObject



266
267
268
# File 'lib/reactor/container.rb', line 266

def to_s
  "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>"
end