Class: HornetQ::Client::RequestorPattern

Inherits:
Object
  • Object
show all
Defined in:
lib/hornetq/client/requestor_pattern.rb

Overview

Implements the Requestor Pattern

Send a request to a server and wait for a reply

Parameters

  • session The session to use processing this request Note: Sessions cannot be shared concurrently by multiple threads

  • request_address Address to send requests to. It is expected that process listening to requests at this address has implemented the ServerPattern

  • reply_address If supplied the reply_address must already exist and will be used for receiving responses If not supplied a temporary queue will be created and used by this instance of the RequestorPattern This optional parameter is normally not used

  • reply_queue If a reply_address is supplied, the reply_queue name can be supplied if it differs from reply_address

Instance Method Summary collapse

Constructor Details

#initialize(session, request_address, reply_address = nil, reply_queue = nil) ⇒ RequestorPattern

Returns a new instance of RequestorPattern.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/hornetq/client/requestor_pattern.rb', line 23

def initialize(session, request_address, reply_address=nil, reply_queue=nil)
  @session = session
  @producer = session.create_producer(request_address)
  if reply_address
    @reply_address = reply_address
    @reply_queue = reply_queue || reply_address
    @destroy_temp_queue = false
  else
    @reply_queue = @reply_address = "#{request_address}.#{Java::java.util::UUID.randomUUID.toString}"
    begin
      session.create_temporary_queue(@reply_address, @reply_queue)
      @destroy_temp_queue = true
    rescue NativeException => exc
      p exc
    end
  end
end

Instance Method Details

#closeObject



106
107
108
109
# File 'lib/hornetq/client/requestor_pattern.rb', line 106

def close
  @session.delete_queue(@reply_queue) if @destroy_temp_queue
  @producer.close if @producer
end

#request(request_message, timeout) ⇒ Object

Synchronous Request and wait for reply

Returns the message received, or nil if no message was received in the specified timeout.

The supplied request_message is updated as follows

  • The property JMSReplyTo is set to the name of the reply to address

  • Creates and sets the message user_id if not already set

  • #TODO: The expiry is set to the message timeout if not already set

Note:

  • The request will only look for a reply message with the same user_id (message id) as the message that was sent. This is critical since a previous receive may have timed out and we do not want to pickup the reponse to an earlier request

To receive a message after a timeout, call wait_for_reply with a nil message id to receive any message on the queue

Use: submit_request & then wait_for_reply to break it into

two separate calls


62
63
64
65
66
# File 'lib/hornetq/client/requestor_pattern.rb', line 62

def request(request_message, timeout)
  #TODO set message expiry to timeout if not already set
  message_id = submit_request(request_message)
  wait_for_reply(message_id, timeout)
end

#submit_request(request_message) ⇒ Object

Asynchronous Request

Use: submit_request & then wait_for_reply to break the request into
     two separate calls.

For example, submit the request now, do some work, then later on in the same thread wait for the reply.

The supplied request_message is updated as follows

  • The property JMSReplyTo is set to the name of the reply to address

  • Creates and sets the message user_id if not already set

  • #TODO: The expiry is set to the message timeout if not already set

Returns Message id of the message that was sent



81
82
83
84
85
86
# File 'lib/hornetq/client/requestor_pattern.rb', line 81

def submit_request(request_message)
  request_message.reply_to_address = @reply_address
  request_message.generate_user_id unless request_message.user_id
  @producer.send(request_message)
  request_message.user_id
end

#wait_for_reply(user_id, timeout) ⇒ Object

Asynchronous wait for reply

Parameters:

user_id: the user defined id to correlate a response for

Supply a nil user_id to receive any message from the queue

Returns the message received

Note: Call submit_request before calling this method



98
99
100
101
102
103
104
# File 'lib/hornetq/client/requestor_pattern.rb', line 98

def wait_for_reply(user_id, timeout)
  # We only want the reply to the supplied message_id, so set filter on message id
  filter = "#{Java::org.hornetq.api.core::FilterConstants::HORNETQ_USERID} = 'ID:#{user_id}'" if user_id
  @session.consumer(:queue_name => @reply_queue, :filter=>filter) do |consumer|
    consumer.receive(timeout)
  end
end