Class: Qpid::Proton::Messenger::Messenger
- Inherits:
-
Object
- Object
- Qpid::Proton::Messenger::Messenger
- Defined in:
- lib/messenger/messenger.rb
Overview
The Messenger
class defines a high level interface for sending and receiving Messages. Every Messenger contains a single logical queue of incoming messages and a single logical queue of outgoing messages. These messages in these queues may be destined for, or originate from, a variety of addresses.
The messenger interface is single-threaded. All methods except one ( #interrupt ) are intended to be used from within the messenger thread.
Sending & Receiving Messages
The Messenger class works in conjuction with the Message class. The Message class is a mutable holder of message content.
The put method copies its Message to the outgoing queue, and may send queued messages if it can do so without blocking. The send method blocks until it has sent the requested number of messages, or until a timeout interrupts the attempt.
Similarly, the recv method receives messages into the incoming queue, and may block as it attempts to receive the requested number of messages, or until timeout is reached. It may receive fewer than the requested number. The get method pops the eldest Message off the incoming queue and copies it into the Message object that you supply. It will not block.
The blocking attribute allows you to turn off blocking behavior entirely, in which case send and recv will do whatever they can without blocking, and then return. You can then look at the number of incoming and outgoing messages to see how much outstanding work still remains.
Constant Summary collapse
- FLAGS_CHECK_ROUTES =
Cproton::PN_FLAGS_CHECK_ROUTES
Class Method Summary collapse
-
.finalize!(impl) ⇒ Object
:nodoc:.
Instance Method Summary collapse
-
#accept(tracker = nil) ⇒ Object
Signal the sender that you have acted on the Message pointed to by the tracker.
-
#blocking=(blocking) ⇒ Object
Sets the blocking mode.
-
#blocking? ⇒ Boolean
Returns true if blocking mode is enabled.
-
#certificate ⇒ Object
Returns the path to a certificate file.
-
#certificate=(certificate) ⇒ Object
Path to a certificate file for the
Messenger
. -
#clear_error ⇒ Object
Clears the current error state.
- #deadline ⇒ Object
-
#errno ⇒ Object
Returns the most recent error number.
-
#error ⇒ Object
Returns the most recent error message.
-
#error? ⇒ Boolean
Reports whether an error occurred.
-
#flags ⇒ Object
Gets the control flags for a Messenger.
-
#flags=(flags) ⇒ Object
Set control flags to enable additional function for the Messenger.
-
#get(msg = nil) ⇒ Object
Moves the message from the head of the incoming message queue into the supplied message object.
-
#incoming ⇒ Object
Returns the number of messages in the incoming queue that have not been retrieved.
-
#incoming_tracker ⇒ Object
Returns a
Tracker
for the most recently received message. -
#incoming_window ⇒ Object
Returns the incoming window.
-
#incoming_window=(window) ⇒ Object
Sets the incoming window.
-
#initialize(name = nil) ⇒ Messenger
constructor
Creates a new
Messenger
. -
#interrupt ⇒ Object
Attempts interrupting of the messenger thread.
-
#name ⇒ Object
Returns the name.
-
#outgoing ⇒ Object
Returns the number messages in the outgoing queue that have not been transmitted.
-
#outgoing_tracker ⇒ Object
Returns a
Tracker
for the message most recently sent via the put method. -
#outgoing_window ⇒ Object
Returns the outgoing window.
-
#outgoing_window=(window) ⇒ Object
Sets the outgoing window.
-
#passive=(mode) ⇒ Object
Turns passive mode on or off.
-
#passive? ⇒ Boolean
Returns true if passive mode is enabled.
-
#password ⇒ Object
Returns the password property for the Messenger.private_key file.
-
#password=(password) ⇒ Object
This property contains the password for the Messenger.private_key file, or
nil
if the file is not encrypted. -
#private_key ⇒ Object
Returns the path to a private key file.
-
#private_key=(key) ⇒ Object
Path to a private key file for the
Messenger
. -
#put(message) ⇒ Object
Places the content contained in the message onto the outgoing queue of the Messenger.
-
#rcv_settle_mode=(mode) ⇒ Object
Set the local receiver settle mode for the underlying links.
-
#receive(limit = -1)) ⇒ Object
Receives up to limit messages into the incoming queue.
-
#receiving? ⇒ Boolean
Returns true if the messenger is currently receiving data.
-
#reject(tracker) ⇒ Object
Rejects the incoming message identified by the tracker.
-
#rewrite(pattern, address) ⇒ Object
Similar to #route, except that the destination of the Message is determined before the message address is rewritten.
-
#route(pattern, address) ⇒ Object
Adds a routing rule to the Messenger’s internal routing table.
- #selectable ⇒ Object
-
#send(n = -1)) ⇒ Object
This call will block until the indicated number of messages have been sent, or until the operation times out.
-
#settle(tracker) ⇒ Object
Frees a Messenger from tracking the status associated with a given tracker.
-
#snd_settle_mode=(mode) ⇒ Object
Set the local sender settle mode for the underlying links.
-
#start ⇒ Object
Currently a no-op placeholder.
-
#status(tracker) ⇒ Object
Gets the last known remote state of the delivery associated with the given tracker, as long as the Message is still within your outgoing window.
-
#stop ⇒ Object
Stops the
Messenger
, preventing it from sending or receiving any more messages. -
#stopped? ⇒ Boolean
Returns true if a Messenger is in the stopped state.
-
#subscribe(address, timeout = 0) ⇒ Object
Subscribes the Messenger to messages originating from the specified source.
-
#timeout ⇒ Object
Returns the timeout period.
-
#timeout=(timeout) ⇒ Object
Sets the timeout period, in milliseconds.
-
#trusted_certificates ⇒ Object
The path to the databse of trusted certificates.
-
#trusted_certificates=(certificates) ⇒ Object
A path to a database of trusted certificates for use in verifying the peer on an SSL/TLS connection.
-
#unregister_selectable(fileno) ⇒ Object
Unregisters a selectable object.
-
#work(timeout = -1)) ⇒ Object
Sends or receives any outstanding messages queued for a Messenger.
Constructor Details
#initialize(name = nil) ⇒ Messenger
Creates a new Messenger
.
The name
parameter is optional. If one is not provided then a unique name is generated.
Options
-
name - the name (def. nil)
76 77 78 79 80 81 82 |
# File 'lib/messenger/messenger.rb', line 76 def initialize(name = nil) @impl = Cproton.pn_messenger(name) @selectables = {} ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) Cproton.pn_messenger_set_outgoing_window(@impl, 2_147_483_647) Cproton.pn_messenger_set_incoming_window(@impl, 2_147_483_647) end |
Class Method Details
.finalize!(impl) ⇒ Object
:nodoc:
84 85 86 87 88 |
# File 'lib/messenger/messenger.rb', line 84 def self.finalize!(impl) # :nodoc: proc { Cproton.pn_messenger_free(impl) } end |
Instance Method Details
#accept(tracker = nil) ⇒ Object
Signal the sender that you have acted on the Message pointed to by the tracker. If no tracker is supplied, then all messages that have been returned by the get method are accepted, except those that have already been auto-settled by passing beyond your incoming window size.
Options
-
tracker - the tracker
604 605 606 607 608 609 610 611 612 613 |
# File 'lib/messenger/messenger.rb', line 604 def accept(tracker = nil) raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) if tracker.nil? then tracker = self.incoming_tracker flag = Cproton::PN_CUMULATIVE else flag = 0 end Cproton.pn_messenger_accept(@impl, tracker.impl, flag) end |
#blocking=(blocking) ⇒ Object
Sets the blocking mode.
143 144 145 |
# File 'lib/messenger/messenger.rb', line 143 def blocking=(blocking) Cproton.pn_messenger_set_blocking(@impl, blocking) end |
#blocking? ⇒ Boolean
Returns true if blocking mode is enabled.
Enable or disable blocking behavior during message sending and receiving. This affects every blocking call, with the exception of work(). Currently, the affected calls are send, recv, and stop.
138 139 140 |
# File 'lib/messenger/messenger.rb', line 138 def blocking? Cproton.pn_messenger_is_blocking(@impl) end |
#certificate ⇒ Object
Returns the path to a certificate file.
293 294 295 |
# File 'lib/messenger/messenger.rb', line 293 def certificate Cproton.pn_messenger_get_certificate(@impl) end |
#certificate=(certificate) ⇒ Object
Path to a certificate file for the Messenger
.
This certificate is used when the Messenger
accepts or establishes SSL/TLS connections. This property must be specified for the Messenger to accept incoming SSL/TLS connections and to establish client authenticated outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS connections do not require this property.
Options
-
certificate - the certificate
287 288 289 |
# File 'lib/messenger/messenger.rb', line 287 def certificate=(certificate) Cproton.pn_messenger_set_certificate(@impl, certificate) end |
#clear_error ⇒ Object
Clears the current error state.
191 192 193 194 195 196 |
# File 'lib/messenger/messenger.rb', line 191 def clear_error error = Cproton.pn_messenger_error(@impl) unless error.nil? Cproton.pn_error_clear(error) end end |
#deadline ⇒ Object
166 167 168 169 |
# File 'lib/messenger/messenger.rb', line 166 def deadline tstamp = Cproton.pn_messenger_deadline(@impl) return tstamp / 1000.0 unless tstamp.nil? end |
#errno ⇒ Object
Returns the most recent error number.
179 180 181 |
# File 'lib/messenger/messenger.rb', line 179 def errno Cproton.pn_messenger_errno(@impl) end |
#error ⇒ Object
Returns the most recent error message.
185 186 187 |
# File 'lib/messenger/messenger.rb', line 185 def error Cproton.pn_error_text(Cproton.pn_messenger_error(@impl)) end |
#error? ⇒ Boolean
Reports whether an error occurred.
173 174 175 |
# File 'lib/messenger/messenger.rb', line 173 def error? !Cproton.pn_messenger_errno(@impl).zero? end |
#flags ⇒ Object
Gets the control flags for a Messenger.
230 231 232 |
# File 'lib/messenger/messenger.rb', line 230 def flags Cproton.pn_messenger_get_flags(@impl) end |
#flags=(flags) ⇒ Object
Set control flags to enable additional function for the Messenger.
Options
-
flags the flags to set on the messenger
225 226 227 |
# File 'lib/messenger/messenger.rb', line 225 def flags=(flags) Cproton.pn_messenger_set_flags(@impl, flags) end |
#get(msg = nil) ⇒ Object
Moves the message from the head of the incoming message queue into the supplied message object. Any content in the supplied message will be overwritten. A tracker for the incoming Message is returned. The tracker can later be used to communicate your acceptance or rejection of the Message.
If no message is provided in the argument, then one is created. In either case, the one returned will be the fetched message.
Options
-
msg - the (optional)
Message
instance to be used
396 397 398 399 400 401 402 403 404 405 406 |
# File 'lib/messenger/messenger.rb', line 396 def get(msg = nil) msg_impl = nil if msg.nil? then msg_impl = nil else msg_impl = msg.impl end perform_get(msg_impl) msg.post_decode unless msg.nil? return incoming_tracker end |
#incoming ⇒ Object
Returns the number of messages in the incoming queue that have not been retrieved.
477 478 479 |
# File 'lib/messenger/messenger.rb', line 477 def incoming Cproton.pn_messenger_incoming(@impl) end |
#incoming_tracker ⇒ Object
Returns a Tracker
for the most recently received message.
588 589 590 591 592 |
# File 'lib/messenger/messenger.rb', line 588 def incoming_tracker impl = Cproton.pn_messenger_incoming_tracker(@impl) return nil if impl == -1 Tracker.new(impl) end |
#incoming_window ⇒ Object
Returns the incoming window.
693 694 695 |
# File 'lib/messenger/messenger.rb', line 693 def incoming_window Cproton.pn_messenger_get_incoming_window(@impl) end |
#incoming_window=(window) ⇒ Object
Sets the incoming window.
The Messenger will track the remote status of this many incoming deliveries after they have been accepted or rejected.
Messages enter this window only when you take them into your application using get(). If your incoming window size is n, and you get n+1 messages without explicitly accepting or rejecting the oldest message, then the message that passes beyond the edge of the incoming window will be assigned the default disposition of its link.
Options
-
window - the window size
686 687 688 689 |
# File 'lib/messenger/messenger.rb', line 686 def incoming_window=(window) raise TypeError.new("invalid window: #{window}") unless valid_window?(window) Cproton.pn_messenger_set_incoming_window(@impl, window) end |
#interrupt ⇒ Object
Attempts interrupting of the messenger thread.
The Messenger interface is single-threaded, and this is the only function intended to be called from outside of is thread.
Call this from a non-Messenger thread to interrupt it while it is blocking. This will cause a ::InterruptError to be raised.
If there is no currently blocking call, then the next blocking call will be affected, even if it is within the same thread that originated the interrupt.
447 448 449 |
# File 'lib/messenger/messenger.rb', line 447 def interrupt Cproton.pn_messenger_interrupt(@impl) end |
#name ⇒ Object
Returns the name.
92 93 94 |
# File 'lib/messenger/messenger.rb', line 92 def name Cproton.pn_messenger_name(@impl) end |
#outgoing ⇒ Object
Returns the number messages in the outgoing queue that have not been transmitted.
470 471 472 |
# File 'lib/messenger/messenger.rb', line 470 def outgoing Cproton.pn_messenger_outgoing(@impl) end |
#outgoing_tracker ⇒ Object
Returns a Tracker
for the message most recently sent via the put method.
580 581 582 583 584 |
# File 'lib/messenger/messenger.rb', line 580 def outgoing_tracker impl = Cproton.pn_messenger_outgoing_tracker(@impl) return nil if impl == -1 Tracker.new(impl) end |
#outgoing_window ⇒ Object
Returns the outgoing window.
717 718 719 |
# File 'lib/messenger/messenger.rb', line 717 def outgoing_window Cproton.pn_messenger_get_outgoing_window(@impl) end |
#outgoing_window=(window) ⇒ Object
Sets the outgoing window.
The Messenger will track the remote status of this many outgoing deliveries after calling send. A Message enters this window when you call the put() method with the message. If your outgoing window size is n, and you call put n+1 times, status information will no longer be available for the first message.
Options
-
window - the window size
710 711 712 713 |
# File 'lib/messenger/messenger.rb', line 710 def outgoing_window=(window) raise TypeError.new("invalid window: #{window}") unless valid_window?(window) Cproton.pn_messenger_set_outgoing_window(@impl, window) end |
#passive=(mode) ⇒ Object
Turns passive mode on or off.
When set to passive mode, Messenger will not attempt to perform I/O operations internally. In this mode it is necesssary to use the Selectable type to drive any I/O needed to perform requestioned actions.
In this mode Messenger will never block.
162 163 164 |
# File 'lib/messenger/messenger.rb', line 162 def passive=(mode) Cproton.pn_messenger_set_passive(@impl, mode) end |
#passive? ⇒ Boolean
Returns true if passive mode is enabled.
149 150 151 |
# File 'lib/messenger/messenger.rb', line 149 def passive? Cproton.pn_messenger_is_passive(@impl) end |
#password ⇒ Object
Returns the password property for the Messenger.private_key file.
109 110 111 |
# File 'lib/messenger/messenger.rb', line 109 def password Cproton.pn_messenger_get_password(@impl) end |
#password=(password) ⇒ Object
This property contains the password for the Messenger.private_key file, or nil
if the file is not encrypted.
Arguments
-
password - the password
103 104 105 |
# File 'lib/messenger/messenger.rb', line 103 def password=(password) Cproton.pn_messenger_set_password(@impl, password) end |
#private_key ⇒ Object
Returns the path to a private key file.
314 315 316 |
# File 'lib/messenger/messenger.rb', line 314 def private_key Cproton.pn_messenger_get_private_key(@impl) end |
#private_key=(key) ⇒ Object
Path to a private key file for the Messenger
.
The property must be specified for the Messenger
to accept incoming SSL/TLS connections and to establish client authenticated outgoing SSL/TLS connections. Non client authenticated SSL/TLS connections do not require this property.
Options
-
key - the key file
308 309 310 |
# File 'lib/messenger/messenger.rb', line 308 def private_key=(key) Cproton.pn_messenger_set_private_key(@impl, key) end |
#put(message) ⇒ Object
Places the content contained in the message onto the outgoing queue of the Messenger.
This method will never block, however it will send any unblocked Messages in the outgoing queue immediately and leave any blocked Messages remaining in the outgoing queue. The send call may then be used to block until the outgoing queue is empty. The outgoing attribute may be used to check the depth of the outgoing queue.
Options
-
message - the message
350 351 352 353 354 355 356 357 358 359 360 361 |
# File 'lib/messenger/messenger.rb', line 350 def put() if .nil? raise TypeError.new("invalid message: #{}") end unless .kind_of?(Qpid::Proton::Message) raise ::ArgumentError.new("invalid message type: #{.class}") end # encode the message first .pre_encode perform_put() return outgoing_tracker end |
#rcv_settle_mode=(mode) ⇒ Object
Set the local receiver settle mode for the underlying links.
Options
-
mode - the receiver settle mode
248 249 250 |
# File 'lib/messenger/messenger.rb', line 248 def rcv_settle_mode=(mode) Cproton.pn_messenger_set_rcv_settle_mode(@impl, mode) end |
#receive(limit = -1)) ⇒ Object
Receives up to limit messages into the incoming queue. If no value for limit is supplied, this call will receive as many messages as it can buffer internally. If the Messenger is in blocking mode, this call will block until at least one Message is available in the incoming queue.
Options ====
-
limit - the maximum number of messages to receive
426 427 428 |
# File 'lib/messenger/messenger.rb', line 426 def receive(limit = -1) Cproton.pn_messenger_recv(@impl, limit) end |
#receiving? ⇒ Boolean
Returns true if the messenger is currently receiving data.
431 432 433 |
# File 'lib/messenger/messenger.rb', line 431 def receiving? Cproton.pn_messenger_receiving(@impl) end |
#reject(tracker) ⇒ Object
Rejects the incoming message identified by the tracker. If no tracker is supplied, all messages that have been returned by the get method are rejected, except those that have already been auto-settled by passing beyond your outgoing window size.
Options
-
tracker - the tracker
624 625 626 627 628 629 630 631 632 633 |
# File 'lib/messenger/messenger.rb', line 624 def reject(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) if tracker.nil? then tracker = self.incoming_tracker flag = Cproton::PN_CUMULATIVE else flag = 0 end Cproton.pn_messenger_reject(@impl, tracker.impl, flag) end |
#rewrite(pattern, address) ⇒ Object
Similar to #route, except that the destination of the Message is determined before the message address is rewritten.
The outgoing address is only rewritten after routing has been finalized. If a message has an outgoing address of “amqp://0.0.0.0:5678”, and a rewriting rule that changes its outgoing address to “foo”, it will still arrive at the peer that is listening on “amqp://0.0.0.0:5678”, but when it arrives there, the receiver will see its outgoing address as “foo”.
The default rewrite rule removes username and password from addresses before they are transmitted.
Arguments
-
pattern - the outgoing address
-
address - the target address
557 558 559 |
# File 'lib/messenger/messenger.rb', line 557 def rewrite(pattern, address) Cproton.pn_messenger_rewrite(@impl, pattern, address) end |
#route(pattern, address) ⇒ Object
Adds a routing rule to the Messenger’s internal routing table.
The route procedure may be used to influence how a Messenger will internally treat a given address or class of addresses. Every call to the route procedure will result in Messenger appending a routing rule to its internal routing table.
Whenever a Message is presented to a Messenger for delivery, it will match the address of this message against the set of routing rules in order. The first rule to match will be triggered, and instead of routing based on the address presented in the message, the Messenger will route based on the address supplied in the rule.
The pattern matching syntax supports two types of matches, a ‘%’ will match any character except a ‘/’, and a ‘*’ will match any character including a ‘/’.
A routing address is specified as a normal AMQP address, however it may additionally use substitution variables from the pattern match that triggered the rule.
Arguments
-
pattern - the address pattern
-
address - the target address
Examples
# route messages sent to foo to the destionaty amqp://foo.com
messenger.route("foo", "amqp://foo.com")
# any message to foobar will be routed to amqp://foo.com/bar
messenger.route("foobar", "amqp://foo.com/bar")
# any message to bar/<path> will be routed to the same path within
# the amqp://bar.com domain
messenger.route("bar/*", "amqp://bar.com/$1")
# route all Message objects over TLS
messenger.route("amqp:*", "amqps:$1")
# supply credentials for foo
messenger.route("amqp://foo.com/*", "amqp://user:[email protected]/$1")
# supply credentials for all domains
messenger.route("amqp://*", "amqp://user:password@$1")
# route all addresses through a single proxy while preserving the
# original destination
messenger.route("amqp://%$/*", "amqp://user:password@proxy/$1/$2")
# route any address through a single broker
messenger.route("*", "amqp://user:password@broker/$1")
535 536 537 |
# File 'lib/messenger/messenger.rb', line 535 def route(pattern, address) Cproton.pn_messenger_route(@impl, pattern, address) end |
#selectable ⇒ Object
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 |
# File 'lib/messenger/messenger.rb', line 561 def selectable impl = Cproton.pn_messenger_selectable(@impl) # if we don't have any selectables, then return return nil if impl.nil? fd = Cproton.pn_selectable_get_fd(impl) selectable = @selectables[fd] if selectable.nil? selectable = Selectable.new(self, impl) @selectables[fd] = selectable end return selectable end |
#send(n = -1)) ⇒ Object
This call will block until the indicated number of messages have been sent, or until the operation times out. If n is -1 this call will block until all outgoing messages have been sent. If n is 0 then this call will send whatever it can without blocking.
378 379 380 |
# File 'lib/messenger/messenger.rb', line 378 def send(n = -1) Cproton.pn_messenger_send(@impl, n) end |
#settle(tracker) ⇒ Object
Frees a Messenger from tracking the status associated with a given tracker. If you don’t supply a tracker, all outgoing messages up to the most recent will be settled.
Options
-
tracker - the tracker
Examples
660 661 662 663 664 665 666 667 668 669 |
# File 'lib/messenger/messenger.rb', line 660 def settle(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) if tracker.nil? then tracker = self.incoming_tracker flag = Cproton::PN_CUMULATIVE else flag = 0 end Cproton.pn_messenger_settle(@impl, tracker.impl, flag) end |
#snd_settle_mode=(mode) ⇒ Object
Set the local sender settle mode for the underlying links.
Options
-
mode - the sender settle mode
239 240 241 |
# File 'lib/messenger/messenger.rb', line 239 def snd_settle_mode=(mode) Cproton.pn_messenger_set_snd_settle_mode(@impl, mode) end |
#start ⇒ Object
Currently a no-op placeholder. For future compatibility, do not send or recv messages before starting the Messenger
.
202 203 204 |
# File 'lib/messenger/messenger.rb', line 202 def start Cproton.pn_messenger_start(@impl) end |
#status(tracker) ⇒ Object
Gets the last known remote state of the delivery associated with the given tracker, as long as the Message is still within your outgoing window. (Also works on incoming messages that are still within your incoming queue. See TrackerStatus for details on the values returned.
Options
-
tracker - the tracker
645 646 647 648 |
# File 'lib/messenger/messenger.rb', line 645 def status(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl)) end |
#stop ⇒ Object
Stops the Messenger
, preventing it from sending or receiving any more messages.
209 210 211 |
# File 'lib/messenger/messenger.rb', line 209 def stop Cproton.pn_messenger_stop(@impl) end |
#stopped? ⇒ Boolean
Returns true if a Messenger is in the stopped state. This function does not block.
216 217 218 |
# File 'lib/messenger/messenger.rb', line 216 def stopped? Cproton.pn_messenger_stopped(@impl) end |
#subscribe(address, timeout = 0) ⇒ Object
Subscribes the Messenger to messages originating from the specified source. The source is an address as specified in the Messenger introduction with the following addition. If the domain portion of the address begins with the ‘~’ character, the Messenger will interpret the domain as host/port, bind to it, and listen for incoming messages. For example “~0.0.0.0”, “amqp://~0.0.0.0” will all bind to any local interface and listen for incoming messages. An address of “amqps://~0.0.0.0” will only permit incoming SSL connections.
Options
-
address - the source address to be subscribe
-
timeout - an optional time-to-live value, in seconds, for the
subscription
268 269 270 271 272 273 |
# File 'lib/messenger/messenger.rb', line 268 def subscribe(address, timeout=0) raise TypeError.new("invalid address: #{address}") if address.nil? subscription = Cproton.pn_messenger_subscribe_ttl(@impl, address, timeout) raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil? Subscription.new(subscription) end |
#timeout ⇒ Object
Returns the timeout period
128 129 130 |
# File 'lib/messenger/messenger.rb', line 128 def timeout Cproton.pn_messenger_get_timeout(@impl) end |
#timeout=(timeout) ⇒ Object
Sets the timeout period, in milliseconds.
A negative timeout period implies an infinite timeout.
Options
-
timeout - the timeout period
121 122 123 124 |
# File 'lib/messenger/messenger.rb', line 121 def timeout=(timeout) raise TypeError.new("invalid timeout: #{timeout}") if timeout.nil? Cproton.pn_messenger_set_timeout(@impl, timeout) end |
#trusted_certificates ⇒ Object
The path to the databse of trusted certificates.
332 333 334 |
# File 'lib/messenger/messenger.rb', line 332 def trusted_certificates Cproton.pn_messenger_get_trusted_certificates(@impl) end |
#trusted_certificates=(certificates) ⇒ Object
A path to a database of trusted certificates for use in verifying the peer on an SSL/TLS connection. If this property is nil
, then the peer will not be verified.
Options
-
certificates - the certificates path
326 327 328 |
# File 'lib/messenger/messenger.rb', line 326 def trusted_certificates=(certificates) Cproton.pn_messenger_set_trusted_certificates(@impl,certificates) end |
#unregister_selectable(fileno) ⇒ Object
Unregisters a selectable object.
722 723 724 |
# File 'lib/messenger/messenger.rb', line 722 def unregister_selectable(fileno) # :nodoc: @selectables.delete(fileno) end |
#work(timeout = -1)) ⇒ Object
Sends or receives any outstanding messages queued for a Messenger.
This will block for the indicated timeout. This method may also do I/O other than sending and receiving messages. For example, closing connections after stop() has been called.
457 458 459 460 461 462 463 464 465 |
# File 'lib/messenger/messenger.rb', line 457 def work(timeout=-1) err = Cproton.pn_messenger_work(@impl, timeout) if (err == Cproton::PN_TIMEOUT) then return false else check_for_error(err) return true end end |