Class: HornetQ::Client::SessionPool
- Inherits:
-
Object
- Object
- HornetQ::Client::SessionPool
- Defined in:
- lib/hornetq/client/session_pool.rb
Overview
Since a Session can only be used by one thread at a time, we could create a Session for every thread. That could result in excessive unused Sessions. An alternative is to create a pool of sessions that can be shared by multiple threads.
Each thread can requests a session and then return it once it is no longer needed by that thread. Rather than requesting a session directly using SessionPool::session, it is recommended to pass a block so that the Session is automatically returned to the pool. For example:
session_pool.session do |session|
.. do something with the session here
end
Parameters:
see regular session parameters from: HornetQ::Client::Connection::create_session
Additional parameters for controlling the session pool itself
:pool_name Name of the pool as it shows up in the logger.
Default: 'HornetQ::Client::SessionPool'
:pool_size Maximum Pool Size. Default: 10
The pool only grows as needed and will never exceed
:pool_size
:pool_warn_timeout Amount of time to wait before logging a warning when a
session in the pool is not available. Measured in seconds
Default: 5.0
:pool_logger Supply a logger that responds to #debug, #info, #warn and #debug?
For example: Rails.logger
Default: None.
Example:
session_pool = connection.create_session_pool(config)
session_pool.session do |session|
....
end
Instance Method Summary collapse
-
#close ⇒ Object
Immediately Close all sessions in the pool and release from the pool.
-
#consumer(queue_name, &block) ⇒ Object
Obtain a session from the pool and create a ClientConsumer.
-
#initialize(connection, params = {}) ⇒ SessionPool
constructor
A new instance of SessionPool.
-
#producer(address, &block) ⇒ Object
Obtain a session from the pool and create a ClientProducer.
-
#requestor(address, &block) ⇒ Object
Obtain a session from the pool and create a Client::Requestor.
-
#server(queue, timeout = 0, &block) ⇒ Object
Obtain a session from the pool and create a Client::Server.
-
#session(&block) ⇒ Object
Obtain a session from the pool and pass it to the supplied block The session is automatically returned to the pool once the block completes.
Constructor Details
#initialize(connection, params = {}) ⇒ SessionPool
Returns a new instance of SessionPool.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/hornetq/client/session_pool.rb', line 38 def initialize(connection, params={}) # Save Session params since it will be used every time a new session is # created in the pool session_params = params.nil? ? {} : params.dup # TODO Use same logger as HornetQ? # TODO How to shrink unused connections? @pool = GenePool.new( :name => session_params[:pool_name] || self.class.name, :pool_size => session_params[:pool_size] || 10, :warn_timeout => session_params[:pool_warn_timeout] || 5, :logger => session_params[:pool_logger]) do s = connection.create_session(session_params) # Start the session since it will be used immediately upon creation s.start s end # Obtain a session from the pool and pass it to the supplied block # The session is automatically returned to the pool once the block completes def session(&block) @pool.with_connection &block end # Obtain a session from the pool and create a ClientConsumer. # Pass both into the supplied block. # Once the block is complete the consumer is closed and the session is # returned to the pool. # # See HornetQ::Client::ClientConsumer for more information on the consumer # parameters # # Example # session_pool.consumer('MyQueue') do |session, consumer| # message = consumer.receive(timeout) # puts message.body if message # end def consumer(queue_name, &block) session do |s| consumer = nil begin consumer = s.create_consumer(queue_name) block.call(s, consumer) ensure consumer.close end end end # Obtain a session from the pool and create a ClientProducer. # Pass both into the supplied block. # Once the block is complete the consumer is closed and the session is # returned to the pool. # # See HornetQ::Client::ClientProducer for more information on the producer # parameters # # Example # session_pool.producer('MyAddress') do |session, producer| # message = session.create_message(HornetQ::Client::Message::TEXT_TYPE,false) # message.body = "#{Time.now}: ### Hello, World ###" # producer.send(message) # end def producer(address, &block) session do |s| producer = nil begin producer = s.create_producer(address) block.call(s, producer) ensure producer.close if producer end end end # Obtain a session from the pool and create a Client::Requestor. # Pass both into the supplied block. # Once the block is complete the requestor is closed and the session is # returned to the pool. # # See HornetQ::Client::Requestor for more information on the requestor # # Example # session_pool.requestor(params) do |session, requestor| # .... # end def requestor(address, &block) session do |s| requestor = nil begin requestor = s.create_requestor(address) block.call(s, requestor) ensure requestor.close end end end # Obtain a session from the pool and create a Client::Server. # Pass both into the supplied block. # Once the block is complete the requestor is closed and the session is # returned to the pool. # # See HornetQ::Client::Server for more information on the server # # Example # session_pool.server(queue, timeout) do |session, server| # .... # end def server(queue, timeout=0, &block) session do |s| server = nil begin server = s.create_server(queue, timeout=0) block.call(s, server) ensure server.close end end end # Immediately Close all sessions in the pool and release from the pool # # TODO: Allow an option to wait for active sessions to be returned before # closing def close @pool.each do |s| #@pool.remove(s) s.close #@pool.remove(s) end end end |
Instance Method Details
#close ⇒ Object
Immediately Close all sessions in the pool and release from the pool
TODO: Allow an option to wait for active sessions to be returned before
closing
162 163 164 165 166 167 168 |
# File 'lib/hornetq/client/session_pool.rb', line 162 def close @pool.each do |s| #@pool.remove(s) s.close #@pool.remove(s) end end |
#consumer(queue_name, &block) ⇒ Object
Obtain a session from the pool and create a ClientConsumer. Pass both into the supplied block. Once the block is complete the consumer is closed and the session is returned to the pool.
See HornetQ::Client::ClientConsumer for more information on the consumer
parameters
Example
session_pool.consumer('MyQueue') do |session, consumer|
= consumer.receive(timeout)
puts .body if
end
74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/hornetq/client/session_pool.rb', line 74 def consumer(queue_name, &block) session do |s| consumer = nil begin consumer = s.create_consumer(queue_name) block.call(s, consumer) ensure consumer.close end end end |
#producer(address, &block) ⇒ Object
Obtain a session from the pool and create a ClientProducer. Pass both into the supplied block. Once the block is complete the consumer is closed and the session is returned to the pool.
See HornetQ::Client::ClientProducer for more information on the producer
parameters
Example
session_pool.producer('MyAddress') do |session, producer|
= session.(HornetQ::Client::Message::TEXT_TYPE,false)
.body = "#{Time.now}: ### Hello, World ###"
producer.send()
end
100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/hornetq/client/session_pool.rb', line 100 def producer(address, &block) session do |s| producer = nil begin producer = s.create_producer(address) block.call(s, producer) ensure producer.close if producer end end end |
#requestor(address, &block) ⇒ Object
Obtain a session from the pool and create a Client::Requestor. Pass both into the supplied block. Once the block is complete the requestor is closed and the session is returned to the pool.
See HornetQ::Client::Requestor for more information on the requestor
Example
session_pool.requestor(params) do |session, requestor|
....
end
123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/hornetq/client/session_pool.rb', line 123 def requestor(address, &block) session do |s| requestor = nil begin requestor = s.create_requestor(address) block.call(s, requestor) ensure requestor.close end end end |
#server(queue, timeout = 0, &block) ⇒ Object
Obtain a session from the pool and create a Client::Server. Pass both into the supplied block. Once the block is complete the requestor is closed and the session is returned to the pool.
See HornetQ::Client::Server for more information on the server
Example
session_pool.server(queue, timeout) do |session, server|
....
end
146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/hornetq/client/session_pool.rb', line 146 def server(queue, timeout=0, &block) session do |s| server = nil begin server = s.create_server(queue, timeout=0) block.call(s, server) ensure server.close end end end |
#session(&block) ⇒ Object
Obtain a session from the pool and pass it to the supplied block The session is automatically returned to the pool once the block completes
57 58 59 |
# File 'lib/hornetq/client/session_pool.rb', line 57 def session(&block) @pool.with_connection &block end |