Module: JMS::MessageConsumer
- Defined in:
- lib/jms/message_consumer.rb
Overview
Interface javax.jms.MessageConsumer
Instance Method Summary collapse
-
#each(params = {}, &block) ⇒ Object
For each message available to be consumed call the supplied block Returns the statistics gathered when statistics: true, otherwise nil.
-
#get(params = {}) ⇒ Object
Obtain a message from the Destination or Topic In JMS terms, the message is received from the Destination :timeout follows the rules for MQSeries: -1 : Wait forever 0 : Return immediately if no message is available x : Wait for x milli-seconds for a message to be received from the broker Note: Messages may still be on the queue, but the broker has not supplied any messages in the time interval specified Default: 0 :buffered_message - consume Oracle AQ buffered message Default: false.
-
#on_message(params = {}, &proc) ⇒ Object
Receive messages in a separate thread when they arrive Allows messages to be recieved in a separate thread.
-
#on_message_statistics ⇒ Object
Return the current statistics for a running MessageConsumer::on_message.
Instance Method Details
#each(params = {}, &block) ⇒ Object
For each message available to be consumed call the supplied block Returns the statistics gathered when statistics: true, otherwise nil
Parameters:
:timeout How to timeout waiting for messages on the Queue or Topic
-1 : Wait forever
0 : Return immediately if no message is available
x : Wait for x milli-seconds for a message to be received from the broker
Note: Messages may still be on the queue, but the broker has not supplied any messages
in the time interval specified
Default: 0
:statistics Capture statistics on how many messages have been read
true : This method will capture statistics on the number of messages received
and the time it took to process them.
The statistics can be reset by calling MessageConsumer::each again
with statistics: true
The statistics gathered are returned when statistics: true and async: false
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/jms/message_consumer.rb', line 57 def each(params={}, &block) raise(ArgumentError, 'Destination::each requires a code block to be executed for each message received') unless block = nil start_time = nil if params[:statistics] = 0 start_time = Time.now end # Receive messages according to timeout while = self.get(params) do block.call() += 1 if end unless .nil? duration = Time.now - start_time { messages: , duration: duration, messages_per_second: duration > 0 ? (/duration).to_i : 0, ms_per_msg: > 0 ? (duration*1000.0)/ : 0 } end end |
#get(params = {}) ⇒ Object
Obtain a message from the Destination or Topic In JMS terms, the message is received from the Destination :timeout follows the rules for MQSeries:
-1 : Wait forever
0 : Return immediately if no message is available
x : Wait for x milli-seconds for a message to be received from the broker
Note: Messages may still be on the queue, but the broker has not supplied any messages
in the time interval specified
Default: 0
:buffered_message - consume Oracle AQ buffered message
Default: false
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/jms/message_consumer.rb', line 14 def get(params={}) timeout = params[:timeout] || 0 = params[:buffered_message] || false if timeout == -1 if self.bufferReceive else self.receive end elsif timeout == 0 if self.bufferReceiveNoWait else self.receiveNoWait end else if self.bufferReceive(timeout) else self.receive(timeout) end end end |
#on_message(params = {}, &proc) ⇒ Object
Receive messages in a separate thread when they arrive Allows messages to be recieved in a separate thread. I.e. Asynchronously This method will return to the caller before messages are processed. It is then the callers responsibility to keep the program active so that messages can then be processed.
Parameters:
:statistics Capture statistics on how many messages have been read
true : This method will capture statistics on the number of messages received
and the time it took to process them.
The timer starts when each() is called and finishes when either the last message was received,
or when Destination::statistics is called. In this case MessageConsumer::statistics
can be called several times during processing without affecting the end time.
Also, the start time and message count is not reset until MessageConsumer::each
is called again with statistics: true
The statistics gathered are returned when statistics: true and async: false
103 104 105 106 107 108 109 110 111 |
# File 'lib/jms/message_consumer.rb', line 103 def (params={}, &proc) raise(ArgumentError, 'MessageConsumer::on_message requires a code block to be executed for each message received') unless proc # Turn on Java class persistence: https://github.com/jruby/jruby/wiki/Persistence self.class.__persistent__ = true @listener = JMS::MessageListenerImpl.new(params, &proc) self.setMessageListener(@listener) end |
#on_message_statistics ⇒ Object
Return the current statistics for a running MessageConsumer::on_message
114 115 116 117 118 |
# File 'lib/jms/message_consumer.rb', line 114 def stats = @listener.statistics if @listener raise(ArgumentError, 'First call MessageConsumer::on_message with statistics: true before calling MessageConsumer::statistics()') unless stats stats end |