Class: HornetQ::Client::SessionPool

Inherits:
Object
Defined in:
lib/hornetq/client/session_pool.rb

Overview

Since a Session can only be used by one thread at a time, we could create a Session for every thread. That could result in excessive unused Sessions. An alternative is to create a pool of sessions that can be shared by multiple threads.

Each thread can request a session and then return it once it is no longer needed by that thread. Rather than requesting a session directly and returning it by hand, it is recommended to pass a block to SessionPool::session so that the Session is automatically returned to the pool. For example:

session_pool.session do |session|
  # ... do something with the session here
end
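
The sharing described above can be sketched with a small stand-in pool. ToyPool and its string "sessions" are hypothetical and exist only for illustration; they are not the HornetQ or GenePool API. The sketch assumes a pool that grows on demand up to a fixed size and makes extra threads wait until a session is returned.

```ruby
# ToyPool is a hypothetical stand-in, not part of HornetQ.
class ToyPool
  attr_reader :created

  def initialize(size)
    @size     = size
    @created  = 0
    @idle     = []
    @lock     = Mutex.new
    @returned = ConditionVariable.new
  end

  # Yield a session to the block and hand it back to the pool afterwards.
  def session
    s = checkout
    begin
      yield s
    ensure
      checkin(s)
    end
  end

  private

  # Reuse an idle session, create one while under the limit, otherwise wait.
  def checkout
    @lock.synchronize do
      loop do
        return @idle.pop unless @idle.empty?
        if @created < @size
          @created += 1
          return "session-#{@created}"
        end
        @returned.wait(@lock)
      end
    end
  end

  def checkin(s)
    @lock.synchronize do
      @idle.push(s)
      @returned.signal
    end
  end
end

pool = ToyPool.new(2)
used = Queue.new
threads = 6.times.map do
  Thread.new { pool.session { |s| used << s; sleep 0.01 } }
end
threads.each(&:join)
```

Six threads complete their work while the stand-in pool never creates more than two sessions.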

Parameters:

See the regular session parameters in: HornetQ::Client::Connection::create_session

Additional parameters for controlling the session pool itself:

:pool_name         Name of the pool as it shows up in the logger.
                   Default: 'HornetQ::Client::SessionPool'
:pool_size         Maximum Pool Size. Default: 10
                   The pool only grows as needed and will never exceed
                   :pool_size
:pool_warn_timeout Amount of time to wait before logging a warning when a
                   session in the pool is not available. Measured in seconds
                   Default: 5.0
:pool_logger       Supply a logger that responds to #debug, #info, #warn and #debug?
                   For example: Rails.logger
                   Default: None.
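
As an illustration, the pool parameters above could be collected in a hash like the one below. The values and the pool name are made up for the example; any regular session parameters would go into the same hash.

```ruby
require 'logger'

# Pool-specific keys as documented above; the values are illustrative only.
config = {
  :pool_name         => 'OrderSessions',      # hypothetical name, appears in log output
  :pool_size         => 5,                    # upper bound on sessions in the pool
  :pool_warn_timeout => 2.5,                  # seconds to wait before a warning is logged
  :pool_logger       => Logger.new($stdout)   # responds to #debug, #info, #warn and #debug?
}
```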

Example:

session_pool = connection.create_session_pool(config)
session_pool.session do |session|
  # ...
end


Constructor Details

#initialize(connection, params = {}) ⇒ SessionPool

Returns a new instance of SessionPool.
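
The way the constructor falls back to the documented defaults can be sketched in isolation. resolve_pool_options is a hypothetical helper written for this example; it mirrors the `||` defaulting in the constructor source but is not a method of SessionPool.

```ruby
# Hypothetical helper that mirrors the constructor's defaulting logic.
def resolve_pool_options(params, default_name = 'HornetQ::Client::SessionPool')
  # A nil params argument is treated like an empty hash
  session_params = params.nil? ? {} : params.dup
  {
    :name         => session_params[:pool_name] || default_name,
    :pool_size    => session_params[:pool_size] || 10,
    :warn_timeout => session_params[:pool_warn_timeout] || 5,
    :logger       => session_params[:pool_logger]
  }
end

defaults = resolve_pool_options(nil)
custom   = resolve_pool_options({ :pool_size => 3, :pool_name => 'Orders' })
```

Because the fallback uses `||`, a key that is present but set to nil or false also receives the default.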



# File 'lib/hornetq/client/session_pool.rb', line 38

def initialize(connection, params={})
  # Save Session params since it will be used every time a new session is
  # created in the pool
  session_params = params.nil? ? {} : params.dup
  # TODO Use same logger as HornetQ?
  # TODO How to shrink unused connections?
  @pool = GenePool.new(
    :name => session_params[:pool_name] || self.class.name,
    :pool_size => session_params[:pool_size] || 10,
    :warn_timeout => session_params[:pool_warn_timeout] || 5,
    :logger       => session_params[:pool_logger]) do
    s = connection.create_session(session_params)
    # Start the session since it will be used immediately upon creation
    s.start
    s
  end
end

Instance Method Details

#close ⇒ Object

Immediately close all sessions in the pool and release them from the pool.

TODO: Allow an option to wait for active sessions to be returned before closing.


# File 'lib/hornetq/client/session_pool.rb', line 162

def close
  @pool.each do |s|
    #@pool.remove(s)
    s.close
    #@pool.remove(s)
  end
end

#consumer(queue_name, &block) ⇒ Object

Obtain a session from the pool and create a ClientConsumer. Pass both into the supplied block. Once the block is complete the consumer is closed and the session is returned to the pool.

See HornetQ::Client::ClientConsumer for more information on the consumer parameters.

Example

session_pool.consumer('MyQueue') do |session, consumer|
  message = consumer.receive(timeout)
  puts message.body if message
end


# File 'lib/hornetq/client/session_pool.rb', line 74

def consumer(queue_name, &block)
  session do |s|
    consumer = nil
    begin
      consumer = s.create_consumer(queue_name)
      block.call(s, consumer)
    ensure
      # consumer is still nil if create_consumer raised
      consumer.close if consumer
    end
  end
end
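
The create-then-close pattern used here can be tried out with stand-in objects. FailingSession and with_consumer are hypothetical names for this sketch; they only imitate the shape of the method above. The sketch shows why a nil check in the ensure clause matters: when creation fails, the original error should reach the caller rather than a NoMethodError raised by calling close on nil.

```ruby
# FailingSession is a hypothetical stand-in whose create_consumer always fails.
class FailingSession
  def create_consumer(_queue_name)
    raise ArgumentError, 'queue does not exist'
  end
end

# Same shape as the pooled method: create, yield, always clean up.
def with_consumer(session, queue_name)
  consumer = nil
  begin
    consumer = session.create_consumer(queue_name)
    yield session, consumer
  ensure
    # Guarded close: consumer is still nil when create_consumer raised
    consumer.close if consumer
  end
end

error = begin
  with_consumer(FailingSession.new, 'MyQueue') { |_session, _consumer| }
  nil
rescue StandardError => e
  e
end
```

With the guard in place, `error` holds the ArgumentError raised by the stand-in session.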

#producer(address, &block) ⇒ Object

Obtain a session from the pool and create a ClientProducer. Pass both into the supplied block. Once the block is complete the producer is closed and the session is returned to the pool.

See HornetQ::Client::ClientProducer for more information on the producer parameters.

Example

session_pool.producer('MyAddress') do |session, producer|
  message = session.create_message(HornetQ::Client::Message::TEXT_TYPE, false)
  message.body = "#{Time.now}: ### Hello, World ###"
  producer.send(message)
end


# File 'lib/hornetq/client/session_pool.rb', line 100

def producer(address, &block)
  session do |s|
    producer = nil
    begin
      producer = s.create_producer(address)
      block.call(s, producer)
    ensure
      producer.close if producer
    end
  end
end

#requestor(address, &block) ⇒ Object

Obtain a session from the pool and create a Client::Requestor. Pass both into the supplied block. Once the block is complete the requestor is closed and the session is returned to the pool.

See HornetQ::Client::Requestor for more information on the requestor

Example

session_pool.requestor(address) do |session, requestor|
  # ...
end


# File 'lib/hornetq/client/session_pool.rb', line 123

def requestor(address, &block)
  session do |s|
    requestor = nil
    begin
      requestor = s.create_requestor(address)
      block.call(s, requestor)
    ensure
      # requestor is still nil if create_requestor raised
      requestor.close if requestor
    end
  end
end

#server(queue, timeout = 0, &block) ⇒ Object

Obtain a session from the pool and create a Client::Server. Pass both into the supplied block. Once the block is complete the server is closed and the session is returned to the pool.

See HornetQ::Client::Server for more information on the server

Example

session_pool.server(queue, timeout) do |session, server|
  # ...
end


# File 'lib/hornetq/client/session_pool.rb', line 146

def server(queue, timeout=0, &block)
  session do |s|
    server = nil
    begin
      # Forward the caller's timeout; writing timeout=0 here would always pass 0
      server = s.create_server(queue, timeout)
      block.call(s, server)
    ensure
      server.close if server
    end
  end
end
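
A Ruby detail is worth keeping in mind for methods with a defaulted argument such as timeout: inside a method call, `timeout = 0` is an assignment expression, not a keyword argument. The sketch below uses a hypothetical create_server stub that simply reports what it receives, so the difference is visible without a broker.

```ruby
# Hypothetical stub that returns the arguments it was given.
def create_server(queue, timeout)
  [queue, timeout]
end

def server_with_assignment(queue, timeout = 0)
  create_server(queue, timeout = 0) # reassigns timeout to 0, then passes 0
end

def server_with_forwarding(queue, timeout = 0)
  create_server(queue, timeout)     # passes the caller's value through
end

reassigned = server_with_assignment('MyQueue', 30)
forwarded  = server_with_forwarding('MyQueue', 30)
```

Here `reassigned` carries a timeout of 0 even though the caller asked for 30, while `forwarded` keeps 30.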

#session(&block) ⇒ Object

Obtain a session from the pool and pass it to the supplied block. The session is automatically returned to the pool once the block completes.



# File 'lib/hornetq/client/session_pool.rb', line 57

def session(&block)
  @pool.with_connection(&block)
end
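
What "automatically returned" means in practice can be sketched with a stand-in. TinyPool is hypothetical and only imitates the block-based checkout; the real method delegates to the underlying pool's with_connection. The sketch assumes the return happens in an ensure clause, so it also runs when the block raises.

```ruby
# TinyPool is a hypothetical stand-in, not the GenePool API.
class TinyPool
  attr_reader :idle

  def initialize(sessions)
    @idle = sessions.dup
  end

  def session
    s = @idle.pop
    begin
      yield s
    ensure
      @idle.push(s) # runs whether the block returns or raises
    end
  end
end

pool = TinyPool.new(['session-1'])
begin
  pool.session { |_s| raise 'send failed' }
rescue RuntimeError
  # The error still reaches the caller; the session is already back in the pool
end
```

After the failed block, `pool.idle` again contains the one session, so the next caller can use it.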