Module: ModernTimes::JMS::Worker
Overview
Base Worker Class for any class that will be processing messages from topics or queues By default, it will consume messages from a queue with the class name minus the Worker postfix. For example, the queue call is unnecessary as it will default to a value of ‘Foo’ anyways:
class FooWorker < ModernTimes::JMS::Worker
queue 'Foo'
def perform(obj)
# Perform work on obj
end
end
A topic can be specified using virtual_topic as follows (ActiveMQ only). Multiple separate workers can subscribe to the same topic (under ActiveMQ - see activemq.apache.org/virtual-destinations.html):
class FooWorker < ModernTimes::JMS::Worker
virtual_topic 'Zulu'
def perform(obj)
# Perform work on obj
end
end
Filters can also be specified within the class:
class FooWorker < ModernTimes::JMS::Worker
filter 'age > 30'
def perform(obj)
# Perform work on obj
end
end
Defined Under Namespace
Modules: ClassMethods
Instance Attribute Summary collapse
-
#destination_options ⇒ Object
readonly
Returns the value of attribute destination_options.
-
#error_count ⇒ Object
readonly
Returns the value of attribute error_count.
-
#message ⇒ Object
Returns the value of attribute message.
-
#session ⇒ Object
readonly
Returns the value of attribute session.
-
#time_track ⇒ Object
readonly
Returns the value of attribute time_track.
Attributes included from Base::Worker
#index, #name, #options, #thread
Class Method Summary collapse
Instance Method Summary collapse
- #initialize(opts = {}) ⇒ Object
- #log_backtrace(e) ⇒ Object
- #message_count ⇒ Object
- #perform(object) ⇒ Object
-
#start ⇒ Object
Start the event loop for handling messages off the queue.
- #status ⇒ Object
- #stop ⇒ Object
- #to_s ⇒ Object
Methods included from Base::Worker
Instance Attribute Details
#destination_options ⇒ Object (readonly)
Returns the value of attribute destination_options.
37 38 39 |
# File 'lib/modern_times/jms/worker.rb', line 37 def @destination_options end |
#error_count ⇒ Object (readonly)
Returns the value of attribute error_count.
37 38 39 |
# File 'lib/modern_times/jms/worker.rb', line 37 def error_count @error_count end |
#message ⇒ Object
Returns the value of attribute message.
38 39 40 |
# File 'lib/modern_times/jms/worker.rb', line 38 def @message end |
#session ⇒ Object (readonly)
Returns the value of attribute session.
37 38 39 |
# File 'lib/modern_times/jms/worker.rb', line 37 def session @session end |
#time_track ⇒ Object (readonly)
Returns the value of attribute time_track.
37 38 39 |
# File 'lib/modern_times/jms/worker.rb', line 37 def time_track @time_track end |
Class Method Details
.included(base) ⇒ Object
81 82 83 84 |
# File 'lib/modern_times/jms/worker.rb', line 81 def self.included(base) base.extend(ModernTimes::Base::Worker::ClassMethods) base.extend(ClassMethods) end |
Instance Method Details
#initialize(opts = {}) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/modern_times/jms/worker.rb', line 86 def initialize(opts={}) super @stopped = false @status = 'initialized' @time_track = ModernTimes::TimeTrack.new @error_count = 0 @message_mutex = Mutex.new @destination_options = self.class..dup # Default the queue name to the Worker name if a destinations hasn't been specified if @destination_options.keys.select {|k| [:virtual_topic_name, :queue_name, :destination].include?(k)}.empty? @destination_options[:queue_name] = self.name end @real_destination_options = @destination_options.dup virtual_topic_name = @real_destination_options.delete(:virtual_topic_name) @real_destination_options[:queue_name] = "Consumer.#{name}.VirtualTopic.#{virtual_topic_name}" if virtual_topic_name target = opts[:fail_queue] target = self.class.fail_queue_target if target.nil? target = self.class.default_fail_queue_target if target.nil? if target == true @fail_queue_name = "#{name}Fail" elsif target == false @fail_queue_name = nil elsif target.kind_of?(String) @fail_queue_name = target else raise "Invalid fail queue: #{target}" end end |
#log_backtrace(e) ⇒ Object
170 171 172 |
# File 'lib/modern_times/jms/worker.rb', line 170 def log_backtrace(e) ModernTimes.logger.error "\t#{e.backtrace.join("\n\t")}" end |
#message_count ⇒ Object
118 119 120 |
# File 'lib/modern_times/jms/worker.rb', line 118 def @time_track.total_count end |
#perform(object) ⇒ Object
162 163 164 |
# File 'lib/modern_times/jms/worker.rb', line 162 def perform(object) raise "#{self}: Need to override perform method in #{self.class.name} in order to act on #{object}" end |
#start ⇒ Object
Start the event loop for handling messages off the queue
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/modern_times/jms/worker.rb', line 127 def start @session = Connection.create_consumer_session @consumer = @session.consumer(@real_destination_options) @session.start ModernTimes.logger.debug "#{self}: Starting receive loop" @status = nil while !@stopped && msg = @consumer.receive @time_track.perform do @message_mutex.synchronize do (msg) msg.acknowledge end end ModernTimes.logger.info {"#{self}::on_message (#{('%.1f' % (@time_track.last_time*1000.0))}ms)"} if ModernTimes::JMS::Connection.log_times? ModernTimes.logger.flush if ModernTimes.logger.respond_to?(:flush) end @status = 'Exited' ModernTimes.logger.info "#{self}: Exiting" rescue Exception => e @status = "Exited with exception #{e.}" ModernTimes.logger.error "#{self}: Exception, thread terminating: #{e.}\n\t#{e.backtrace.join("\n\t")}" ensure ModernTimes.logger.flush if ModernTimes.logger.respond_to?(:flush) end |
#status ⇒ Object
122 123 124 |
# File 'lib/modern_times/jms/worker.rb', line 122 def status @status || "Processing message #{@time_track.total_count}" end |
#stop ⇒ Object
153 154 155 156 157 158 159 160 |
# File 'lib/modern_times/jms/worker.rb', line 153 def stop @stopped = true # Don't clobber the session before a reply @message_mutex.synchronize do @consumer.close if @consumer @session.close if @session end end |
#to_s ⇒ Object
166 167 168 |
# File 'lib/modern_times/jms/worker.rb', line 166 def to_s "#{@real_destination_options.to_a.join('=>')}:#{index}" end |