Module: Java::org.hornetq.api.core.client::ClientSession
- Defined in:
- lib/hornetq/client/org_hornetq_api_core_client_client_session.rb
Overview
For each thread that will be processing messages concurrently a separate session is required.
Interface org.hornetq.api.core.client.ClientSession
Other methods still directly accessible through this class:
add_failure_listener(SessionFailureListener listener)
Adds a FailureListener to the session which is notified if a failure occurs on the session
binding_query(SimpleString address)
Queries information on a binding
close()
Closes this session
commit()
Commits the current transaction
ClientMessage create_message(boolean durable)
Creates a ClientMessage.
ClientMessage create_message(byte type, boolean durable)
Creates a ClientMessage.
ClientMessage create_message(byte type, boolean durable, long expiration, long timestamp, byte priority)
Creates a ClientMessage.
ClientProducer create_producer()
Creates a producer with no default address.
ClientProducer create_producer(SimpleString address)
Creates a producer which sends to the given address
ClientProducer create_producer(SimpleString address, int rate)
Creates a producer which sends to the given address
ClientProducer create_producer(String address)
Creates a producer which sends to the given address
void create_queue(String address, String queueName)
Creates a non-temporary, non-durable queue.
void create_queue(String address, String queueName, boolean durable)
Creates a non-temporary queue.
void create_queue(String address, String queueName, String filter, boolean durable)
Creates a non-temporary queue with a filter.
void create_temporary_queue(String address, String queueName)
Creates a temporary queue.
void create_temporary_queue(String address, String queueName, String filter)
Creates a temporary queue with a filter.
void delete_queue(String queueName)
Deletes the queue.
int version()
Returns the server's incrementingVersion.
XAResource xa_resource()
Returns the XAResource associated to the session.
auto_commit_acks?
Returns true if the session automatically commits its transaction every time a message is acknowledged by a ClientConsumer created by this session, false otherwise
auto_commit_sends?
Returns true if the session automatically commits its transaction every time a message is sent by a ClientProducer created by this session, false otherwise
block_on_acknowledge?
Returns whether ClientConsumers created by the session block when they acknowledge a message
closed?
Returns whether the session is closed or not.
rollback_only?
Returns true if the current transaction has been flagged for rollback, false otherwise
xa?
Returns true if the session supports XA, false otherwise
ClientSession.QueueQuery queue_query(SimpleString queueName)
Queries information on a queue
boolean remove_failure_listener(SessionFailureListener listener)
Removes a FailureListener from the session
void rollback()
Rolls back the current transaction
void rollback(boolean considerLastMessageAsDelivered)
Rolls back the current transaction
void set_send_acknowledgement_handler(SendAcknowledgementHandler handler)
Sets a SendAcknowledgementHandler for this session
void start()
Starts the session
void stop()
Stops the session
Instance Method Summary
-
#consume(params, &block) ⇒ Object
Consumes or browses all messages matching the filter from the queue with the given name, calling the supplied block for every message received from the queue.
-
#consumer(params = {}, &block) ⇒ Object
Creates a ClientConsumer to consume or browse messages matching the filter from the queue with the given name, calls the supplied block, then closes the consumer.
-
#create_consumer_from_params(params = {}) ⇒ Object
Create a consumer using named parameters.
-
#create_queue_ignore_exists(address, queue, durable) ⇒ Object
Create a queue if it doesn’t already exist.
-
#create_requestor(request_address, reply_address = nil, reply_queue = nil) ⇒ Object
Creates a Requestor from the Session, for consistency with the other create methods.
-
#create_server(input_queue, timeout = 0) ⇒ Object
Create a server handler for receiving requests and responding with replies to the supplied address.
-
#producer(params = nil, &block) ⇒ Object
Creates a ClientProducer to send messages, calls the supplied block, then closes the producer. If the parameter is a String, it must be the address to send messages to. Otherwise, the parameters can be supplied in a Hash.
-
#requestor(request_address, reply_address = nil, reply_queue = nil, &block) ⇒ Object
Creates a RequestorPattern to send a request and synchronously wait for the reply, calls the supplied block, then closes the requestor. Returns the result from the block.
-
#server(input_queue, timeout = 0, &block) ⇒ Object
Creates a ServerPattern to consume messages and send replies, calls the supplied block, then closes the server. Returns the result from the block.
Instance Method Details
#consume(params, &block) ⇒ Object
Consumes or browses all messages matching the filter from the queue with the given name, calling the supplied block for every message received from the queue. Once the timeout has been reached, it closes the consumer.
Parameters:
:timeout => How long to wait for messages
-1 : Wait forever
0 : Return immediately if no message is available (default)
x : Wait for x milliseconds for a message to be received from the server
Note: Messages may still be on the queue, but the server has not supplied any messages
in the time interval specified
Default: 0
:queue_name => The name of the queue to consume messages from. Mandatory
:filter => Only consume messages matching the filter: Default: nil
:browse_only => Whether to just browse the queue or consume messages
true | false. Default: false
:window_size => The consumer window size.
:max_rate => The maximum rate to consume messages.
:statistics => Whether to capture statistics on how many messages have been read
true : This method will capture statistics on the number of messages received
and the time it took to process them.
Statistics are cumulative between calls to ::each and will only be
reset when ::each is called again with :statistics => true
Note: If either :window_size or :max_rate is supplied, then BOTH are required
Returns the statistics gathered when :statistics => true, otherwise nil
Example
session.consume(:queue_name => 'my_queue', :timeout => 1000) do |message|
p message
message.acknowledge
end
Example
# Just browse the messages without consuming them
session.consume(:queue_name => 'my_queue', :timeout => 1000, :browse_only => true) do |message|
p message
message.acknowledge
end
# File 'lib/hornetq/client/org_hornetq_api_core_client_client_session.rb', line 177
def consume(params, &block)
  raise "Missing mandatory code block" unless block
  c = self.create_consumer_from_params(params)
  begin
    c.each(params, &block)
  ensure
    c.close
  end
end
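The :timeout values listed above can be pictured as a choice between three receive calls. The following is a minimal sketch under stated assumptions, not the gem's implementation: ArrayConsumer, next_message and drain are hypothetical names, and the consumer is backed by an Array so the sketch runs without a HornetQ server.

```ruby
# Hypothetical stand-in for a ClientConsumer. It serves messages from an
# Array, so "waiting" never actually blocks in this sketch.
class ArrayConsumer
  def initialize(messages)
    @messages = messages.dup
  end

  # Return the next message, or nil when none is available right now.
  def receive_immediate
    @messages.shift
  end

  # A real consumer would block for up to timeout_ms (or forever when no
  # timeout is given); the stub simply answers immediately.
  def receive(timeout_ms = nil)
    @messages.shift
  end
end

# Assumed mapping of :timeout onto receive calls (not taken from the gem).
def next_message(consumer, timeout)
  case timeout
  when -1 then consumer.receive               # wait forever
  when 0, nil then consumer.receive_immediate # default: do not wait
  else consumer.receive(timeout)              # wait up to timeout milliseconds
  end
end

# Yield every message until a receive call comes back empty, then report
# how many messages were seen.
def drain(consumer, timeout = 0)
  count = 0
  while (message = next_message(consumer, timeout))
    yield message
    count += 1
  end
  count
end

seen = []
total = drain(ArrayConsumer.new(%w[a b c]), 1000) { |m| seen << m }
```

The loop ends as soon as one receive call returns nothing, which matches the note above: messages may still be on the queue even though none arrived within the interval.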
#consumer(params = {}, &block) ⇒ Object
Creates a ClientConsumer to consume or browse messages matching the filter from the queue with the given name, calls the supplied block, then closes the consumer.
If the parameter is a String, it must be the name of the queue to consume messages from. Otherwise, the parameters can be supplied in a Hash.
The parameters for creating the consumer are as follows:
:queue_name => The name of the queue to consume messages from. Mandatory
:filter => Only consume messages matching the filter: Default: nil
:browse_only => Whether to just browse the queue or consume messages
true | false. Default: false
:window_size => The consumer window size.
:max_rate => The maximum rate to consume messages.
Note: If either :window_size or :max_rate is supplied, then BOTH are required
Returns the result from the block
Example
session.consumer('my_queue') do |consumer|
msg = consumer.receive_immediate
p msg
msg.acknowledge
end
Example
# Just browse the messages without consuming them
session.consumer(:queue_name => 'my_queue', :browse_only => true) do |consumer|
msg = consumer.receive_immediate
p msg
msg.acknowledge
end
# File 'lib/hornetq/client/org_hornetq_api_core_client_client_session.rb', line 125
def consumer(params={}, &block)
  raise "Missing mandatory code block" unless block
  consumer = nil
  begin
    consumer = create_consumer_from_params(params)
    block.call(consumer)
  ensure
    consumer.close if consumer
  end
end
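The create, yield, always-close shape of #consumer can be tried without a broker. This is an illustrative sketch only: ClosableConsumer and StubSession are hypothetical stand-ins, not HornetQ classes. It shows that the block's value is returned and that the consumer is closed even when the block raises.

```ruby
# Hypothetical stand-in that only remembers whether it has been closed.
class ClosableConsumer
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Hypothetical session with the same create/yield/ensure-close shape.
class StubSession
  attr_reader :last_consumer

  def consumer
    raise 'Missing mandatory code block' unless block_given?
    @last_consumer = ClosableConsumer.new
    begin
      yield @last_consumer
    ensure
      @last_consumer.close
    end
  end
end

session = StubSession.new

# The value of the block becomes the return value of the call.
result = session.consumer { |_consumer| :done }
closed_after_success = session.last_consumer.closed

# An exception raised inside the block still closes the consumer.
failure = nil
begin
  session.consumer { |_consumer| raise 'processing failed' }
rescue RuntimeError => e
  failure = e.message
end
closed_after_failure = session.last_consumer.closed
```

The same shape applies to #producer, #requestor and #server, each of which closes the object it created once the block has finished.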
#create_consumer_from_params(params = {}) ⇒ Object
Create a consumer using named parameters. The following Java create_consumer methods are still directly accessible:
create_consumer(String queueName)
Creates a ClientConsumer to consume messages from the queue with the given name
create_consumer(String queueName, boolean browseOnly)
Creates a ClientConsumer to consume or browse messages from the queue with the given name.
create_consumer(String queueName, String filter)
Creates a ClientConsumer to consume messages matching the filter from the queue with the given name.
create_consumer(String queueName, String filter, boolean browseOnly)
Creates a ClientConsumer to consume or browse messages matching the filter from the queue with the given name.
create_consumer(String queueName, String filter, int windowSize, int maxRate, boolean browseOnly)
Creates a ClientConsumer to consume or browse messages matching the filter from the queue with the given name.
The parameters for creating the consumer are as follows:
:queue_name => The name of the queue to consume messages from. Mandatory
:filter => Only consume messages matching the filter: Default: nil
:browse_only => Whether to just browse the queue or consume messages
true | false. Default: false
:window_size => The consumer window size.
:max_rate => The maximum rate to consume messages.
Note: If either :window_size or :max_rate is supplied, then BOTH are required
Returns a new Consumer that can be used for consuming messages from the queue
# File 'lib/hornetq/client/org_hornetq_api_core_client_client_session.rb', line 212
def create_consumer_from_params(params={})
  if params.kind_of?(Hash)
    raise("Missing mandatory parameter :queue_name") unless queue_name = params[:queue_name]
    if params[:max_rate] || params[:window_size]
      self.create_consumer(
        queue_name,
        params[:filter],
        params[:window_size],
        params[:max_rate],
        params.fetch(:browse_only, false))
    else
      self.create_consumer(
        queue_name,
        params[:filter],
        params.fetch(:browse_only, false))
    end
  else
    self.create_consumer(params)
  end
end
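To see which create_consumer argument list each style of parameter selects, the dispatch in the listing above can be replayed against a recorder. RecordingSession is a hypothetical stand-in used only for illustration; it records the arguments instead of calling into HornetQ.

```ruby
# Hypothetical recorder: stands in for the session and notes which
# create_consumer argument list would be used.
class RecordingSession
  attr_reader :calls

  def initialize
    @calls = []
  end

  def create_consumer(*args)
    @calls << args
    :consumer
  end

  # Same branching as create_consumer_from_params in the listing above.
  def create_consumer_from_params(params = {})
    return create_consumer(params) unless params.is_a?(Hash)

    queue_name = params[:queue_name]
    raise 'Missing mandatory parameter :queue_name' unless queue_name

    browse_only = params.fetch(:browse_only, false)
    if params[:max_rate] || params[:window_size]
      create_consumer(queue_name, params[:filter], params[:window_size],
                      params[:max_rate], browse_only)
    else
      create_consumer(queue_name, params[:filter], browse_only)
    end
  end
end

session = RecordingSession.new
session.create_consumer_from_params('my_queue')
session.create_consumer_from_params(:queue_name => 'my_queue', :browse_only => true)
session.create_consumer_from_params(:queue_name => 'my_queue',
                                    :window_size => 1024, :max_rate => 10)

session.calls.each { |args| p args }
# ["my_queue"]
# ["my_queue", nil, true]
# ["my_queue", nil, 1024, 10, false]
```

Supplying only one of :window_size or :max_rate would put nil in the other position of the five-argument call, which is presumably why the note above requires both.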
#create_queue_ignore_exists(address, queue, durable) ⇒ Object
Create a queue if it doesn’t already exist
# File 'lib/hornetq/client/org_hornetq_api_core_client_client_session.rb', line 322
def create_queue_ignore_exists(address, queue, durable)
  begin
    create_queue(address, queue, durable)
  rescue Java::org.hornetq.api.core.HornetQException => e
    raise unless e.cause.code == Java::org.hornetq.api.core.HornetQException::QUEUE_EXISTS
  end
end
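The rescue in the listing above swallows exactly one error code and re-raises everything else. A minimal sketch of that idea follows, using hypothetical stand-ins (BrokerError, its code values and InMemorySession are invented for illustration). Unlike the listing, the sketch reads the code from the exception itself rather than from its cause.

```ruby
# Hypothetical error type; the code values are arbitrary for this sketch.
class BrokerError < StandardError
  QUEUE_EXISTS = 1
  INVALID_ADDRESS = 2

  attr_reader :code

  def initialize(code, message)
    super(message)
    @code = code
  end
end

# Hypothetical session that keeps its queues in a Hash.
class InMemorySession
  attr_reader :queues

  def initialize
    @queues = {}
  end

  def create_queue(address, queue, durable)
    raise BrokerError.new(BrokerError::INVALID_ADDRESS, 'empty address') if address.empty?
    raise BrokerError.new(BrokerError::QUEUE_EXISTS, "#{queue} exists") if @queues.key?(queue)
    @queues[queue] = { :address => address, :durable => durable }
  end

  # Ignore only the "queue already exists" failure; re-raise anything else.
  def create_queue_ignore_exists(address, queue, durable)
    create_queue(address, queue, durable)
  rescue BrokerError => e
    raise unless e.code == BrokerError::QUEUE_EXISTS
  end
end

session = InMemorySession.new
session.create_queue_ignore_exists('orders.address', 'orders', true)
session.create_queue_ignore_exists('orders.address', 'orders', true) # no error the second time
```

Checking the code instead of rescuing every exception keeps unrelated failures, such as the empty address in this sketch, visible to the caller.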
#create_requestor(request_address, reply_address = nil, reply_queue = nil) ⇒ Object
Creates a Requestor from the Session, for consistency with the other create methods
# File 'lib/hornetq/client/org_hornetq_api_core_client_client_session.rb', line 287
def create_requestor(request_address, reply_address=nil, reply_queue=nil)
  HornetQ::Client::RequestorPattern.new(self, request_address, reply_address, reply_queue)
end
#create_server(input_queue, timeout = 0) ⇒ Object
Create a server handler for receiving requests and responding with replies to the supplied address
# File 'lib/hornetq/client/org_hornetq_api_core_client_client_session.rb', line 305
def create_server(input_queue, timeout=0)
  HornetQ::Client::ServerPattern.new(self, input_queue, timeout)
end
#producer(params = nil, &block) ⇒ Object
Creates a ClientProducer to send messages, calls the supplied block, then closes the producer.
If the parameter is a String, it must be the address to send messages to. Otherwise, the parameters can be supplied in a Hash.
The parameters for creating the producer are as follows:
:address => The address to which to send messages. If not supplied here,
then the destination address must be supplied with every message
:rate => The producer rate
Returns the result from the block
Example
session.producer('MyAddress') do |producer|
msg = session.
msg.type = :text
producer.send(msg)
end
Example
# Send to a different address with each message
session.producer do |producer|
msg = session.
msg.type = :text
producer.send('Another address', msg)
end
# File 'lib/hornetq/client/org_hornetq_api_core_client_client_session.rb', line 261
def producer(params=nil, &block)
  address = nil
  rate = nil
  if params.kind_of?(Hash)
    address = params[:address]
    rate = params[:rate]
  else
    address = params
  end

  producer = nil
  begin
    producer = if rate
      self.create_producer(address, rate)
    elsif address
      self.create_producer(address)
    else
      self.create_producer
    end
    block.call(producer)
  ensure
    producer.close if producer
  end
end
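The way #producer turns its single parameter into a create_producer call can be isolated in a small helper. producer_arguments is a hypothetical function written for this illustration, not part of the gem; it returns the argument list that the listing above would pass on.

```ruby
# Hypothetical helper: returns the argument list that #producer would
# hand to create_producer for a given params value.
def producer_arguments(params = nil)
  if params.is_a?(Hash)
    address = params[:address]
    rate = params[:rate]
  else
    # A String (or nil) is taken to be the address itself.
    address = params
    rate = nil
  end

  if rate
    [address, rate]
  elsif address
    [address]
  else
    []
  end
end

p producer_arguments('MyAddress')                           # ["MyAddress"]
p producer_arguments(:address => 'MyAddress', :rate => 100) # ["MyAddress", 100]
p producer_arguments                                        # []
```

With no address, the producer has no default destination, so the address has to be supplied with every send, as in the second example above.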
#requestor(request_address, reply_address = nil, reply_queue = nil, &block) ⇒ Object
Creates a RequestorPattern to send a request and synchronously wait for the reply, calls the supplied block, then closes the requestor. Returns the result from the block.
# File 'lib/hornetq/client/org_hornetq_api_core_client_client_session.rb', line 294
def requestor(request_address, reply_address=nil, reply_queue=nil, &block)
  begin
    requestor = self.create_requestor(request_address, reply_address, reply_queue)
    block.call(requestor)
  ensure
    requestor.close if requestor
  end
end
#server(input_queue, timeout = 0, &block) ⇒ Object
Creates a ServerPattern to consume messages and send replies, calls the supplied block, then closes the server. Returns the result from the block.
# File 'lib/hornetq/client/org_hornetq_api_core_client_client_session.rb', line 312
def server(input_queue, timeout=0, &block)
  begin
    server = self.create_server(input_queue, timeout)
    block.call(server)
  ensure
    server.close if server
  end
end