Module: Qwirk::Worker
Overview
Base Worker Class for any class that will be processing messages from topics or queues By default, it will consume messages from a queue with the class name minus the Worker postfix. For example, the queue call is unnecessary as it will default to a value of ‘Foo’ anyways:
class FooWorker
include Qwirk::Worker
queue 'Foo'
def perform(obj)
# Perform work on obj
end
end
A topic can also be specified. Note that for JMS, this is only supported under ActiveMQ. On others, each thread for a given worker will act as a separate subscriber. (For ActiveMQ - see activemq.apache.org/virtual-destinations.html):
class FooWorker
include Qwirk::Worker
topic 'Zulu'
def perform(obj)
# Perform work on obj
end
end
TODO (maybe): Filters can also be specified within the class:
class FooWorker
include Qwirk::Worker
filter 'age > 30'
def perform(obj)
# Perform work on obj
end
end
Defined Under Namespace
Modules: ClassMethods
Instance Attribute Summary collapse
-
#impl ⇒ Object
readonly
Returns the value of attribute impl.
-
#message ⇒ Object
Returns the value of attribute message.
-
#start_processing_time ⇒ Object
readonly
Returns the value of attribute start_processing_time.
-
#start_read_time ⇒ Object
readonly
Returns the value of attribute start_read_time.
-
#start_worker_time ⇒ Object
readonly
Returns the value of attribute start_worker_time.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
Attributes included from BaseWorker
Class Method Summary collapse
Instance Method Summary collapse
-
#event_loop ⇒ Object
Start the event loop for handling messages off the queue.
- #fail_queue_name ⇒ Object
- #init(index, worker_config) ⇒ Object
- #join(timeout = nil) ⇒ Object
-
#log_backtrace(e) ⇒ Object
Allow override of backtrace logging in case the client doesn’t want to get spammed with it (maybe just config instead?).
- #on_exception(e) ⇒ Object
- #on_message(message) ⇒ Object
- #perform(object) ⇒ Object
- #start ⇒ Object
-
#stop ⇒ Object
Workers will be starting and stopping on an as needed basis.
- #to_s ⇒ Object
Methods included from BaseWorker
Instance Attribute Details
#impl ⇒ Object (readonly)
Returns the value of attribute impl.
40 41 42 |
# File 'lib/qwirk/worker.rb', line 40 def impl @impl end |
#message ⇒ Object
Returns the value of attribute message.
39 40 41 |
# File 'lib/qwirk/worker.rb', line 39 def @message end |
#start_processing_time ⇒ Object (readonly)
Returns the value of attribute start_processing_time.
40 41 42 |
# File 'lib/qwirk/worker.rb', line 40 def start_processing_time @start_processing_time end |
#start_read_time ⇒ Object (readonly)
Returns the value of attribute start_read_time.
40 41 42 |
# File 'lib/qwirk/worker.rb', line 40 def start_read_time @start_read_time end |
#start_worker_time ⇒ Object (readonly)
Returns the value of attribute start_worker_time.
40 41 42 |
# File 'lib/qwirk/worker.rb', line 40 def start_worker_time @start_worker_time end |
#status ⇒ Object (readonly)
Returns the value of attribute status.
40 41 42 |
# File 'lib/qwirk/worker.rb', line 40 def status @status end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
40 41 42 |
# File 'lib/qwirk/worker.rb', line 40 def thread @thread end |
Class Method Details
.included(base) ⇒ Object
110 111 112 113 |
# File 'lib/qwirk/worker.rb', line 110 def self.included(base) Qwirk::BaseWorker.included(base) base.extend(ClassMethods) end |
Instance Method Details
#event_loop ⇒ Object
Start the event loop for handling messages off the queue
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/qwirk/worker.rb', line 174 def event_loop Qwirk.logger.debug "#{self}: Starting receive loop" @start_worker_time = Time.now until @stopped || (config.stopped? && @impl.ready_to_stop?) Qwirk.logger.debug "#{self}: Waiting for read" @start_read_time = Time.now msg = @impl. if msg @start_processing_time = Time.now Qwirk.logger.debug {"#{self}: Done waiting for read in #{@start_processing_time - @start_read_time} seconds"} delta = config.timer.measure do @processing_mutex.synchronize do (msg) @impl.(msg) end end Qwirk.logger.info {"#{self}::on_message (#{'%.1f' % delta}ms)"} if self.config.log_times Qwirk.logger.flush if Qwirk.logger.respond_to?(:flush) end end Qwirk.logger.info "#{self}: Exiting" rescue Exception => e @status = "Terminated: #{e.}" Qwirk.logger.error "#{self}: Exception, thread terminating: #{e.}\n\t#{e.backtrace.join("\n\t")}" ensure @status = 'Stopped' Qwirk.logger.flush if Qwirk.logger.respond_to?(:flush) config.worker_stopped(self) end |
#fail_queue_name ⇒ Object
225 226 227 |
# File 'lib/qwirk/worker.rb', line 225 def fail_queue_name @fail_queue_name end |
#init(index, worker_config) ⇒ Object
115 116 117 118 119 120 121 122 |
# File 'lib/qwirk/worker.rb', line 115 def init(index, worker_config) @status = 'Started' @stopped = false @processing_mutex = Mutex.new self.index = index self.config = worker_config @impl = worker_config.create_worker end |
#join(timeout = nil) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/qwirk/worker.rb', line 148 def join(timeout=nil) time_str = timeout ? "#{timeout} seconds" : 'indefinitely' Qwirk.logger.info "#{self}: Waiting #{time_str} for worker to stop" # See http://jira.codehaus.org/browse/JRUBY-6896 if timeout && !self.thread.join(timeout) Qwirk.logger.warn "#{self}: WARNING - worker stop commanded but thread did not exit gracefully in #{timeout} seconds" else self.thread.join end Qwirk.logger.info "#{self}: worker stop complete" end |
#log_backtrace(e) ⇒ Object
Allow override of backtrace logging in case the client doesn’t want to get spammed with it (maybe just config instead?)
169 170 171 |
# File 'lib/qwirk/worker.rb', line 169 def log_backtrace(e) Qwirk.logger.error "\t#{e.backtrace.join("\n\t")}" end |
#on_exception(e) ⇒ Object
216 217 218 219 220 221 222 223 |
# File 'lib/qwirk/worker.rb', line 216 def on_exception(e) Qwirk.logger.error "#{self}: Messaging Exception: #{e.}" log_backtrace(e) @impl.handle_failure(, e, @fail_queue_name) if @fail_queue_name rescue Exception => e Qwirk.logger.error "#{self}: Exception in exception reply: #{e.}" log_backtrace(e) end |
#on_message(message) ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/qwirk/worker.rb', line 204 def () # TBD - Is it necessary to provide underlying message to worker? Should we generically provide access to message attributes? Do filters somehow fit in here? @message = object = @impl.() Qwirk.logger.debug {"#{self}: Received Object: #{object}"} perform(object) rescue Exception => e on_exception(e) ensure Qwirk.logger.debug {"#{self}: Finished processing message"} end |
#perform(object) ⇒ Object
160 161 162 |
# File 'lib/qwirk/worker.rb', line 160 def perform(object) raise "#{self}: Need to override perform method in #{self.class.name} in order to act on #{object}" end |
#start ⇒ Object
124 125 126 127 128 129 130 |
# File 'lib/qwirk/worker.rb', line 124 def start @thread = Thread.new do java.lang.Thread.current_thread.name = "Qwirk worker: #{self}" if RUBY_PLATFORM == 'jruby' #Qwirk.logger.debug "#{worker}: Started thread with priority #{Thread.current.priority}" event_loop end end |
#stop ⇒ Object
Workers will be starting and stopping on an as needed basis. Thus, when they get a stop command they should clean up any resources. We don’t want to clobber resources while a message is being processed so processing_mutex will surround message processessing and worker closing. From a JMS perspective, stop all workers (close consumer and session), stop the config. From an InMemory perspective, we don’t want the workers stopping until all messages in the queue have been processed. Therefore we want to stop the
138 139 140 141 142 143 144 145 146 |
# File 'lib/qwirk/worker.rb', line 138 def stop Qwirk.logger.debug "#{self}: In worker stop" @status = 'Stopping' @stopped = true @processing_mutex.synchronize do # This should interrupt @impl.receive_message above and cause it to return nil @impl.stop end end |
#to_s ⇒ Object
164 165 166 |
# File 'lib/qwirk/worker.rb', line 164 def to_s "#{config.name}:#{index}" end |