Module: JMS::Session
- Defined in:
- lib/jms/session.rb
Overview
For each thread that will be processing messages concurrently a separate session is required. All sessions can share a single connection to the same JMS Provider.
Interface javax.jms.Session
See: download.oracle.com/javaee/6/api/javax/jms/Session.html
Other methods still directly accessible through this class:
create_browser(queue, message_selector)
Creates a QueueBrowser object to peek at the messages on the specified queue using a message selector.
create_bytes_message()
Creates a BytesMessage object
create_consumer(destination)
Creates a MessageConsumer for the specified destination
See: Connection::consumer
Example:
destination = session.create_destination(queue_name: "MyQueue")
session.create_consumer(destination)
create_consumer(destination, message_selector)
Creates a MessageConsumer for the specified destination, using a message selector
create_consumer(destination, message_selector, boolean NoLocal)
Creates MessageConsumer for the specified destination, using a message selector
create_durable_subscriber(Topic topic, java.lang.String name)
Creates a durable subscriber to the specified topic
create_durable_subscriber(Topic topic, java.lang.String name, java.lang.String messageSelector, boolean noLocal)
Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages published by its own connection should be delivered to it.
create_map_Message()
Creates a MapMessage object
create_message()
Creates a Message object
create_object_message()
Creates an ObjectMessage object
create_object_message(java.io.Serializable object)
Creates an initialized ObjectMessage object
create_producer(destination)
Creates a MessageProducer to send to the specified destination
create_queue(queue_name)
Creates a queue identity given a Queue name
create_stream_message()
Creates a StreamMessage object
create_temporary_queue()
Creates a TemporaryQueue object
create_temporary_topic()
Creates a TemporaryTopic object
create_text_message()
Creates a TextMessage object
create_text_message(text)
Creates an initialized TextMessage object
create_topic(topic_name)
Creates a topic identity given a Topic name
acknowledge_mode()
Returns the acknowledgement mode of the session
message_listener()
Returns the session's distinguished message listener (optional).
transacted?
Indicates whether the session is in transacted mode
recover()
Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message
rollback()
Rolls back any messages done in this transaction and releases any locks currently held
message_listener=(MessageListener listener)
Sets the session's distinguished message listener (optional)
unsubscribe(name)
Unsubscribes a durable subscription that has been created by a client
Interface javax.jms.Session
Instance Method Summary collapse
-
#browse(params = {}, &block) ⇒ Object
Browse the specified queue, calling the Proc supplied for each message found.
-
#browser(params, &block) ⇒ Object
Return a browser for the destination A browser can read messages non-destructively from the queue It cannot browse Topics!.
-
#consume(params, &block) ⇒ Object
Consume all messages for the destination A consumer can read messages from the queue or topic.
-
#consumer(params, &block) ⇒ Object
Return a consumer for the destination A consumer can read messages from the queue or topic.
-
#create_destination(params) ⇒ Object
Create the destination based on the parameter supplied.
-
#destination(params = {}, &block) ⇒ Object
Create a queue or topic to send or receive messages from.
-
#message(data, type = nil) ⇒ Object
Create a new message instance based on the type of the data being supplied String (:to_str) => TextMessage Hash (:each_pair) => MapMessage Duck typing is used to determine the type.
-
#producer(params, &block) ⇒ Object
Return a producer for the queue name supplied A producer supports sending messages to a Queue or a Topic.
-
#queue(queue_name, &block) ⇒ Object
Return the queue matching the queue name supplied Call the Proc if supplied.
-
#temporary_queue(&block) ⇒ Object
Return a temporary queue The temporary queue is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed.
-
#temporary_topic(&block) ⇒ Object
Return a temporary topic The temporary topic is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed.
-
#topic(topic_name, &block) ⇒ Object
Return the topic matching the topic name supplied Call the Proc if supplied.
Instance Method Details
#browse(params = {}, &block) ⇒ Object
Browse the specified queue, calling the Proc supplied for each message found
Parameters:
queue_name: [String] Name of the Queue to return
[Symbol] Create temporary queue
Mandatory unless :topic_name is supplied
Or,
destination: [javaxJms::Destination] Destination to use
selector: Filter which messages should be returned from the queue
Default: All messages
456 457 458 |
# File 'lib/jms/session.rb', line 456 def browse(params={}, &block) self.browser(params) { |b| b.each(params, &block) } end |
#browser(params, &block) ⇒ Object
Return a browser for the destination A browser can read messages non-destructively from the queue It cannot browse Topics!
Call the Proc if supplied, then automatically close the consumer
Parameters:
queue_name: [String] Name of the Queue to return
[Symbol] Create temporary queue
Mandatory unless :topic_name is supplied
Or,
destination: [javaxJms::Destination] Destination to use
selector: Filter which messages should be returned from the queue
Default: All messages
423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 |
# File 'lib/jms/session.rb', line 423 def browser(params, &block) raise(ArgumentError, 'Session::browser requires a code block to be executed') unless block destination = create_destination(params) b = nil if params[:selector] b = create_browser(destination, params[:selector]) else b = create_browser(destination) end if block begin block.call(b) ensure b.close b = nil end end b end |
#consume(params, &block) ⇒ Object
Consume all messages for the destination A consumer can read messages from the queue or topic
Parameters:
queue_name: [String] Name of the Queue to return
[Symbol] Create temporary queue
Mandatory unless :topic_name is supplied
Or,
topic_name: [String] Name of the Topic to write to or subscribe to
[Symbol] Create temporary topic
Mandatory unless :queue_name is supplied
Or,
destination: [javaxJms::Destination] Destination to use
selector: Filter which messages should be returned from the queue
Default: All messages
no_local: Determine whether messages published by its own connection
should be delivered to it
Default: false
:timeout Follows the rules for MQSeries:
-1 : Wait forever
0 : Return immediately if no message is available
x : Wait for x milli-seconds for a message to be received from the broker
Note: Messages may still be on the queue, but the broker has not supplied any messages
in the time interval specified
Default: 0
399 400 401 402 403 404 405 406 |
# File 'lib/jms/session.rb', line 399 def consume(params, &block) begin c = self.consumer(params) c.each(params, &block) ensure c.close if c end end |
#consumer(params, &block) ⇒ Object
Return a consumer for the destination A consumer can read messages from the queue or topic
Call the block if supplied, then automatically close the consumer
Parameters:
queue_name: [String] Name of the Queue to return
[Symbol] Create temporary queue
Mandatory unless :topic_name is supplied
Or,
topic_name: [String] Name of the Topic to write to or subscribe to
[Symbol] Create temporary topic
Mandatory unless :queue_name is supplied
Or,
destination: [javaxJms::Destination] Destination to use
selector: Filter which messages should be returned from the queue
Default: All messages
no_local: Determine whether messages published by its own connection
should be delivered to it
Default: false
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
# File 'lib/jms/session.rb', line 349 def consumer(params, &block) destination = create_destination(params) c = if params[:no_local] create_consumer(destination, params[:selector] || '', params[:no_local]) elsif params[:selector] create_consumer(destination, params[:selector]) else create_consumer(destination) end if block begin block.call(c) ensure c.close c = nil end end c end |
#create_destination(params) ⇒ Object
Create the destination based on the parameter supplied
The idea behind this method is to allow the decision as to whether one is sending to a topic or destination to be transparent to the code. The supplied parameters can be externalized into say a YAML file so that today it writes to a queue, later it can be changed to write to a topic so that multiple parties can receive the same messages.
Note: For Temporary Queues and Topics, remember to delete them when done
or just use ::destination instead with a block and it will take care
of deleting them for you
To create a queue:
session.create_destination(queue_name: 'name of queue')
To create a temporary queue:
session.create_destination(queue_name: :temporary)
To create a queue:
session.create_destination('queue://queue_name')
To create a topic:
session.create_destination(topic_name: 'name of queue')
To create a temporary topic:
session.create_destination(topic_name: :temporary)
To create a topic:
session.create_destination('topic://topic_name')
Create the destination based on the parameter supplied
Parameters:
queue_name: [String] Name of the Queue to return
[Symbol] Create temporary queue
Mandatory unless :topic_name is supplied
Or,
topic_name: [String] Name of the Topic to write to or subscribe to
[Symbol] Create temporary topic
Mandatory unless :queue_name is supplied
Or,
destination: [javaxJms::Destination] Destination to use
Returns the result of the supplied block
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/jms/session.rb', line 178 def create_destination(params) # Allow a Java JMS destination object to be passed in return params[:destination] if params[:destination] && params[:destination].java_kind_of?(JMS::Destination) queue_name = nil topic_name = nil if params.is_a? String queue_name = params['queue://'.length..-1] if params.start_with?('queue://') topic_name = params['topic://'.length..-1] if params.start_with?('topic://') else # :q_name is deprecated queue_name = params[:queue_name] || params[:q_name] topic_name = params[:topic_name] end unless queue_name || topic_name raise(ArgumentError, 'Missing mandatory parameter :queue_name or :topic_name to Session::producer, Session::consumer, or Session::browser') end if queue_name queue_name == :temporary ? create_temporary_queue : create_queue(queue_name) else topic_name == :temporary ? create_temporary_topic : create_topic(topic_name) end end |
#destination(params = {}, &block) ⇒ Object
Create a queue or topic to send or receive messages from
A block must be supplied so that if it is a temporary topic or queue it will be deleted after the block is complete
To create a queue:
session.destination(queue_name: 'name of queue')
To create a temporary queue:
session.destination(queue_name: :temporary)
To create a topic:
session.destination(topic_name: 'name of queue')
To create a temporary topic:
session.destination(topic_name: :temporary)
Create the destination based on the parameter supplied
Parameters:
queue_name: [String] Name of the Queue to return
[Symbol] Create temporary queue
Mandatory unless :topic_name is supplied
Or,
topic_name: [String] Name of the Topic to write to or subscribe to
[Symbol] Create temporary topic
Mandatory unless :queue_name is supplied
Or,
destination: [javaxJms::Destination] Destination to use
Returns the result of the supplied block
236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/jms/session.rb', line 236 def destination(params={}, &block) raise(ArgumentError, 'Missing mandatory Block when calling JMS::Session#destination') unless block dest = nil begin dest = create_destination(params) block.call(dest) ensure # Delete Temporary Queue / Topic dest.delete if dest && dest.respond_to?(:delete) end end |
#message(data, type = nil) ⇒ Object
Create a new message instance based on the type of the data being supplied
String (:to_str) => TextMessage
Hash (:each_pair) => MapMessage
Duck typing is used to determine the type. If the class responds to :to_str then it is considered a String. Similarly if it responds to :each_pair it is considered to be a Hash
If automated duck typing is not desired, the type of the message can be specified by setting the parameter ‘type’ to any one of:
text: Creates a Text Message
map: Creates a Map Message
bytes: Creates a Bytes Message
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/jms/session.rb', line 108 def (data, type=nil) = nil type ||= if data.respond_to?(:to_str, false) :text elsif data.respond_to?(:each_pair, false) :map else raise "Unknown data type #{data.class.to_s} in Message" end case type when :text = self.createTextMessage .text = data.to_str when :map = self.createMapMessage .data = data when :bytes = self.createBytesMessage .write_bytes(data.to_java_bytes) else raise "Invalid type #{type} requested" end end |
#producer(params, &block) ⇒ Object
Return a producer for the queue name supplied A producer supports sending messages to a Queue or a Topic
Call the Proc if supplied, then automatically close the producer
Parameters:
queue_name: [String] Name of the Queue to return
[Symbol] Create temporary queue
Mandatory unless :topic_name is supplied
Or,
topic_name: [String] Name of the Topic to write to or subscribe to
[Symbol] Create temporary topic
Mandatory unless :queue_name is supplied
Or,
destination: [javaxJms::Destination] Destination to use
315 316 317 318 319 320 321 322 323 324 325 326 |
# File 'lib/jms/session.rb', line 315 def producer(params, &block) p = self.create_producer(self.create_destination(params)) if block begin block.call(p) ensure p.close p = nil end end p end |
#queue(queue_name, &block) ⇒ Object
Return the queue matching the queue name supplied Call the Proc if supplied
250 251 252 253 254 |
# File 'lib/jms/session.rb', line 250 def queue(queue_name, &block) q = create_queue(queue_name) block.call(q) if block q end |
#temporary_queue(&block) ⇒ Object
Return a temporary queue The temporary queue is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed
260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/jms/session.rb', line 260 def temporary_queue(&block) q = create_temporary_queue if block begin block.call(q) ensure # Delete Temporary queue on completion of block q.delete if q q = nil end end q end |
#temporary_topic(&block) ⇒ Object
Return a temporary topic The temporary topic is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed
286 287 288 289 290 291 292 293 294 295 296 297 298 |
# File 'lib/jms/session.rb', line 286 def temporary_topic(&block) t = create_temporary_topic if block begin block.call(t) ensure # Delete Temporary topic on completion of block t.delete if t t = nil end end t end |
#topic(topic_name, &block) ⇒ Object
Return the topic matching the topic name supplied Call the Proc if supplied
276 277 278 279 280 |
# File 'lib/jms/session.rb', line 276 def topic(topic_name, &block) t = create_topic(topic_name) block.call(t) if block t end |