Class: HornetQ::Client::Connection

Inherits:
Object
  • Object
Defined in:
lib/hornetq/client/connection.rb

Constructor Details

#initialize(params = {}) ⇒ Connection

Create a new Connection from which sessions can be created

Mandatory Parameters

  • :uri => 'hornetq://localhost',

    • The HornetQ URI specifying which server to connect to and which transport protocol to use. Format:

      hornetq://server:port,backupserver:port/?protocol=[netty|discovery]
      
    • To use the default netty transport

      hornetq://server:port
      
    • To use the default netty transport and specify a backup server

      hornetq://server:port,backupserver:port
      
    • To use auto-discovery

      hornetq://server:port/?protocol=discovery
      
    • To use HornetQ within the current JVM

      hornetq://invm
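
The URI formats listed above can be broken down as in the following minimal sketch. `parse_hornetq_uri` is a hypothetical helper written only for illustration. It is not the gem's own `HornetQ::URI` class, whose behavior may differ (for example in how a missing port is defaulted).

```ruby
# Hypothetical helper, not part of the hornetq gem: splits a hornetq:// URI
# into the host, backup host and protocol parts described above.
def parse_hornetq_uri(uri)
  match = uri.match(%r{\Ahornetq://([^/?]+)/?(?:\?(.*))?\z})
  raise ArgumentError, "Not a hornetq URI: #{uri}" unless match

  hosts = match[1].split(',')
  host, port = hosts[0].split(':')
  backup_host, backup_port = (hosts[1] || '').split(':')

  # Collect query string options such as protocol=discovery
  params = {}
  (match[2] || '').split('&').each do |pair|
    key, value = pair.split('=', 2)
    params[key.to_sym] = value
  end

  {
    host:        host,
    port:        port && port.to_i,               # nil when no port was given
    backup_host: backup_host,
    backup_port: backup_port && backup_port.to_i,
    protocol:    params[:protocol] || 'netty',    # netty is the documented default
    invm:        host == 'invm'
  }
end
```

With this sketch, `parse_hornetq_uri('hornetq://invm')` reports `:invm => true` and no port, while a URI with two comma-separated hosts fills in both the live and the backup entries.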
      

Optional Parameters

High Availability
  • :ha => true | false,

    true:  Receives cluster topology updates from the cluster as
           servers leave or join and new backups are appointed or removed.
    false: Uses the supplied static list of hosts in :uri
           and no HA backup information is propagated to the client
    Default: false
    
 Flow Control

  :ack_batch_size                 => integer,
    Sets the acknowledgements batch size. Must be > 0

  :pre_acknowledge                => true | false,
    Sets whether messages will be pre-acknowledged on the server before 
    they are sent to the consumers
      true : Pre-acknowledge consumed messages on the server before they are sent to consumers
      false: Clients acknowledge the message they consume. 
    Default: false

 Grouping:

  :auto_group                     => true | false,
    Sets whether producers will automatically assign a group ID 
    to sent messages
      true: A random unique group ID is created and set on each message 
            for the property Message.HDR_GROUP_ID
    Default: false

  :group_id                       => string,
    Sets the group ID that will be set on each message sent
    Default: nil (no group id will be set)

 Blocking calls:

  :block_on_acknowledge           => true | false,
    Sets whether consumers created through this factory will block 
    while sending message acknowledgements or do it asynchronously. 
    Default: false

  :block_on_durable_send          => true | false,
    Sets whether producers will block while sending durable messages 
    or do it asynchronously.
    If the session is configured to send durable message asynchronously, 
    the client can set a SendAcknowledgementHandler on the ClientSession 
    to be notified once the message has been handled by the server. 
    Default: true

  :block_on_non_durable_send      => true | false,
    Sets whether producers will block while sending non-durable messages 
    or do it asynchronously.
    If the session is configured to send non-durable message asynchronously, 
    the client can set a SendAcknowledgementHandler on the ClientSession 
    to be notified once the message has been handled by the server. 
    Default: false

  :call_timeout                   => long,
    Sets the timeout for blocking calls, in milliseconds. If a client's blocking call to the 
    server takes longer than this timeout, the call throws a 
    HornetQException with the code HornetQException.CONNECTION_TIMEDOUT. 
    Default: HornetQClient.DEFAULT_CALL_TIMEOUT. 
    Must be >= 0

 Client Reconnection Parameters:

  :connection_ttl                 => long,
    Set the connection time-to-live 
    -1  : Disable
    >=0 : milliseconds the server will keep a connection alive in the 
          absence of any data arriving from the client. 
    Default: 60,000

  :client_failure_check_period    => long,
    Sets the period in milliseconds used to check if a client has 
    failed to receive pings from the server. 
    Value must be -1 (to disable) or greater than 0
    Default: 30,000

  :initial_connect_attempts       => int,
    ?

  :failover_on_initial_connection => true | false,
    Sets whether the client will automatically attempt to connect to 
    the backup server if the initial connection to the live server fails
      true : If live server is not reachable try to connect to backup server
      false: Fail to start if live server is not reachable
    Default: false

  :max_retry_interval             => long,
    Sets the maximum retry interval in milliseconds.
    Only applicable if the retry interval multiplier has been specified
    Default: 2000 (2 seconds)

  :reconnect_attempts             => 1,

  :retry_interval                 => long,
    Sets the time to wait before retrying the connection after a failure. 
    Value is in milliseconds.
    Default: 2000 (2 seconds)

  :retry_interval_multiplier      => double,
    Sets the multiplier to apply to successive retry intervals. 
    Value must be positive. 
    Default: 1
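
How :retry_interval, :retry_interval_multiplier and :max_retry_interval combine can be sketched as follows. This is a minimal illustration that assumes the interval is multiplied after every failed attempt and capped at the maximum, which is how the parameter descriptions above read. The client's actual reconnection logic is not shown in this file, and `retry_delays` is a hypothetical helper.

```ruby
# Hypothetical helper: lists the wait (in milliseconds) before each retry,
# assuming delay = retry_interval * multiplier^n capped at max_retry_interval.
def retry_delays(attempts, retry_interval: 2000, multiplier: 1.0, max_retry_interval: 2000)
  delay = retry_interval
  Array.new(attempts) do
    current = [delay, max_retry_interval].min
    delay = (delay * multiplier).to_i
    current
  end
end

retry_delays(4, retry_interval: 500, multiplier: 2.0, max_retry_interval: 3000)
# → [500, 1000, 2000, 3000]
```

With the documented defaults (multiplier 1, interval 2000) every retry waits the same 2 seconds, so :max_retry_interval only matters once a multiplier greater than 1 is supplied.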

 Large Message parameters:

  :cache_large_messages_client    => true | false,
    Sets whether large messages received by consumers will be  
    cached in temporary files or not. 
    When true, consumers will create temporary files to cache large messages. 
    There is 1 temporary file created for each large message. 
    Default: false

  :min_large_message_size         => int,
    Sets the large message size threshold in bytes. Value must be > 0
    Messages whose size is greater than this value will be handled as large messages
    Default: 102400 bytes  (100 KBytes)

  :compress_large_message         => true | false,

 Message Rate Management:

  :consumer_max_rate              => int,
    Sets the maximum rate of message consumption for consumers.
    Controls the rate at which a consumer can consume messages.
    A consumer will never consume messages at a rate faster than the
    rate specified. 
      -1 : Disable
     >=0 : Maximum desired message consumption rate specified 
           in units of messages per second. 
    Default: -1

  :producer_max_rate              => int,
    Sets the maximum rate of message production for producers. 
    Controls the rate at which a producer can produce messages.
    A producer will never produce messages at a rate faster than the rate specified.
      -1 : Disabled
      >0 : Maximum desired message production rate specified in units of messages per second.
    Default: -1 (Disabled)

 Thread Pools:

  :scheduled_thread_pool_max_size => int,
    Sets the maximum size of the scheduled thread pool. 
    This setting is relevant only if this factory does not use global pools. 
    Value must be greater than 0. 
    Default: 5

  :thread_pool_max_size           => int,
    Sets the maximum size of the thread pool. 
    This setting is relevant only if this factory does not use 
    global pools. 
      -1 : Unlimited thread pool
      >0 : Number of threads in pool
    Default: -1 (Unlimited)

  :use_global_pools               => true | false,
    Sets whether this factory will use global thread pools 
    (shared among all the factories in the same JVM) or its own pools.
      true: Uses global JVM thread pools across all HornetQ connections
      false: Use a thread pool just for this connection
    Default: true

 Window Sizes:

  :confirmation_window_size       => int,
    Set the size in bytes for the confirmation window of this connection. 
      -1 : Disable the window
      >0 : Size in bytes
    Default: -1 (Disabled)

  :consumer_window_size           => int,
    Sets the window size for flow control for consumers.
     -1 : Disable flow control
      0 : Do Not buffer any messages
     >0 : Set the maximum size of the buffer
    Default: 1048576 (1 MB)

  :producer_window_size           => int,
    Sets the window size for flow control of the producers. 
    -1 : Disable flow control
    >0 : The maximum number of bytes at any given time (to prevent overloading the connection). 
    Default: 65536 (64 KBytes)

Other:

  :connection_load_balancing_policy_class_name => string,
    Set the class name of the connection load balancing policy
    Value must be the name of a class implementing org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy
    Default: "org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy"

  :initial_message_packet_size    => int,
    Sets the initial size of messages in bytes
    Value must be greater than 0


# File 'lib/hornetq/client/connection.rb', line 277

def initialize(params={})
  params = params.clone
  uri = nil
  # TODO: Support :uri as an array for cluster configurations
  if params.kind_of?(String)
    uri = HornetQ::URI.new(params)
    params = uri.params
  else
    raise "Missing :uri param in HornetQ::Client::Connection.new" unless params[:uri]
    uri = HornetQ::URI.new(params.delete(:uri))
    # params override uri params
    params = uri.params.merge(params)
  end

  # In-VM Transport has no fail-over or additional parameters
  @is_invm = uri.host == 'invm'
  transport_list = []
  if @is_invm
    transport_list << Java::org.hornetq.api.core::TransportConfiguration.new(HornetQ::INVM_CONNECTOR_CLASS_NAME)
  else
    case params[:protocol]
    when 'discovery'
      #TODO: Also support: DiscoveryGroupConfiguration(String name, String localBindAddress, String groupAddress, int groupPort, long refreshTimeout, long discoveryInitialWaitTimeout)
      transport_list << Java::org.hornetq.api.core::DiscoveryGroupConfiguration.new(uri.host, uri.port)
    when 'netty', nil
      transport_list << Java::org.hornetq.api.core::TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.host, 'port' => uri.port })

      if uri.backup_host
        transport_list << Java::org.hornetq.api.core::TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.backup_host, 'port' => uri.backup_port })
      end
    else
      raise "Unknown HornetQ protocol:'#{params[:protocol]}'"
    end
  end

  #TODO: Support: server_locator.addInterceptor
    
  # Create server locator with or without HA. Without HA being the default
  @server_locator = if params[:ha]
    Java::org.hornetq.api.core.client::HornetQClient.createServerLocatorWithHA(*transport_list)
    #TODO: Support: server_locator.addClusterTopologyListener
  else
    Java::org.hornetq.api.core.client::HornetQClient.createServerLocatorWithoutHA(*transport_list)
  end

  # If any other options were supplied, apply them to the server locator
  params.each_pair do |key, val|
    method = key.to_s+'='
    if @server_locator.respond_to? method
      @server_locator.send method, val
      # Read the value back from the server locator; @connection is not assigned until later
      HornetQ.logger.trace { "HornetQ ServerLocator setting: #{key} = #{@server_locator.send key}" } if @server_locator.respond_to? key.to_sym
    else
      HornetQ.logger.warn "Warning: Option:#{key}, with value:#{val} is invalid and will be ignored"
    end
  end

  @connection = @server_locator.createSessionFactory
  # For handling managed sessions and consumers
  @sessions = []
  @consumers = []
end
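
The option-handling loop in the constructor applies each remaining parameter through a setter of the same name and warns about anything it does not recognise. The following sketch reproduces that pattern in isolation. `FakeLocator` and `apply_options` are stand-ins invented for this example, so that it runs without JRuby or the HornetQ jars.

```ruby
require 'logger'
require 'stringio'

# Stand-in for the Java ServerLocator: only these two setters exist here
FakeLocator = Struct.new(:call_timeout, :retry_interval)

# Apply each option through its setter when the target has one,
# otherwise log a warning. Returns the list of ignored keys.
def apply_options(target, params, logger)
  ignored = []
  params.each_pair do |key, val|
    setter = "#{key}="
    if target.respond_to?(setter)
      target.public_send(setter, val)
    else
      logger.warn "Option:#{key}, with value:#{val} is invalid and will be ignored"
      ignored << key
    end
  end
  ignored
end

log     = StringIO.new
locator = FakeLocator.new
apply_options(locator, { call_timeout: 5000, no_such_option: 1 }, Logger.new(log))
```

A practical consequence of this design is that a misspelled option name does not raise. It only produces a warning in the log, so the log is worth checking when a setting appears to have no effect.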

Class Method Details

.connection(params = {}, &proc) ⇒ Object

Call the supplied code block after creating a connection instance. See #initialize for the parameter list. The connection is closed before returning.

Returns the result of the code block



# File 'lib/hornetq/client/connection.rb', line 51

def self.connection(params={}, &proc)
  raise "Missing mandatory code block" unless proc
  connection = nil
  result = nil
  begin
    connection=self.new(params)
    result = proc.call(connection)
  ensure
    # Guard against nil: if self.new raised, there is no connection to close
    connection.close if connection
  end
  result
end
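
The begin/ensure shape used by .connection guarantees that the connection is closed whether the block returns normally or raises. The sketch below shows the same pattern with a stand-in object. `FakeConnection` and `with_connection` are hypothetical names used only for this illustration.

```ruby
# Hypothetical stand-in for a connection; records whether close was called
class FakeConnection
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Same shape as .connection: yield the resource, always close it,
# and return the result of the block
def with_connection(connection)
  yield connection
ensure
  connection.close if connection
end

conn   = FakeConnection.new
result = with_connection(conn) { |c| c.closed ? 'closed early' : 'open during block' }
# result is 'open during block' and conn.closed is now true
```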

.session(params = {}, &proc) ⇒ Object

Create a new Connection and Session

Creates a new connection and session, then passes the session to the supplied
block. Upon completion the session and connection are both closed

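See Connection::initialize and Connection::create_session for the list of parameters. Pass either a URI String, or a Hash whose :connection entry holds the connection parameters and whose :session entry holds the session parameters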

Returns result of block



# File 'lib/hornetq/client/connection.rb', line 13

def self.session(params={},&proc)
  raise "Missing mandatory code block" unless proc
  connection = nil
  session = nil
  begin
    if params.kind_of?(String)
      # TODO: Support passing username and password from URI to Session
      connection = self.new(params)
      connection.session({}, &proc)
    else
      connection = self.new(params[:connection] || {})
      connection.session(params[:session] || {}, &proc)
    end
  ensure
    connection.close if connection
  end
end
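
As the code above shows, .session accepts either a URI String or a Hash with separate :connection and :session entries. The sketch below isolates that decision. `split_session_params` is a hypothetical helper written for illustration, and the user name in the commented call is a placeholder.

```ruby
# Hypothetical helper mirroring how .session interprets its argument.
# Returns [connection_params, session_params].
def split_session_params(params)
  if params.kind_of?(String)
    # A bare URI string: no session parameters can be supplied this way
    [params, {}]
  else
    [params[:connection] || {}, params[:session] || {}]
  end
end

# The Hash form would therefore be called along these lines:
#
#   HornetQ::Client::Connection.session(
#     :connection => { :uri => 'hornetq://localhost' },
#     :session    => { :username => 'guest' }
#   ) { |session| ... }
```

Note that a Hash without a :connection entry results in empty connection parameters, which the constructor rejects because :uri is mandatory.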

.start_session(params = {}, &proc) ⇒ Object

Create a new Connection along with a Session, and then start the session

Creates a new connection and session, then passes the session to the supplied
block. Upon completion the session and connection are both closed

See Connection::initialize and Connection::create_session for the list of parameters

Returns result of block



# File 'lib/hornetq/client/connection.rb', line 39

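def self.start_session(params={},&proc)
  # Fail early with a clear message instead of calling nil inside the session block
  raise "Missing mandatory code block" unless proc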
  session(params) do |session|
    session.start
    proc.call(session)
  end
end

Instance Method Details

#close ⇒ Object

Close this connection: closes all managed sessions, then the underlying session factory and server locator



# File 'lib/hornetq/client/connection.rb', line 540

def close
  @sessions.each { |session| session.close }
  @connection.close if @connection
  @server_locator.close if @server_locator
  @connection = nil
  @server_locator = nil
end

#create_session(params = {}) ⇒ Object

Create a new HornetQ session

Using #session with a block is recommended over this method where possible

Note:

  • The returned session MUST be closed once complete

    connection = HornetQ::Client::Connection.new(:uri => 'hornetq://localhost/')
    session = connection.create_session
      ...
    session.close
    connection.close
    

Returns:

  • A new HornetQ ClientSession

  • See org.hornetq.api.core.client.ClientSession for documentation on returned object

Throws:

  • NativeException

Example:

require 'hornetq'

connection = nil
session = nil
begin
  connection = HornetQ::Client::Connection.new(:uri => 'hornetq://localhost/')
  session = connection.create_session

  # Create a new queue
  session.create_queue('Example', 'Example', true)

  # Create a producer to send messages
  producer = session.create_producer('Example')

  # Create a Text Message
  message = session.create_message(HornetQ::Client::Message::TEXT_TYPE,true)
  message.body_buffer.write_string('Hello World')

  # Send the message
  producer.send(message)
ensure
  session.close if session
  connection.close if connection
end

Parameters:

  • a Hash consisting of one or more of the named parameters

  • Summary of parameters and their default values

connection.create_session(
 :username          => 'my_username',   # Default is no authentication
 :password          => 'password',      # Default is no authentication
 :xa                => false,
 :auto_commit_sends => true,
 :auto_commit_acks  => true,
 :pre_acknowledge   => false,
 :ack_batch_size    => 1
 )

Mandatory Parameters

  • None

Optional Parameters

  • :username

    • The user name, used to create an authenticated session

  • :password

    • The user password, used to create an authenticated session

  • :xa

    • Whether the session supports XA transaction semantics or not

  • :auto_commit_sends

    • true: automatically commit message sends

    • false: commit manually

  • :auto_commit_acks

    • true: automatically commit message acknowledgement

    • false: commit manually

  • :pre_acknowledge

    • true: to pre-acknowledge messages on the server

    • false: to let the client acknowledge the messages

    • Note: Pre-acknowledging messages on the server saves the client an additional network round trip to acknowledge each message. While this increases performance, it does not guarantee delivery, since messages can be lost after being pre-acknowledged on the server. Use with caution, and only if your application design permits it.

  • :ack_batch_size

    • the batch size of the acknowledgements

  • :managed

    • true: The session will be managed by the connection. It will be
      closed when the connection is closed.
      Also the session will be started or stopped when
      Connection#start_managed_sessions or
      Connection#stop_managed_sessions is called
      
    • false: The caller is responsible for closing the session

    • Default: false



# File 'lib/hornetq/client/connection.rb', line 446

def create_session(params={})
  raise "HornetQ::Client::Connection Already Closed" unless @connection
  params ||= {}
  session = @connection.create_session(
    params[:username],
    params[:password],
    params[:xa] || false,
    params[:auto_commit_sends].nil? ? true : params[:auto_commit_sends],
    params[:auto_commit_acks].nil? ? true : params[:auto_commit_acks],
    params[:pre_acknowledge] || false,
    params[:ack_batch_size] || 1)
  (@sessions << session) if params.fetch(:managed, false)
  session
end
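
The method above defaults :auto_commit_sends and :auto_commit_acks with a nil check rather than `||`, because `false || true` evaluates to true and would silently discard an explicit false. The sketch below reproduces only the defaulting rules. `session_defaults` is a hypothetical helper, not part of the gem.

```ruby
# Hypothetical helper reproducing the defaulting rules used by #create_session
def session_defaults(params = {})
  {
    xa:                params[:xa] || false,
    # nil check so that an explicit false is kept instead of replaced by true
    auto_commit_sends: params[:auto_commit_sends].nil? ? true : params[:auto_commit_sends],
    auto_commit_acks:  params[:auto_commit_acks].nil? ? true : params[:auto_commit_acks],
    pre_acknowledge:   params[:pre_acknowledge] || false,
    ack_batch_size:    params[:ack_batch_size] || 1
  }
end

session_defaults({})[:auto_commit_sends]                           # → true
session_defaults({ auto_commit_sends: false })[:auto_commit_sends] # → false
```

The options whose default is false can safely use `||`, since nil and false both fall through to the same value.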

#create_session_pool(params = {}) ⇒ Object

Create a Session pool



# File 'lib/hornetq/client/connection.rb', line 534

def create_session_pool(params={})
  require 'hornetq/client/session_pool'
  SessionPool.new(self, params)
end

#invm? ⇒ Boolean

Return true if this connection was configured to use INVM transport protocol

Returns:

  • (Boolean)


# File 'lib/hornetq/client/connection.rb', line 340

def invm?
  @is_invm
end

#on_message(params, &proc) ⇒ Object

Receive messages in a separate thread when they arrive

Allows messages to be received asynchronously, in a separate thread. This method returns to the caller before messages are processed. It is then the caller's responsibility to keep the program active so that messages can be processed.

Session Parameters:

:options => any of the javax.jms.Session constants
   Default: javax.jms.Session::AUTO_ACKNOWLEDGE

:session_count : Number of sessions to create, each with their own consumer which
                 in turn will call the supplied block.
                 Note: The supplied block must be thread safe since it will be called
                       by several threads at the same time.
                       I.e. Don't change instance variables etc. without the
                       necessary semaphores etc.
                 Default: 1

Consumer Parameters:

:queue_name  => The name of the queue to consume messages from. Mandatory
:filter      => Only consume messages matching the filter: Default: nil
:browse_only => Whether to just browse the queue or consume messages
                true | false. Default: false
:window_size => The consumer window size.
:max_rate    => The maximum rate to consume messages.
:auto_start  => Immediately start processing messages.
                If set to false, call Connection#start_managed_sessions
                to manually start receiving messages later
                Default: true

:statistics Capture statistics on how many messages have been read
   true  : This method will capture statistics on the number of messages received
           and the time it took to process them.
           The timer starts when each() is called and finishes when either the last message was received,
           or when Destination::statistics is called. In this case MessageConsumer::statistics
           can be called several times during processing without affecting the end time.
           Also, the start time and message count is not reset until MessageConsumer::each
           is called again with :statistics => true

           The statistics gathered are returned when :statistics => true and :async => false

Usage: For transacted sessions (the default) the Proc supplied must return
either true or false:
  true => The session is committed
  false => The session is rolled back
  Any Exception => The session is rolled back

Notes:

  • Remember to call ::start on the connection otherwise the on_message will not start consuming any messages

  • Remember to call message.acknowledge before completing the block so that
    the message will be removed from the queue
    
  • If the block throws an exception, the



# File 'lib/hornetq/client/connection.rb', line 603

def on_message(params, &proc)
  consumer_count = params[:session_count] || 1
  consumer_count.times do
    session = self.create_session(params)
    consumer = session.create_consumer_from_params(params)
    consumer.on_message(params, &proc)
    session.start if params.fetch(:auto_start, true)
    @consumers << consumer
    @sessions << session
  end
end
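
Because :session_count greater than 1 causes the supplied block to be called from several threads at once, any state shared by the block needs a lock. The following sketch shows one way to do that with a Mutex. `MessageCounter` is a hypothetical class, and plain Ruby threads stand in for the consumer threads so that the example runs without a broker.

```ruby
# Counter guarded by a Mutex so that concurrent callbacks do not lose updates
class MessageCounter
  def initialize
    @mutex = Mutex.new
    @count = 0
  end

  def increment
    @mutex.synchronize { @count += 1 }
  end

  def count
    @mutex.synchronize { @count }
  end
end

counter = MessageCounter.new

# Four plain threads stand in for the consumers created by :session_count => 4
threads = Array.new(4) { Thread.new { 1000.times { counter.increment } } }
threads.each(&:join)
```

Inside an on_message block the same counter would be used in place of an unguarded instance variable, for example by calling `counter.increment` before `message.acknowledge`.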

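#on_message_statistics ⇒ Object

Returns the statistics collected by each consumer created through #on_message, one entry per consumer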



# File 'lib/hornetq/client/connection.rb', line 615

def on_message_statistics
  @consumers.collect{|consumer| consumer.on_message_statistics}
end

#session(params = {}, &proc) ⇒ Object

Create a session, call the supplied block and, once it completes, close the session. See #create_session for the parameters

Returns the result of the block

Example:

require 'hornetq'

HornetQ::Client::Connection.connection('hornetq://localhost/') do |connection|
  connection.session do |session|

    # Create a producer to send messages
    session.producer('Example') do |producer|

      # Create a Text Message
      message = session.create_message(HornetQ::Client::Message::TEXT_TYPE,true)
      message.body = 'Hello World'

      # Send the message
      producer.send(message)
    end
  end
end


# File 'lib/hornetq/client/connection.rb', line 485

def session(params={}, &proc)
  raise "HornetQ::Client::session mandatory block missing" unless proc
  session = nil
  begin
    session = create_session(params)
    proc.call(session)
  ensure
    session.close if session
  end
end

#start_managed_sessions ⇒ Object

Start all sessions managed by this connection

Sessions created via #create_session are not managed unless :managed => true was specified when the session was created

Sessions are only managed when created through the following methods:

Connection#on_message
Connection#create_session with :managed => true

This call does not do anything to sessions in a session pool



# File 'lib/hornetq/client/connection.rb', line 629

def start_managed_sessions
  @sessions.each {|session| session.start}
end

#start_session(params = {}, &proc) ⇒ Object

Create a session, start the session, call the supplied block and once the block completes close the session.

See: #create_session for the parameters

Returns the result of the block

Example:

require 'hornetq'

HornetQ::Client::Connection.connection('hornetq://localhost/') do |connection|
  # Must start the session, otherwise we cannot consume messages using it
  connection.start_session do |session|

    # Create a consumer to receive messages
    session.consumer('TestQueue') do |consumer|

      consumer.each do |message|
        message.acknowledge
      end

    end
  end
end


# File 'lib/hornetq/client/connection.rb', line 521

def start_session(params={}, &proc)
  raise "HornetQ::Client::session mandatory block missing" unless proc
  session = nil
  begin
    session = create_session(params)
    session.start
    proc.call(session)
  ensure
    session.close if session
  end
end

#stop_managed_sessions ⇒ Object

Stop all sessions managed by this connection so that they no longer receive messages for processing

See: #start_managed_sessions for details on which sessions are managed



# File 'lib/hornetq/client/connection.rb', line 637

def stop_managed_sessions
  @sessions.each {|session| session.stop}
end
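
The difference between managed and unmanaged sessions can be illustrated with the sketch below, which keeps the same kind of session list as the connection but uses stand-in objects. `FakeSession` and `SessionRegistry` are hypothetical classes invented for this example. Only the :managed bookkeeping is modelled on the methods above.

```ruby
# Hypothetical stand-in for a ClientSession that only tracks its state
class FakeSession
  attr_reader :state

  def initialize
    @state = :created
  end

  def start
    @state = :started
  end

  def stop
    @state = :stopped
  end
end

# Minimal registry with the same bookkeeping as the connection's session list
class SessionRegistry
  def initialize
    @sessions = []
  end

  # Only sessions created with :managed => true are remembered
  def create_session(params = {})
    session = FakeSession.new
    @sessions << session if params.fetch(:managed, false)
    session
  end

  def start_managed_sessions
    @sessions.each(&:start)
  end

  def stop_managed_sessions
    @sessions.each(&:stop)
  end
end

registry  = SessionRegistry.new
managed   = registry.create_session(:managed => true)
unmanaged = registry.create_session
registry.start_managed_sessions
# managed.state is :started, unmanaged.state is still :created
```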