Class: Qpid::Proton::Messenger::Messenger

Inherits:
Object
Defined in:
lib/messenger/messenger.rb

Overview

The Messenger class defines a high-level interface for sending and receiving Messages. Every Messenger contains a single logical queue of incoming messages and a single logical queue of outgoing messages. The messages in these queues may be destined for, or originate from, a variety of addresses.

The messenger interface is single-threaded. All methods except one (#interrupt) are intended to be used from within the messenger thread.

Sending & Receiving Messages

The Messenger class works in conjunction with the Message class. The Message class is a mutable holder of message content.

The put method copies its Message to the outgoing queue, and may send queued messages if it can do so without blocking. The send method blocks until it has sent the requested number of messages, or until a timeout interrupts the attempt.

Similarly, the receive method receives messages into the incoming queue, and may block as it attempts to receive the requested number of messages, or until the timeout is reached. It may receive fewer than the requested number. The get method pops the oldest Message off the incoming queue and copies it into the Message object that you supply. It will not block.

The blocking attribute allows you to turn off blocking behavior entirely, in which case send and receive will do whatever they can without blocking, and then return. You can then look at the number of incoming and outgoing messages to see how much outstanding work remains.
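The flow described above can be sketched as two small helpers. This is a minimal sketch rather than code from the library: the helper names `send_all` and `drain` are hypothetical, the messenger argument is only expected to respond to the `put`, `send`, `receive`, `incoming`, `get`, and `accept` methods documented on this page, and `message.body` is assumed to be the content accessor of the Message class.

```ruby
# Hypothetical helper: queue every message with put, then call send to
# block until the outgoing queue is flushed or the timeout expires.
def send_all(messenger, messages)
  trackers = messages.map { |message| messenger.put(message) }
  messenger.send # n defaults to -1: wait for all outgoing messages
  trackers
end

# Hypothetical helper: receive up to limit messages, then pop each one off
# the incoming queue with get and accept it.
def drain(messenger, message, limit = 10)
  bodies = []
  messenger.receive(limit)
  while messenger.incoming > 0
    tracker = messenger.get(message)
    bodies << message.body # assumed accessor on the Message class
    messenger.accept(tracker)
  end
  bodies
end
```

Because get overwrites the supplied Message, the sketch copies out the body on each pass and reuses a single Message object for the whole loop.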

Constant Summary

FLAGS_CHECK_ROUTES =
Cproton::PN_FLAGS_CHECK_ROUTES

Constructor Details

#initialize(name = nil) ⇒ Messenger

Creates a new Messenger.

The name parameter is optional. If one is not provided then a unique name is generated.

Options

  • name - the name (def. nil)



# File 'lib/messenger/messenger.rb', line 76

def initialize(name = nil)
  @impl = Cproton.pn_messenger(name)
  @selectables = {}
  ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
  Cproton.pn_messenger_set_outgoing_window(@impl, 2_147_483_647)
  Cproton.pn_messenger_set_incoming_window(@impl, 2_147_483_647)
end

Class Method Details

.finalize!(impl) ⇒ Object

:nodoc:



# File 'lib/messenger/messenger.rb', line 84

def self.finalize!(impl) # :nodoc:
  proc {
    Cproton.pn_messenger_free(impl)
  }
end

Instance Method Details

#accept(tracker = nil) ⇒ Object

Signal the sender that you have acted on the Message pointed to by the tracker. If no tracker is supplied, then all messages that have been returned by the get method are accepted, except those that have already been auto-settled by passing beyond your incoming window size.

Options

  • tracker - the tracker

Raises:

  • (TypeError)


# File 'lib/messenger/messenger.rb', line 604

def accept(tracker = nil)
  raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker)
  if tracker.nil? then
    tracker = self.incoming_tracker
    flag = Cproton::PN_CUMULATIVE
  else
    flag = 0
  end
  Cproton.pn_messenger_accept(@impl, tracker.impl, flag)
end

#blocking=(blocking) ⇒ Object

Sets the blocking mode.



# File 'lib/messenger/messenger.rb', line 143

def blocking=(blocking)
  Cproton.pn_messenger_set_blocking(@impl, blocking)
end

#blocking? ⇒ Boolean

Returns true if blocking mode is enabled.

Enable or disable blocking behavior during message sending and receiving. This affects every blocking call, with the exception of work(). Currently, the affected calls are send, receive, and stop.

Returns:

  • (Boolean)


# File 'lib/messenger/messenger.rb', line 138

def blocking?
  Cproton.pn_messenger_is_blocking(@impl)
end

#certificate ⇒ Object

Returns the path to a certificate file.



# File 'lib/messenger/messenger.rb', line 293

def certificate
  Cproton.pn_messenger_get_certificate(@impl)
end

#certificate=(certificate) ⇒ Object

Path to a certificate file for the Messenger.

This certificate is used when the Messenger accepts or establishes SSL/TLS connections. This property must be specified for the Messenger to accept incoming SSL/TLS connections and to establish client-authenticated outgoing SSL/TLS connections. Outgoing SSL/TLS connections that are not client-authenticated do not require this property.

Options

  • certificate - the certificate



# File 'lib/messenger/messenger.rb', line 287

def certificate=(certificate)
  Cproton.pn_messenger_set_certificate(@impl, certificate)
end

#clear_error ⇒ Object

Clears the current error state.



# File 'lib/messenger/messenger.rb', line 191

def clear_error
  error = Cproton.pn_messenger_error(@impl)
  unless error.nil?
    Cproton.pn_error_clear(error)
  end
end

#deadline ⇒ Object



# File 'lib/messenger/messenger.rb', line 166

def deadline
  tstamp = Cproton.pn_messenger_deadline(@impl)
  return tstamp / 1000.0 unless tstamp.nil?
end

#errno ⇒ Object

Returns the most recent error number.



# File 'lib/messenger/messenger.rb', line 179

def errno
  Cproton.pn_messenger_errno(@impl)
end

#error ⇒ Object

Returns the most recent error message.



# File 'lib/messenger/messenger.rb', line 185

def error
  Cproton.pn_error_text(Cproton.pn_messenger_error(@impl))
end

#error? ⇒ Boolean

Reports whether an error occurred.

Returns:

  • (Boolean)


# File 'lib/messenger/messenger.rb', line 173

def error?
  !Cproton.pn_messenger_errno(@impl).zero?
end
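The error accessors above can be combined into a single check. The following is a minimal sketch; the helper name `take_error` is hypothetical, and it relies only on the `error?`, `errno`, `error`, and `clear_error` methods documented on this page.

```ruby
# Hypothetical helper: describe the pending error, clear it, and return the
# description. Returns nil when no error has been recorded.
def take_error(messenger)
  return nil unless messenger.error?

  description = "#{messenger.errno}: #{messenger.error}"
  messenger.clear_error
  description
end
```

Reading the error text before calling clear_error matters here, since clearing resets the state that errno and error report.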

#flags ⇒ Object

Gets the control flags for a Messenger.



# File 'lib/messenger/messenger.rb', line 230

def flags
  Cproton.pn_messenger_get_flags(@impl)
end

#flags=(flags) ⇒ Object

Sets control flags to enable additional functionality for the Messenger.

Options

  • flags - the flags to set on the messenger



# File 'lib/messenger/messenger.rb', line 225

def flags=(flags)
  Cproton.pn_messenger_set_flags(@impl, flags)
end

#get(msg = nil) ⇒ Object

Moves the message from the head of the incoming message queue into the supplied message object. Any content in the supplied message will be overwritten. A tracker for the incoming Message is returned. The tracker can later be used to communicate your acceptance or rejection of the Message.

If no message is provided in the argument, then one is created. In either case, the one returned will be the fetched message.

Options

  • msg - the (optional) Message instance to be used



# File 'lib/messenger/messenger.rb', line 396

def get(msg = nil)
  msg_impl = nil
  if msg.nil? then
    msg_impl = nil
  else
    msg_impl = msg.impl
  end
  perform_get(msg_impl)
  msg.post_decode unless msg.nil?
  return incoming_tracker
end

#incoming ⇒ Object

Returns the number of messages in the incoming queue that have not been retrieved.



# File 'lib/messenger/messenger.rb', line 477

def incoming
  Cproton.pn_messenger_incoming(@impl)
end

#incoming_tracker ⇒ Object

Returns a Tracker for the most recently received message.



# File 'lib/messenger/messenger.rb', line 588

def incoming_tracker
  impl = Cproton.pn_messenger_incoming_tracker(@impl)
  return nil if impl == -1
  Tracker.new(impl)
end

#incoming_window ⇒ Object

Returns the incoming window.



# File 'lib/messenger/messenger.rb', line 693

def incoming_window
  Cproton.pn_messenger_get_incoming_window(@impl)
end

#incoming_window=(window) ⇒ Object

Sets the incoming window.

The Messenger will track the remote status of this many incoming deliveries after they have been accepted or rejected.

Messages enter this window only when you take them into your application using get(). If your incoming window size is n, and you get n+1 messages without explicitly accepting or rejecting the oldest message, then the message that passes beyond the edge of the incoming window will be assigned the default disposition of its link.

Options

  • window - the window size

Raises:

  • (TypeError)


# File 'lib/messenger/messenger.rb', line 686

def incoming_window=(window)
  raise TypeError.new("invalid window: #{window}") unless valid_window?(window)
  Cproton.pn_messenger_set_incoming_window(@impl, window)
end
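The n+1 behavior described for the incoming window can be illustrated with a toy model. This is only a sketch of the sliding-window idea under simplifying assumptions, not the library's implementation: the `WindowModel` class and its methods are hypothetical, and explicit accept or reject calls are left out.

```ruby
# Toy model of a delivery window (hypothetical, for illustration only).
class WindowModel
  def initialize(size)
    @size = size
    @tracked = []
  end

  # Record a delivery and return the deliveries that were pushed beyond
  # the edge of the window (an empty array when nothing fell out).
  def track(delivery)
    @tracked << delivery
    overflow = [@tracked.size - @size, 0].max
    @tracked.shift(overflow)
  end

  # Deliveries whose status is still being tracked, oldest first.
  def tracked
    @tracked.dup
  end
end

window = WindowModel.new(2)
window.track(:first)
window.track(:second)
evicted = window.track(:third) # the window holds 2, so :first falls out
```

In the real Messenger, a message that leaves the incoming window this way is assigned the default disposition of its link. Note that the constructor shown earlier sets both windows to 2_147_483_647, so this only becomes relevant once a smaller window is configured.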

#interrupt ⇒ Object

Attempts to interrupt the messenger thread.

The Messenger interface is single-threaded, and this is the only function intended to be called from outside of its thread.

Call this from a non-Messenger thread to interrupt it while it is blocking. This will cause a ::InterruptError to be raised.

If there is no currently blocking call, then the next blocking call will be affected, even if it is within the same thread that originated the interrupt.



# File 'lib/messenger/messenger.rb', line 447

def interrupt
  Cproton.pn_messenger_interrupt(@impl)
end
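One way to use interrupt is as a watchdog around a blocking call. The following is a minimal sketch under stated assumptions: the helper name `receive_with_watchdog` is hypothetical, `max_wait` is in seconds because it is passed to Ruby's `sleep`, and the messenger is only expected to respond to the `receive` and `interrupt` methods documented on this page.

```ruby
# Hypothetical helper: run a blocking receive, and have a second thread call
# interrupt if the call is still blocked after max_wait seconds.
def receive_with_watchdog(messenger, limit, max_wait)
  watchdog = Thread.new do
    sleep max_wait
    messenger.interrupt # the only call intended for use from another thread
  end
  messenger.receive(limit)
ensure
  # Stop the watchdog so it does not fire after receive has returned.
  watchdog.kill if watchdog
end
```

There is still a small race in this sketch: if the watchdog fires just after receive returns but before the thread is killed, the interrupt applies to the next blocking call, as described above. Callers should be prepared to rescue the interrupt error around later blocking calls as well.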

#name ⇒ Object

Returns the name.



# File 'lib/messenger/messenger.rb', line 92

def name
  Cproton.pn_messenger_name(@impl)
end

#outgoing ⇒ Object

Returns the number of messages in the outgoing queue that have not been transmitted.



# File 'lib/messenger/messenger.rb', line 470

def outgoing
  Cproton.pn_messenger_outgoing(@impl)
end

#outgoing_tracker ⇒ Object

Returns a Tracker for the message most recently sent via the put method.



# File 'lib/messenger/messenger.rb', line 580

def outgoing_tracker
  impl = Cproton.pn_messenger_outgoing_tracker(@impl)
  return nil if impl == -1
  Tracker.new(impl)
end

#outgoing_window ⇒ Object

Returns the outgoing window.



# File 'lib/messenger/messenger.rb', line 717

def outgoing_window
  Cproton.pn_messenger_get_outgoing_window(@impl)
end

#outgoing_window=(window) ⇒ Object

Sets the outgoing window.

The Messenger will track the remote status of this many outgoing deliveries after calling send. A Message enters this window when you call the put() method with the message. If your outgoing window size is n, and you call put n+1 times, status information will no longer be available for the first message.

Options

  • window - the window size

Raises:

  • (TypeError)


# File 'lib/messenger/messenger.rb', line 710

def outgoing_window=(window)
  raise TypeError.new("invalid window: #{window}") unless valid_window?(window)
  Cproton.pn_messenger_set_outgoing_window(@impl, window)
end

#passive=(mode) ⇒ Object

Turns passive mode on or off.

When set to passive mode, Messenger will not attempt to perform I/O operations internally. In this mode it is necessary to use the Selectable type to drive any I/O needed to perform requested actions.

In this mode Messenger will never block.



# File 'lib/messenger/messenger.rb', line 162

def passive=(mode)
  Cproton.pn_messenger_set_passive(@impl, mode)
end

#passive? ⇒ Boolean

Returns true if passive mode is enabled.

Returns:

  • (Boolean)


# File 'lib/messenger/messenger.rb', line 149

def passive?
  Cproton.pn_messenger_is_passive(@impl)
end

#password ⇒ Object

Returns the password property for the Messenger.private_key file.



# File 'lib/messenger/messenger.rb', line 109

def password
  Cproton.pn_messenger_get_password(@impl)
end

#password=(password) ⇒ Object

This property contains the password for the Messenger.private_key file, or nil if the file is not encrypted.

Arguments

  • password - the password



# File 'lib/messenger/messenger.rb', line 103

def password=(password)
  Cproton.pn_messenger_set_password(@impl, password)
end

#private_key ⇒ Object

Returns the path to a private key file.



# File 'lib/messenger/messenger.rb', line 314

def private_key
  Cproton.pn_messenger_get_private_key(@impl)
end

#private_key=(key) ⇒ Object

Path to a private key file for the Messenger.

This property must be specified for the Messenger to accept incoming SSL/TLS connections and to establish client-authenticated outgoing SSL/TLS connections. SSL/TLS connections that are not client-authenticated do not require this property.

Options

  • key - the key file



# File 'lib/messenger/messenger.rb', line 308

def private_key=(key)
  Cproton.pn_messenger_set_private_key(@impl, key)
end

#put(message) ⇒ Object

Places the content contained in the message onto the outgoing queue of the Messenger.

This method will never block; however, it will send any unblocked Messages in the outgoing queue immediately and leave any blocked Messages in the outgoing queue. The send call may then be used to block until the outgoing queue is empty. The outgoing attribute may be used to check the depth of the outgoing queue.

Options

  • message - the message



# File 'lib/messenger/messenger.rb', line 350

def put(message)
  if message.nil?
    raise TypeError.new("invalid message: #{message}")
  end
  unless message.kind_of?(Qpid::Proton::Message)
    raise ::ArgumentError.new("invalid message type: #{message.class}")
  end
  # encode the message first
  message.pre_encode
  perform_put(message)
  return outgoing_tracker
end

#rcv_settle_mode=(mode) ⇒ Object

Set the local receiver settle mode for the underlying links.

Options

  • mode - the receiver settle mode



# File 'lib/messenger/messenger.rb', line 248

def rcv_settle_mode=(mode)
  Cproton.pn_messenger_set_rcv_settle_mode(@impl, mode)
end

#receive(limit = -1) ⇒ Object

Receives up to limit messages into the incoming queue. If no value for limit is supplied, this call will receive as many messages as it can buffer internally. If the Messenger is in blocking mode, this call will block until at least one Message is available in the incoming queue.

Options

  • limit - the maximum number of messages to receive



# File 'lib/messenger/messenger.rb', line 426

def receive(limit = -1)
  Cproton.pn_messenger_recv(@impl, limit)
end

#receiving? ⇒ Boolean

Returns true if the messenger is currently receiving data.

Returns:

  • (Boolean)


# File 'lib/messenger/messenger.rb', line 431

def receiving?
  Cproton.pn_messenger_receiving(@impl)
end

#reject(tracker) ⇒ Object

Rejects the incoming message identified by the tracker. If the tracker is nil, all messages that have been returned by the get method are rejected, except those that have already been auto-settled by passing beyond your incoming window size.

Options

  • tracker - the tracker

Raises:

  • (TypeError)


# File 'lib/messenger/messenger.rb', line 624

def reject(tracker)
  raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker)
  if tracker.nil? then
    tracker = self.incoming_tracker
    flag = Cproton::PN_CUMULATIVE
  else
    flag = 0
  end
  Cproton.pn_messenger_reject(@impl, tracker.impl, flag)
end

#rewrite(pattern, address) ⇒ Object

Similar to #route, except that the destination of the Message is determined before the message address is rewritten.

The outgoing address is only rewritten after routing has been finalized. If a message has an outgoing address of “amqp://0.0.0.0:5678”, and a rewriting rule that changes its outgoing address to “foo”, it will still arrive at the peer that is listening on “amqp://0.0.0.0:5678”, but when it arrives there, the receiver will see its outgoing address as “foo”.

The default rewrite rule removes username and password from addresses before they are transmitted.

Arguments

  • pattern - the outgoing address

  • address - the target address



# File 'lib/messenger/messenger.rb', line 557

def rewrite(pattern, address)
  Cproton.pn_messenger_rewrite(@impl, pattern, address)
end

#route(pattern, address) ⇒ Object

Adds a routing rule to the Messenger’s internal routing table.

The route procedure may be used to influence how a Messenger will internally treat a given address or class of addresses. Every call to the route procedure will result in Messenger appending a routing rule to its internal routing table.

Whenever a Message is presented to a Messenger for delivery, it will match the address of this message against the set of routing rules in order. The first rule to match will be triggered, and instead of routing based on the address presented in the message, the Messenger will route based on the address supplied in the rule.

The pattern matching syntax supports two types of matches: a ‘%’ will match any character except a ‘/’, and a ‘*’ will match any character including a ‘/’.

A routing address is specified as a normal AMQP address; however, it may additionally use substitution variables from the pattern match that triggered the rule.

Arguments

  • pattern - the address pattern

  • address - the target address

Examples

# route messages sent to foo to the destination amqp://foo.com
messenger.route("foo", "amqp://foo.com")

# any message to foobar will be routed to amqp://foo.com/bar
messenger.route("foobar", "amqp://foo.com/bar")

# any message to bar/<path> will be routed to the same path within
# the amqp://bar.com domain
messenger.route("bar/*", "amqp://bar.com/$1")

# route all Message objects over TLS
messenger.route("amqp:*", "amqps:$1")

# supply credentials for foo
messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1")

# supply credentials for all domains
messenger.route("amqp://*", "amqp://user:password@$1")

# route all addresses through a single proxy while preserving the
# original destination
messenger.route("amqp://%$/*", "amqp://user:password@proxy/$1/$2")

# route any address through a single broker
messenger.route("*", "amqp://user:password@broker/$1")


# File 'lib/messenger/messenger.rb', line 535

def route(pattern, address)
  Cproton.pn_messenger_route(@impl, pattern, address)
end
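The first-match rule and the ‘%’ / ‘*’ wildcards described above can be modeled in plain Ruby. This is a sketch of the matching idea only, not the library's matcher: the function names `compile_pattern` and `apply_rules` are hypothetical, each wildcard is assumed to match a run of zero or more characters and to populate the next substitution variable ($1, $2, ...), and an address that matches no rule is assumed to be used unchanged.

```ruby
# Hypothetical model: translate a routing pattern into an anchored Regexp.
# '%' matches characters other than '/', '*' matches any characters.
def compile_pattern(pattern)
  source = pattern.each_char.map do |ch|
    case ch
    when '%' then '([^/]*)'
    when '*' then '(.*)'
    else Regexp.escape(ch)
    end
  end.join
  Regexp.new("\\A#{source}\\z")
end

# Hypothetical model: apply the first matching rule, substituting $1, $2, ...
# in the target with the text captured by the pattern's wildcards.
def apply_rules(rules, address)
  rules.each do |pattern, target|
    match = compile_pattern(pattern).match(address)
    next unless match

    return target.gsub(/\$(\d)/) { match[Regexp.last_match(1).to_i].to_s }
  end
  address # no rule matched: keep the address as presented
end

rules = [
  ['foo', 'amqp://foo.com'],
  ['bar/*', 'amqp://bar.com/$1']
]
routed = apply_rules(rules, 'bar/orders/new')
```

Since rules are tried in order, more specific patterns need to be added before catch-all patterns such as "*".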

#selectable ⇒ Object



# File 'lib/messenger/messenger.rb', line 561

def selectable
  impl = Cproton.pn_messenger_selectable(@impl)

  # if we don't have any selectables, then return
  return nil if impl.nil?

  fd = Cproton.pn_selectable_get_fd(impl)

  selectable = @selectables[fd]
  if selectable.nil?
    selectable = Selectable.new(self, impl)
    @selectables[fd] = selectable
  end
  return selectable
end

#send(n = -1) ⇒ Object

This call will block until the indicated number of messages have been sent, or until the operation times out. If n is -1 this call will block until all outgoing messages have been sent. If n is 0 then this call will send whatever it can without blocking.



# File 'lib/messenger/messenger.rb', line 378

def send(n = -1)
  Cproton.pn_messenger_send(@impl, n)
end

#settle(tracker) ⇒ Object

Frees a Messenger from tracking the status associated with a given tracker. If you don’t supply a tracker, all outgoing messages up to the most recent will be settled.

Options

  • tracker - the tracker

Raises:

  • (TypeError)


# File 'lib/messenger/messenger.rb', line 660

def settle(tracker)
  raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker)
  if tracker.nil? then
    tracker = self.incoming_tracker
    flag = Cproton::PN_CUMULATIVE
  else
    flag = 0
  end
  Cproton.pn_messenger_settle(@impl, tracker.impl, flag)
end

#snd_settle_mode=(mode) ⇒ Object

Set the local sender settle mode for the underlying links.

Options

  • mode - the sender settle mode



# File 'lib/messenger/messenger.rb', line 239

def snd_settle_mode=(mode)
  Cproton.pn_messenger_set_snd_settle_mode(@impl, mode)
end

#start ⇒ Object

Currently a no-op placeholder. For future compatibility, do not send or receive messages before starting the Messenger.



# File 'lib/messenger/messenger.rb', line 202

def start
  Cproton.pn_messenger_start(@impl)
end

#status(tracker) ⇒ Object

Gets the last known remote state of the delivery associated with the given tracker, as long as the Message is still within your outgoing window. (Also works on incoming messages that are still within your incoming queue.) See TrackerStatus for details on the values returned.

Options

  • tracker - the tracker

Raises:

  • (TypeError)


# File 'lib/messenger/messenger.rb', line 645

def status(tracker)
  raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker)
  TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl))
end
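A tracker returned by put can be handed to status once the message has been sent. The following is a minimal sketch; the helper name `send_and_check` is hypothetical, and it relies only on the `put`, `send`, and `status` methods documented on this page.

```ruby
# Hypothetical helper: send one message and report its last known remote
# state as a TrackerStatus value.
def send_and_check(messenger, message)
  tracker = messenger.put(message) # put returns the outgoing tracker
  messenger.send                   # block until sent or timed out
  messenger.status(tracker)
end
```

The status is only available while the message is still inside the outgoing window, so with a small window the check should happen before many further put calls.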

#stop ⇒ Object

Stops the Messenger, preventing it from sending or receiving any more messages.



# File 'lib/messenger/messenger.rb', line 209

def stop
  Cproton.pn_messenger_stop(@impl)
end

#stopped? ⇒ Boolean

Returns true if a Messenger is in the stopped state. This function does not block.

Returns:

  • (Boolean)


# File 'lib/messenger/messenger.rb', line 216

def stopped?
  Cproton.pn_messenger_stopped(@impl)
end

#subscribe(address, timeout = 0) ⇒ Object

Subscribes the Messenger to messages originating from the specified source. The source is an address as specified in the Messenger introduction, with the following addition: if the domain portion of the address begins with the ‘~’ character, the Messenger will interpret the domain as host/port, bind to it, and listen for incoming messages. For example, “~0.0.0.0” and “amqp://~0.0.0.0” will both bind to any local interface and listen for incoming messages. An address of “amqps://~0.0.0.0” will only permit incoming SSL connections.

Options

  • address - the source address to subscribe to

  • timeout - an optional time-to-live value, in seconds, for the subscription
    

Raises:

  • (TypeError)


# File 'lib/messenger/messenger.rb', line 268

def subscribe(address, timeout=0)
  raise TypeError.new("invalid address: #{address}") if address.nil?
  subscription = Cproton.pn_messenger_subscribe_ttl(@impl, address, timeout)
  raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil?
  Subscription.new(subscription)
end
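The ‘~’ convention for listening addresses can be wrapped in a small helper. This is a minimal sketch: the names `listen_address` and `listen` are hypothetical, and the messenger is only expected to respond to the `start` and `subscribe` methods documented on this page.

```ruby
# Hypothetical helper: build a listening address. The '~' prefix on the
# domain asks the Messenger to bind and listen instead of connecting out.
def listen_address(host, secure: false)
  scheme = secure ? 'amqps' : 'amqp'
  "#{scheme}://~#{host}"
end

# Hypothetical helper: start the Messenger and subscribe to a local address.
# Returns whatever subscribe returns (a Subscription in the real library).
def listen(messenger, host, secure: false)
  messenger.start
  messenger.subscribe(listen_address(host, secure: secure))
end
```

Passing `secure: true` yields an "amqps://~..." address, which, as described above, only permits incoming SSL connections.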

#timeout ⇒ Object

Returns the timeout period.



# File 'lib/messenger/messenger.rb', line 128

def timeout
  Cproton.pn_messenger_get_timeout(@impl)
end

#timeout=(timeout) ⇒ Object

Sets the timeout period, in milliseconds.

A negative timeout period implies an infinite timeout.

Options

  • timeout - the timeout period

Raises:

  • (TypeError)


# File 'lib/messenger/messenger.rb', line 121

def timeout=(timeout)
  raise TypeError.new("invalid timeout: #{timeout}") if timeout.nil?
  Cproton.pn_messenger_set_timeout(@impl, timeout)
end

#trusted_certificates ⇒ Object

The path to the database of trusted certificates.



# File 'lib/messenger/messenger.rb', line 332

def trusted_certificates
  Cproton.pn_messenger_get_trusted_certificates(@impl)
end

#trusted_certificates=(certificates) ⇒ Object

A path to a database of trusted certificates for use in verifying the peer on an SSL/TLS connection. If this property is nil, then the peer will not be verified.

Options

  • certificates - the certificates path



# File 'lib/messenger/messenger.rb', line 326

def trusted_certificates=(certificates)
  Cproton.pn_messenger_set_trusted_certificates(@impl,certificates)
end
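The SSL/TLS properties on this page (certificate, private_key, password, trusted_certificates) are usually set together. The following is a minimal sketch of one way to group them; the helper name `configure_tls` and the file names in the usage comment are hypothetical.

```ruby
# Hypothetical helper: apply whichever SSL/TLS settings were supplied and
# leave the others untouched. Returns the messenger for chaining.
def configure_tls(messenger, certificate: nil, private_key: nil,
                  password: nil, trusted_certificates: nil)
  messenger.certificate = certificate if certificate
  messenger.private_key = private_key if private_key
  messenger.password = password if password
  messenger.trusted_certificates = trusted_certificates if trusted_certificates
  messenger
end

# Usage (paths are placeholders):
#   configure_tls(messenger,
#                 certificate: 'server-cert.pem',
#                 private_key: 'server-key.pem',
#                 trusted_certificates: 'ca-bundle.pem')
```

Leaving trusted_certificates unset keeps it nil, in which case the peer is not verified, as noted above.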

#unregister_selectable(fileno) ⇒ Object

Unregisters a selectable object.



# File 'lib/messenger/messenger.rb', line 722

def unregister_selectable(fileno) # :nodoc:
  @selectables.delete(fileno)
end

#work(timeout = -1) ⇒ Object

Sends or receives any outstanding messages queued for a Messenger.

This will block for the indicated timeout. This method may also do I/O other than sending and receiving messages, for example closing connections after stop() has been called.



# File 'lib/messenger/messenger.rb', line 457

def work(timeout=-1)
  err = Cproton.pn_messenger_work(@impl, timeout)
  if (err == Cproton::PN_TIMEOUT) then
    return false
  else
    check_for_error(err)
    return true
  end
end
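Because work returns false on a timeout and true otherwise, it can drive a bounded polling loop. The following is a minimal sketch: the helper name `pump_until_sent` and the `max_rounds` limit are hypothetical, and the timeout is assumed to be in milliseconds, matching the unit documented for timeout=.

```ruby
# Hypothetical helper: call work repeatedly until the outgoing queue is
# empty or max_rounds attempts have been made. Returns true when the
# outgoing queue was drained.
def pump_until_sent(messenger, timeout = 100, max_rounds = 50)
  max_rounds.times do
    return true if messenger.outgoing.zero?

    messenger.work(timeout) # returns false when the timeout elapses
  end
  messenger.outgoing.zero?
end
```

Bounding the loop keeps the caller from spinning forever when a peer is unreachable; the real work method may also raise if the underlying call reports an error other than a timeout.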