Class: HornetQ::Client::RequestorPattern
- Inherits:
-
Object
- Object
- HornetQ::Client::RequestorPattern
- Defined in:
- lib/hornetq/client/requestor_pattern.rb
Overview
Implements the Requestor Pattern
Send a request to a server and wait for a reply
Parameters
-
session The session to use processing this request Note: Sessions cannot be shared concurrently by multiple threads
-
request_address Address to send requests to. It is expected that process listening to requests at this address has implemented the ServerPattern
-
reply_address If supplied the reply_address must already exist and will be used for receiving responses If not supplied a temporary queue will be created and used by this instance of the RequestorPattern This optional parameter is normally not used
-
reply_queue If a reply_address is supplied, the reply_queue name can be supplied if it differs from reply_address
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(session, request_address, reply_address = nil, reply_queue = nil) ⇒ RequestorPattern
constructor
A new instance of RequestorPattern.
-
#request(request_message, timeout) ⇒ Object
Synchronous Request and wait for reply Returns the message received, or nil if no message was received in the specified timeout.
-
#submit_request(request_message) ⇒ Object
Asynchronous Request Use: submit_request & then wait_for_reply to break the request into two separate calls.
-
#wait_for_reply(user_id, timeout) ⇒ Object
Asynchronous wait for reply.
Constructor Details
#initialize(session, request_address, reply_address = nil, reply_queue = nil) ⇒ RequestorPattern
Returns a new instance of RequestorPattern.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/hornetq/client/requestor_pattern.rb', line 23 def initialize(session, request_address, reply_address=nil, reply_queue=nil) @session = session @producer = session.create_producer(request_address) if reply_address @reply_address = reply_address @reply_queue = reply_queue || reply_address @destroy_temp_queue = false else @reply_queue = @reply_address = "#{request_address}.#{Java::java.util::UUID.randomUUID.toString}" begin session.create_temporary_queue(@reply_address, @reply_queue) @destroy_temp_queue = true rescue NativeException => exc p exc end end end |
Instance Method Details
#close ⇒ Object
106 107 108 109 |
# File 'lib/hornetq/client/requestor_pattern.rb', line 106 def close @session.delete_queue(@reply_queue) if @destroy_temp_queue @producer.close if @producer end |
#request(request_message, timeout) ⇒ Object
Synchronous Request and wait for reply
Returns the message received, or nil if no message was received in the specified timeout.
The supplied request_message is updated as follows
-
The property JMSReplyTo is set to the name of the reply to address
-
Creates and sets the message user_id if not already set
-
#TODO: The expiry is set to the message timeout if not already set
Note:
-
The request will only look for a reply message with the same user_id (message id) as the message that was sent. This is critical since a previous receive may have timed out and we do not want to pickup the reponse to an earlier request
To receive a message after a timeout, call wait_for_reply with a nil message id to receive any message on the queue
Use: submit_request & then wait_for_reply to break it into
two separate calls
62 63 64 65 66 |
# File 'lib/hornetq/client/requestor_pattern.rb', line 62 def request(, timeout) #TODO set message expiry to timeout if not already set = submit_request() wait_for_reply(, timeout) end |
#submit_request(request_message) ⇒ Object
Asynchronous Request
Use: submit_request & then wait_for_reply to break the request into
two separate calls.
For example, submit the request now, do some work, then later on in the same thread wait for the reply.
The supplied request_message is updated as follows
-
The property JMSReplyTo is set to the name of the reply to address
-
Creates and sets the message user_id if not already set
-
#TODO: The expiry is set to the message timeout if not already set
Returns Message id of the message that was sent
81 82 83 84 85 86 |
# File 'lib/hornetq/client/requestor_pattern.rb', line 81 def submit_request() .reply_to_address = @reply_address .generate_user_id unless .user_id @producer.send() .user_id end |
#wait_for_reply(user_id, timeout) ⇒ Object
Asynchronous wait for reply
Parameters:
user_id: the user defined id to correlate a response for
Supply a nil user_id to receive any message from the queue
Returns the message received
Note: Call submit_request before calling this method
98 99 100 101 102 103 104 |
# File 'lib/hornetq/client/requestor_pattern.rb', line 98 def wait_for_reply(user_id, timeout) # We only want the reply to the supplied message_id, so set filter on message id filter = "#{Java::org.hornetq.api.core::FilterConstants::HORNETQ_USERID} = 'ID:#{user_id}'" if user_id @session.consumer(:queue_name => @reply_queue, :filter=>filter) do |consumer| consumer.receive(timeout) end end |