Class: Stomp::Client
- Inherits:
- Object
  - Object
  - Stomp::Client
- Extended by:
- Forwardable
- Defined in:
- lib/stomp/client.rb,
lib/client/utils.rb
Overview
Typical Stomp client class. Uses a listener thread to receive frames from the server; any thread can send.
All receives happen in one thread, so avoid heavy processing in that thread if message volume is high.
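Because every receive runs on the single listener thread, one way to keep that thread responsive is to hand each message to a worker through a queue. The sketch below uses only Ruby's core Queue and Thread. The helper name and the on_message lambda are hypothetical stand-ins for the block you would pass to subscribe, and no broker is involved.

```ruby
# Hypothetical helper: simulates a listener callback that only enqueues work.
def process_off_listener_thread(bodies)
  work_queue = Queue.new
  results = Queue.new

  # The worker does the slow part, so the receive callback returns quickly.
  worker = Thread.new do
    while (body = work_queue.pop) != :stop
      results << body.upcase # stand-in for expensive processing
    end
  end

  # Stand-in for the block passed to Client#subscribe: enqueue and return.
  on_message = lambda { |body| work_queue << body }
  bodies.each { |body| on_message.call(body) }

  # Tell the worker to finish, then collect what it produced.
  work_queue << :stop
  worker.join

  processed = []
  processed << results.pop until results.empty?
  processed
end

processed = process_off_listener_thread(%w[a b c])
```

With a single worker the messages are processed in arrival order; more workers would raise throughput at the cost of ordering.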
Instance Attribute Summary
-
#parameters ⇒ Object
readonly
Parameters hash.
Class Method Summary
-
.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) ⇒ Object
open is syntactic sugar for 'Client.new'; see 'initialize' for usage.
Instance Method Summary
-
#abort(name, headers = {}) ⇒ Object
Abort aborts work in a transaction by name.
-
#ack(message, headers = {}) ⇒ Object
(also: #acknowledge)
Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe("/queue/a", :ack => 'client') ). Accepts a transaction header ( :transaction => 'some_transaction_id' ).
- #ack_context_for(message, headers) ⇒ Object
-
#autoflush ⇒ Object
autoflush returns the current connection's autoflush setting.
-
#autoflush=(af) ⇒ Object
autoflush= sets the current connection's autoflush setting.
-
#begin(name, headers = {}) ⇒ Object
Begin starts work in a transaction by name.
-
#close(headers = {}) ⇒ Object
close frees resources in use by this client.
-
#closed? ⇒ Boolean
closed? tests if this client connection is closed.
-
#commit(name, headers = {}) ⇒ Object
Commit commits work in a transaction by name.
-
#connection_frame ⇒ Object
Return the broker's CONNECTED frame to the client.
- #create_error_handler ⇒ Object
-
#disconnect_receipt ⇒ Object
Return any RECEIPT frame received by DISCONNECT.
-
#hbrecv_count ⇒ Object
hbrecv_count returns the current connection's heartbeat receive count.
-
#hbrecv_interval ⇒ Object
hbrecv_interval returns the connection's heartbeat receive interval.
-
#hbsend_count ⇒ Object
hbsend_count returns the current connection's heartbeat send count.
-
#hbsend_interval ⇒ Object
hbsend_interval returns the connection's heartbeat send interval.
-
#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) ⇒ Client
constructor
A new Client object can be initialized using three forms.
-
#join(limit = nil) ⇒ Object
join the listener thread for this client, generally used to wait for a quit signal.
-
#jruby? ⇒ Boolean
jruby? tests if the connection has detected a JRuby environment.
-
#nack(message, headers = {}) ⇒ Object
Stomp 1.1+ NACK.
-
#open? ⇒ Boolean
open? tests if this client connection is open.
-
#poll ⇒ Object
Poll for asynchronous messages issued by broker.
-
#protocol ⇒ Object
protocol returns the current client's protocol level.
-
#publish(destination, message, headers = {}) ⇒ Object
Publishes message to destination.
-
#running ⇒ Object
running checks if the thread was created and is not dead.
-
#set_logger(logger) ⇒ Object
set_logger identifies a new callback logger.
-
#sha1(data) ⇒ Object
sha1 returns a SHA1 sum of a given string.
-
#subscribe(destination, headers = {}) ⇒ Object
Subscribe to a destination. A block must be passed; it is used as the callback listener.
-
#unreceive(message, options = {}) ⇒ Object
Unreceive a message, sending it back to its queue or to the DLQ.
-
#unsubscribe(destination, headers = {}) ⇒ Object
Unsubscribe from a subscription by name.
-
#uuid ⇒ Object
uuid returns a type 4 UUID.
-
#valid_utf8?(s) ⇒ Boolean
valid_utf8? validates any given string for UTF8 compliance.
Constructor Details
#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) ⇒ Client
A new Client object can be initialized using three forms:
Hash (this is the recommended Client initialization method):
hash = {
:hosts => [
{:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
{:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
],
# These are the default parameters and do not need to be set
:reliable => true, # reliable (use failover)
:initial_reconnect_delay => 0.01, # initial delay before reconnect (secs)
:max_reconnect_delay => 30.0, # max delay before reconnect
:use_exponential_back_off => true, # increase delay between reconnect attempts
:back_off_multiplier => 2, # next delay multiplier
:max_reconnect_attempts => 0, # retry forever, use # for maximum attempts
:randomize => false, # do not randomize hosts hash before reconnect
:connect_timeout => 0, # Timeout for TCP/TLS connects, use # for max seconds
:connect_headers => {}, # user supplied CONNECT headers (req'd for Stomp 1.1+)
:parse_timeout => 5, # IO::select wait time on socket reads
:logger => nil, # user supplied callback logger instance
:dmh => false, # do not support multihomed IPV4 / IPV6 hosts during failover
:closed_check => true, # check first if closed in each protocol method
:hbser => false, # raise on heartbeat send exception
:stompconn => false, # Use STOMP instead of CONNECT
:usecrlf => false, # Use CRLF command and header line ends (1.2+)
:max_hbread_fails => 0, # Max HB read fails before retry. 0 => never retry
:max_hbrlck_fails => 0, # Max HB read lock obtain fails before retry. 0 => never retry
:fast_hbs_adjust => 0.0, # Fast heartbeat senders sleep adjustment, seconds, needed ...
# For fast heartbeat senders. 'fast' == YMMV. If not
# correct for your environment, expect unnecessary fail overs
:connread_timeout => 0, # Timeout during CONNECT for read of CONNECTED/ERROR, secs
:tcp_nodelay => true, # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm
:start_timeout => 0, # Timeout around Stomp::Client initialization
:sslctx_newparm => nil, # Param for SSLContext.new
:ssl_post_conn_check => true, # Further verify broker identity
:nto_cmd_read => true, # No timeout on COMMAND read
}
e.g. c = Stomp::Client.new(hash)
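To make the reconnect settings more concrete, here is an illustrative sketch of how :initial_reconnect_delay, :use_exponential_back_off, :back_off_multiplier and :max_reconnect_delay could combine into a sequence of waits. The reconnect_delays helper is hypothetical and is not taken from the gem; the library's actual reconnect logic may differ in detail.

```ruby
# Hypothetical helper, not part of the stomp gem: lists the delay that would
# precede each of the first `attempts` reconnect attempts.
def reconnect_delays(params, attempts)
  delay = params[:initial_reconnect_delay]
  Array.new(attempts) do
    current = delay
    if params[:use_exponential_back_off]
      # Grow the next delay, but never beyond the configured ceiling.
      delay = [delay * params[:back_off_multiplier], params[:max_reconnect_delay]].min
    end
    current
  end
end

# Same keys as the hash above; the values are shortened so the cap is visible.
params = {
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 0.05,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2
}

delays = reconnect_delays(params, 5)
```

With exponential back-off disabled, the same helper returns the initial delay for every attempt.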
Positional parameters:
login (String, default : '')
passcode (String, default : '')
host (String, default : 'localhost')
port (Integer, default : 61613)
reliable (Boolean, default : false)
e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL:
A Stomp URL must begin with 'stomp://' and can be in one of the following forms:
stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port
e.g. c = Stomp::Client.new(urlstring)
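As a rough illustration of what such a URL carries, the sketch below splits one into its parts with Ruby's standard URI library. The split_stomp_url helper is hypothetical; the gem uses its own parser, so treat this only as a way to see which fields each URL form supplies.

```ruby
require 'uri'

# Hypothetical helper, not the gem's parser: extracts the connection fields
# from a stomp:// URL. Missing credentials become empty strings.
def split_stomp_url(url)
  uri = URI.parse(url)
  raise ArgumentError, "not a stomp:// URL: #{url}" unless uri.scheme == 'stomp'

  {
    :login => uri.user.to_s,
    :passcode => uri.password.to_s,
    :host => uri.host,
    :port => uri.port
  }
end

with_credentials = split_stomp_url('stomp://login:passcode@host.domain.tld:61613')
without_credentials = split_stomp_url('stomp://localhost:61613')
```

The empty-string defaults mirror the positional defaults for login and passcode shown in the constructor signature.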
# File 'lib/stomp/client.rb', line 84
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)
  parse_hash_params(login) ||
    parse_stomp_url(login) ||
    parse_failover_url(login) ||
    parse_positional_params(login, passcode, host, port, reliable)

  @logger = @parameters[:logger] ||= Stomp::NullLogger.new
  @start_timeout = @parameters[:start_timeout] || 0
  @parameters[:client_main] = Thread::current
  ## p [ "CLINDBG", @parameters[:client_main] ]
  check_arguments!()

  # p [ "cldbg01", @parameters ]

  begin
    Timeout::timeout(@start_timeout) {
      create_error_handler
      create_connection(autoflush)
      start_listeners()
    }
  rescue Timeout::Error
    # p [ "cldbg02" ]
    ex = Stomp::Error::StartTimeoutException.new(@start_timeout)
    raise ex
  end
end
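The constructor wraps startup in Timeout::timeout and converts a Timeout::Error into a library-specific exception. The sketch below isolates that pattern using only the standard library. StartTimeoutError and start_with_timeout are hypothetical stand-ins, not names from the gem. Note that a timeout of 0, the default for :start_timeout, means no time limit.

```ruby
require 'timeout'

# Hypothetical stand-in for Stomp::Error::StartTimeoutException.
class StartTimeoutError < StandardError; end

# Runs the given startup block under a time limit and translates the
# standard Timeout::Error into the domain-specific error above.
def start_with_timeout(seconds)
  Timeout.timeout(seconds) { yield }
rescue Timeout::Error
  raise StartTimeoutError, "startup did not finish within #{seconds}s"
end

# With 0 the block runs without any limit.
result = start_with_timeout(0) { :started }
```

Translating the exception lets callers rescue one error type for startup failures without having to know that Timeout is used internally.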
Instance Attribute Details
#parameters ⇒ Object (readonly)
Parameters hash
# File 'lib/stomp/client.rb', line 19
def parameters
  @parameters
end
Class Method Details
.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) ⇒ Object
open is syntactic sugar for 'Client.new'; see 'initialize' for usage.
# File 'lib/stomp/client.rb', line 140
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end
Instance Method Details
#abort(name, headers = {}) ⇒ Object
Abort aborts work in a transaction by name.
# File 'lib/stomp/client.rb', line 156
def abort(name, headers = {})
  @connection.abort(name, headers)

  # replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn[name]
  if replay_list
    replay_list.each do |message|
      find_listener(message) # find_listener also calls the listener
    end
  end
end
#ack(message, headers = {}) ⇒ Object Also known as: acknowledge
Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe("/queue/a", :ack => 'client') ). Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File 'lib/stomp/client.rb', line 205
def ack(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    replay_list = @replay_messages_by_txn[txn_id]
    if replay_list.nil?
      replay_list = []
      @replay_messages_by_txn[txn_id] = replay_list
    end
    replay_list << message
  end
  if block_given?
    headers = headers.merge(:receipt => register_receipt_listener(lambda {|r| yield r}))
  end
  context = ack_context_for(message, headers)
  @connection.ack context[:message_id], context[:headers]
end
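The ack and abort listings together describe a replay mechanism: messages acknowledged inside a transaction are remembered, and an abort hands them back to their listeners. The following sketch models only that bookkeeping. ReplayTracker and its redelivered list are hypothetical, and unlike the commit listing shown on this page the sketch clears the list by transaction name.

```ruby
# Hypothetical model of the replay bookkeeping; no broker or connection.
class ReplayTracker
  attr_reader :redelivered

  def initialize
    @replay_messages_by_txn = {}
    @redelivered = []
  end

  # Remember the message only when the ack belongs to a transaction.
  def ack(message, headers = {})
    txn_id = headers[:transaction]
    (@replay_messages_by_txn[txn_id] ||= []) << message if txn_id
  end

  # On abort, replay every message that was ack'd in that transaction.
  def abort(name)
    (@replay_messages_by_txn[name] || []).each { |message| @redelivered << message }
  end

  # On commit, the remembered messages are no longer needed.
  def commit(name)
    @replay_messages_by_txn.delete(name)
  end
end

tracker = ReplayTracker.new
tracker.ack('m1', :transaction => 'tx1')
tracker.ack('m2') # not transactional, so never replayed
tracker.abort('tx1')
```

After the abort, tracker.redelivered holds only 'm1', the message that was acknowledged inside the aborted transaction.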
#ack_context_for(message, headers) ⇒ Object
# File 'lib/stomp/client.rb', line 233
def ack_context_for(message, headers)
  id = case protocol
    when Stomp::SPL_12
      'ack'
    when Stomp::SPL_11
      headers = headers.merge(:subscription => message.headers['subscription'])
      'message-id'
    else
      'message-id'
  end
  {:message_id => message.headers[id], :headers => headers}
end
#autoflush ⇒ Object
autoflush returns the current connection's autoflush setting.
# File 'lib/stomp/client.rb', line 359
def autoflush()
  @connection.autoflush()
end
#autoflush=(af) ⇒ Object
autoflush= sets the current connection's autoflush setting.
# File 'lib/stomp/client.rb', line 354
def autoflush=(af)
  @connection.autoflush = af
end
#begin(name, headers = {}) ⇒ Object
Begin starts work in a transaction by name.
# File 'lib/stomp/client.rb', line 151
def begin(name, headers = {})
  @connection.begin(name, headers)
end
#close(headers = {}) ⇒ Object
close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.
# File 'lib/stomp/client.rb', line 291
def close(headers={})
  @listener_thread.exit
  @connection.disconnect(headers)
end
#closed? ⇒ Boolean
closed? tests if this client connection is closed.
# File 'lib/stomp/client.rb', line 280
def closed?()
  @connection.closed?()
end
#commit(name, headers = {}) ⇒ Object
Commit commits work in a transaction by name.
# File 'lib/stomp/client.rb', line 169
def commit(name, headers = {})
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id)
  @connection.commit(name, headers)
end
#connection_frame ⇒ Object
Return the broker's CONNECTED frame to the client. Misnamed.
# File 'lib/stomp/client.rb', line 265
def connection_frame()
  @connection.connection_frame
end
#create_error_handler ⇒ Object
# File 'lib/stomp/client.rb', line 111
def create_error_handler
  client_thread = Thread.current
  if client_thread.respond_to?(:report_on_exception=)
    client_thread.report_on_exception=false
  end

  @error_listener = lambda do |error|
    exception = case error.body
      when /ResourceAllocationException/i
        Stomp::Error::ProducerFlowControlException.new(error)
      when /ProtocolException/i
        Stomp::Error::ProtocolException.new(error)
      else
        Stomp::Error::BrokerException.new(error)
    end

    @receipt_listeners.delete(error.headers['receipt-id']) if error.headers['receipt-id']
    client_thread.raise exception
  end
end
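The error handler above maps the body of a broker ERROR frame to an exception class by pattern matching. A minimal sketch of that classification step, with hypothetical exception classes standing in for the Stomp::Error ones and no thread handling, could look like this:

```ruby
# Hypothetical stand-ins for the Stomp::Error exception classes.
class SketchBrokerError < StandardError; end
class SketchFlowControlError < SketchBrokerError; end
class SketchProtocolError < SketchBrokerError; end

# Picks an exception class from the text of an ERROR frame body.
# Patterns are checked in order and matching is case-insensitive.
def classify_broker_error(body)
  case body
  when /ResourceAllocationException/i then SketchFlowControlError
  when /ProtocolException/i then SketchProtocolError
  else SketchBrokerError
  end
end

error_class = classify_broker_error('javax.jms.ResourceAllocationException: queue is full')
```

Anything that matches neither pattern falls through to the general broker error, just as the listing falls back to BrokerException.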
#disconnect_receipt ⇒ Object
Return any RECEIPT frame received by DISCONNECT.
# File 'lib/stomp/client.rb', line 270
def disconnect_receipt()
  @connection.disconnect_receipt
end
#hbrecv_count ⇒ Object
hbrecv_count returns the current connection's heartbeat receive count.
# File 'lib/stomp/client.rb', line 343
def hbrecv_count()
  @connection.hbrecv_count()
end
#hbrecv_interval ⇒ Object
hbrecv_interval returns the connection's heartbeat receive interval.
# File 'lib/stomp/client.rb', line 333
def hbrecv_interval()
  @connection.hbrecv_interval()
end
#hbsend_count ⇒ Object
hbsend_count returns the current connection's heartbeat send count.
# File 'lib/stomp/client.rb', line 338
def hbsend_count()
  @connection.hbsend_count()
end
#hbsend_interval ⇒ Object
hbsend_interval returns the connection's heartbeat send interval.
# File 'lib/stomp/client.rb', line 328
def hbsend_interval()
  @connection.hbsend_interval()
end
#join(limit = nil) ⇒ Object
join the listener thread for this client, generally used to wait for a quit signal.
# File 'lib/stomp/client.rb', line 146
def join(limit = nil)
  @listener_thread.join(limit)
end
#jruby? ⇒ Boolean
jruby? tests if the connection has detected a JRuby environment.
# File 'lib/stomp/client.rb', line 285
def jruby?()
  @connection.jruby
end
#nack(message, headers = {}) ⇒ Object
Stomp 1.1+ NACK.
# File 'lib/stomp/client.rb', line 227
def nack(message, headers = {})
  context = ack_context_for(message, headers)
  @connection.nack context[:message_id], context[:headers]
end
#open? ⇒ Boolean
open? tests if this client connection is open.
# File 'lib/stomp/client.rb', line 275
def open?
  @connection.open?()
end
#poll ⇒ Object
Poll for asynchronous messages issued by broker. Returns nil if no message is available, else the message.
# File 'lib/stomp/client.rb', line 349
def poll()
  @connection.poll()
end
#protocol ⇒ Object
protocol returns the current client's protocol level.
# File 'lib/stomp/client.rb', line 308
def protocol()
  @connection.protocol()
end
#publish(destination, message, headers = {}) ⇒ Object
Publishes message to destination. If a block is given a receipt will be requested and passed to the block on receipt. Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File 'lib/stomp/client.rb', line 255
def publish(destination, message, headers = {})
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  if block_given?
    headers = headers.merge(:receipt => register_receipt_listener(lambda {|r| yield r}))
  end
  @connection.publish(destination, message, headers)
end
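When publish is given a block, it registers a receipt listener and sends the resulting id as the :receipt header. The source of register_receipt_listener is not shown on this page, so the following is only a guess at the general shape of such a registry. ReceiptRegistry, register and dispatch are hypothetical names, and SecureRandom.uuid is an assumed way of generating ids.

```ruby
require 'securerandom'

# Hypothetical registry pairing receipt ids with one-shot callbacks.
class ReceiptRegistry
  def initialize
    @receipt_listeners = {}
  end

  # Stores the listener and returns the id to send as the :receipt header.
  def register(listener)
    id = SecureRandom.uuid
    @receipt_listeners[id] = listener
    id
  end

  # Called when a RECEIPT frame arrives; runs and forgets the listener.
  # Returns true if a listener was waiting for this id, false otherwise.
  def dispatch(receipt_id)
    listener = @receipt_listeners.delete(receipt_id)
    listener.call(receipt_id) if listener
    !listener.nil?
  end
end

registry = ReceiptRegistry.new
confirmed = []
receipt_id = registry.register(lambda { |id| confirmed << id })
registry.dispatch(receipt_id)
```

Deleting the listener on dispatch keeps each receipt one-shot, which is consistent with the error handler listing removing the entry for a failed receipt id.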
#running ⇒ Object
running checks if the thread was created and is not dead.
# File 'lib/stomp/client.rb', line 297
def running()
  @listener_thread && !!@listener_thread.status
end
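The running check relies on Thread#status, which is a string while a thread is alive, false once it has finished normally, and nil if it died from an exception. The sketch below applies the same expression to an ordinary thread. listener_running? and running_states are hypothetical helper names.

```ruby
# Same expression as Client#running, applied to any thread (or nil).
def listener_running?(thread)
  thread && !!thread.status
end

# Observes the check before a thread exists, while it is alive, and after
# it has finished.
def running_states
  gate = Queue.new
  listener = Thread.new { gate.pop } # stays alive until released
  while_alive = listener_running?(listener)
  gate << :release
  listener.join
  after_exit = listener_running?(listener)
  [listener_running?(nil), while_alive, after_exit]
end

states = running_states
```

The first element is nil rather than false because the && short-circuits when no thread was ever created.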
#set_logger(logger) ⇒ Object
set_logger identifies a new callback logger.
# File 'lib/stomp/client.rb', line 302
def set_logger(logger)
  @logger = logger
  @connection.set_logger(logger)
end
#sha1(data) ⇒ Object
sha1 returns a SHA1 sum of a given string.
# File 'lib/stomp/client.rb', line 318
def sha1(data)
  @connection.sha1(data)
end
#subscribe(destination, headers = {}) ⇒ Object
Subscribe to a destination. A block must be passed; it is used as the callback listener. Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File 'lib/stomp/client.rb', line 178
def subscribe(destination, headers = {})
  raise Stomp::Error::NoListenerGiven unless block_given?
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  # use subscription id to correlate messages to subscription. As described in
  # the SUBSCRIPTION section of the protocol: http://stomp.github.com/.
  # If no subscription id is provided, generate one.
  headers = headers.merge(:id => build_subscription_id(destination, headers))
  if @listeners[headers[:id]]
    raise Stomp::Error::DuplicateSubscription
  end
  @listeners[headers[:id]] = lambda {|msg| yield msg}
  @connection.subscribe(destination, headers)
end
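The subscribe listing keys each listener by subscription id and rejects a second subscription with the same id. The sketch below models that registry in isolation. ListenerRegistry and DuplicateSubscriptionError are hypothetical, and because build_subscription_id is not shown here, the fallback id derived from the destination is an assumption made purely for illustration.

```ruby
# Hypothetical stand-in for Stomp::Error::DuplicateSubscription.
class DuplicateSubscriptionError < StandardError; end

# Hypothetical model of the listener table; no broker or connection.
class ListenerRegistry
  def initialize
    @listeners = {}
  end

  # Registers the block under the subscription id and returns that id.
  def subscribe(destination, headers = {}, &block)
    raise ArgumentError, 'a block is required' unless block
    id = subscription_id(destination, headers)
    raise DuplicateSubscriptionError, id if @listeners[id]
    @listeners[id] = block
    id
  end

  # Clears the listener so the same id can be subscribed again later.
  def unsubscribe(destination, headers = {})
    @listeners[subscription_id(destination, headers)] = nil
  end

  # Routes a message to the listener registered for the id, if any.
  def deliver(id, message)
    listener = @listeners[id]
    listener.call(message) if listener
  end

  private

  # Assumed id scheme: an explicit :id header wins, else derive from destination.
  def subscription_id(destination, headers)
    headers[:id] || "sub:#{destination}"
  end
end

registry = ListenerRegistry.new
received = []
id = registry.subscribe('/queue/a') { |msg| received << msg }
registry.deliver(id, 'hello')
```

Subscribing to '/queue/a' a second time without a distinct :id raises DuplicateSubscriptionError until unsubscribe is called for it.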
#unreceive(message, options = {}) ⇒ Object
Unreceive a message, sending it back to its queue or to the DLQ.
# File 'lib/stomp/client.rb', line 247
def unreceive(message, options = {})
  @connection.unreceive(message, options)
end
#unsubscribe(destination, headers = {}) ⇒ Object
Unsubscribe from a subscription by name.
# File 'lib/stomp/client.rb', line 194
def unsubscribe(destination, headers = {})
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  headers = headers.merge(:id => build_subscription_id(destination, headers))
  @connection.unsubscribe(destination, headers)
  @listeners[headers[:id]] = nil
end
#uuid ⇒ Object
uuid returns a type 4 UUID.
# File 'lib/stomp/client.rb', line 323
def uuid()
  @connection.uuid()
end
#valid_utf8?(s) ⇒ Boolean
valid_utf8? validates any given string for UTF8 compliance.
# File 'lib/stomp/client.rb', line 313
def valid_utf8?(s)
  @connection.valid_utf8?(s)
end