Class: JMS::SessionPool
- Inherits:
-
Object
- Object
- JMS::SessionPool
- Defined in:
- lib/jms/session_pool.rb
Overview
Since a Session can only be used by one thread at a time, we could create a Session for every thread. That could result in excessive unused Sessions. An alternative is to create a pool of sessions that can be shared by multiple threads.
Each thread can request a session and then return it once it is no longer needed by that thread. The only way to get a session is pass a block so that the Session is automatically returned to the pool upon completion of the block.
Parameters:
see regular session parameters from: JMS::Connection#initialize
Additional parameters for controlling the session pool itself
:pool_size Maximum Pool Size. Default: 10
The pool only grows as needed and will never exceed
:pool_size
:pool_timeout Number of seconds to wait before raising a TimeoutError
if no sessions are available in the poo
Default: 60
:pool_warn_timeout Number of seconds to wait before logging a warning when a
session in the pool is not available
Default: 5
:pool_name Name of the pool as it shows up in the logger.
Default: 'JMS::SessionPool'
Example:
session_pool = connection.create_session_pool(config)
session_pool.session do |session|
....
end
Instance Method Summary collapse
-
#close ⇒ Object
Immediately Close all sessions in the pool and release from the pool.
-
#consumer(params, &block) ⇒ Object
Obtain a session from the pool and create a MessageConsumer.
-
#initialize(connection, params = {}) ⇒ SessionPool
constructor
A new instance of SessionPool.
-
#producer(params, &block) ⇒ Object
Obtain a session from the pool and create a MessageProducer.
-
#session(&block) ⇒ Object
Obtain a session from the pool and pass it to the supplied block The session is automatically returned to the pool once the block completes.
Constructor Details
#initialize(connection, params = {}) ⇒ SessionPool
Returns a new instance of SessionPool.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/jms/session_pool.rb', line 34 def initialize(connection, params={}) # Save Session params since it will be used every time a new session is # created in the pool session_params = params.nil? ? {} : params.dup logger = SemanticLogger[session_params[:pool_name] || self.class] # Use GenePool can create and manage the pool of sessions @pool = GenePool.new( name: '', pool_size: session_params[:pool_size] || 10, warn_timeout: session_params[:pool_warn_timeout] || 5, timeout: session_params[:pool_timeout] || 60, close_proc: nil, logger: logger ) do session = connection.create_session(session_params) # Turn on Java class persistence: https://github.com/jruby/jruby/wiki/Persistence session.class.__persistent__ = true session end # Handle connection failures connection.on_exception do |jms_exception| logger.error "JMS Connection Exception has occurred: #{jms_exception.inspect}" end end |
Instance Method Details
#close ⇒ Object
Immediately Close all sessions in the pool and release from the pool
Note: This is an immediate close, active sessions will be aborted
Note: Once closed a session pool cannot be re-used. A new instance must
be created
156 157 158 |
# File 'lib/jms/session_pool.rb', line 156 def close @pool.each { |s| s.close } end |
#consumer(params, &block) ⇒ Object
Obtain a session from the pool and create a MessageConsumer. Pass both into the supplied block. Once the block is complete the consumer is closed and the session is returned to the pool.
Parameters:
queue_name: [String] Name of the Queue to return
[Symbol] Create temporary queue
Mandatory unless :topic_name is supplied
Or,
topic_name: [String] Name of the Topic to write to or subscribe to
[Symbol] Create temporary topic
Mandatory unless :queue_name is supplied
Or,
destination: [javaxJms::Destination] Destination to use
selector: Filter which messages should be returned from the queue
Default: All messages
no_local: Determine whether messages published by its own connection
should be delivered to it
Default: false
Example
session_pool.consumer(queue_name: 'MyQueue') do |session, consumer|
= consumer.receive(timeout)
puts .data if
end
108 109 110 111 112 113 114 115 116 117 |
# File 'lib/jms/session_pool.rb', line 108 def consumer(params, &block) session do |s| begin consumer = s.consumer(params) block.call(s, consumer) ensure consumer.close if consumer end end end |
#producer(params, &block) ⇒ Object
Obtain a session from the pool and create a MessageProducer. Pass both into the supplied block. Once the block is complete the producer is closed and the session is returned to the pool.
Parameters:
queue_name: [String] Name of the Queue to return
[Symbol] Create temporary queue
Mandatory unless :topic_name is supplied
Or,
topic_name: [String] Name of the Topic to write to or subscribe to
[Symbol] Create temporary topic
Mandatory unless :queue_name is supplied
Or,
destination: [javaxJms::Destination] Destination to use
Example
session_pool.producer(queue_name: 'ExampleQueue') do |session, producer|
producer.send(session.("Hello World"))
end
139 140 141 142 143 144 145 146 147 148 |
# File 'lib/jms/session_pool.rb', line 139 def producer(params, &block) session do |s| begin producer = s.producer(params) block.call(s, producer) ensure producer.close if producer end end end |
#session(&block) ⇒ Object
Obtain a session from the pool and pass it to the supplied block The session is automatically returned to the pool once the block completes
In the event a JMS Exception is thrown the session will be closed and removed from the pool to prevent re-using sessions that are no longer valid
66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/jms/session_pool.rb', line 66 def session(&block) s = nil begin s = @pool.checkout block.call(s) rescue javax.jms.JMSException => e s.close rescue nil @pool.remove(s) s = nil # Do not check back in since we have removed it raise e ensure @pool.checkin(s) if s end end |