Module: Qwirk::Worker

Includes:
BaseWorker
Included in:
ReplyWorker
Defined in:
lib/qwirk/worker.rb

Overview

Base worker module for any class that processes messages from topics or queues. By default, a worker consumes messages from a queue named after the class, minus the Worker suffix. In the following example, the queue call is unnecessary because the name would default to 'Foo' anyway:

class FooWorker
  include Qwirk::Worker
  queue 'Foo'
  def perform(obj)
    # Perform work on obj
  end
end
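
The default-name rule described above can be illustrated with a small standalone sketch. The helper `default_queue_name` is hypothetical and is not part of Qwirk; it only mimics stripping a trailing `Worker` from a class name, which is an assumption about how the default is derived.

```ruby
# Hypothetical helper, not part of Qwirk: derive a default destination
# name by removing a trailing "Worker" from the class name.
def default_queue_name(klass)
  klass.name.sub(/Worker\z/, '')
end

class FooWorker; end
class Audit; end

foo_name   = default_queue_name(FooWorker)  # "Foo"
audit_name = default_queue_name(Audit)      # "Audit" (no suffix to strip)
puts foo_name, audit_name
```

Anchoring the pattern with `\z` means only a suffix is removed, so a class such as `WorkerPool` would keep its full name.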

A topic can also be specified. Note that for JMS, this is only supported under ActiveMQ; on other providers, each thread for a given worker will act as a separate subscriber. (For ActiveMQ, see activemq.apache.org/virtual-destinations.html):

class FooWorker
  include Qwirk::Worker
  topic 'Zulu'
  def perform(obj)
    # Perform work on obj
  end
end

TODO (maybe): Filters can also be specified within the class:

class FooWorker
  include Qwirk::Worker
  filter 'age > 30'
  def perform(obj)
    # Perform work on obj
  end
end

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary

Attributes included from BaseWorker

#config, #index

Class Method Summary

Instance Method Summary

Methods included from BaseWorker

worker_classes

Instance Attribute Details

#impl ⇒ Object (readonly)

Returns the value of attribute impl.



# File 'lib/qwirk/worker.rb', line 40

def impl
  @impl
end

#message ⇒ Object

Returns the value of attribute message.



# File 'lib/qwirk/worker.rb', line 39

def message
  @message
end

#start_processing_time ⇒ Object (readonly)

Returns the value of attribute start_processing_time.



# File 'lib/qwirk/worker.rb', line 40

def start_processing_time
  @start_processing_time
end

#start_read_time ⇒ Object (readonly)

Returns the value of attribute start_read_time.



# File 'lib/qwirk/worker.rb', line 40

def start_read_time
  @start_read_time
end

#start_worker_time ⇒ Object (readonly)

Returns the value of attribute start_worker_time.



# File 'lib/qwirk/worker.rb', line 40

def start_worker_time
  @start_worker_time
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



# File 'lib/qwirk/worker.rb', line 40

def status
  @status
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



# File 'lib/qwirk/worker.rb', line 40

def thread
  @thread
end

Class Method Details

.included(base) ⇒ Object



# File 'lib/qwirk/worker.rb', line 110

def self.included(base)
  Qwirk::BaseWorker.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#event_loop ⇒ Object

Starts the event loop that receives messages from the queue, processes them, and acknowledges them.



# File 'lib/qwirk/worker.rb', line 174

def event_loop
  Qwirk.logger.debug "#{self}: Starting receive loop"
  @start_worker_time = Time.now
  until @stopped || (config.stopped? && @impl.ready_to_stop?)
    Qwirk.logger.debug "#{self}: Waiting for read"
    @start_read_time = Time.now
    msg = @impl.receive_message
    if msg
      @start_processing_time = Time.now
      Qwirk.logger.debug {"#{self}: Done waiting for read in #{@start_processing_time - @start_read_time} seconds"}
      delta = config.timer.measure do
        @processing_mutex.synchronize do
          on_message(msg)
          @impl.acknowledge_message(msg)
        end
      end
      Qwirk.logger.info {"#{self}::on_message (#{'%.1f' % delta}ms)"} if self.config.log_times
      Qwirk.logger.flush if Qwirk.logger.respond_to?(:flush)
    end
  end
  Qwirk.logger.info "#{self}: Exiting"
rescue Exception => e
  @status = "Terminated: #{e.message}"
  Qwirk.logger.error "#{self}: Exception, thread terminating: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
ensure
  @status = 'Stopped'
  Qwirk.logger.flush if Qwirk.logger.respond_to?(:flush)
  config.worker_stopped(self)
end
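
The receive, process, and acknowledge cycle in the listing above can be tried in isolation with the following minimal sketch. `StubImpl` is a hypothetical stand-in for the adapter object held in `@impl`; its behaviour is assumed for illustration and does not reflect any real Qwirk adapter.

```ruby
# Standalone sketch of a receive/process/acknowledge loop.
# StubImpl is a hypothetical stand-in for the adapter held in @impl.
class StubImpl
  attr_reader :acked

  def initialize(messages)
    @messages = messages.dup
    @acked    = []
  end

  # Returns the next message, or nil when nothing is available.
  def receive_message
    @messages.shift
  end

  def acknowledge_message(msg)
    @acked << msg
  end
end

impl      = StubImpl.new(%w[a b c])
mutex     = Mutex.new
processed = []

# The real loop runs until the worker is stopped; this sketch simply
# exits once the stub has no more messages to hand out.
while (msg = impl.receive_message)
  mutex.synchronize do
    processed << msg.upcase        # stands in for on_message(msg)
    impl.acknowledge_message(msg)  # acknowledge only after processing
  end
end

puts processed.inspect   # prints ["A", "B", "C"]
puts impl.acked.inspect  # prints ["a", "b", "c"]
```

Holding the mutex around both steps keeps processing and acknowledgement together as one unit, which is what lets a concurrent stop wait for an in-flight message.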

#fail_queue_name ⇒ Object



# File 'lib/qwirk/worker.rb', line 225

def fail_queue_name
  @fail_queue_name
end

#init(index, worker_config) ⇒ Object



# File 'lib/qwirk/worker.rb', line 115

def init(index, worker_config)
  @status           = 'Started'
  @stopped          = false
  @processing_mutex = Mutex.new
  self.index        = index
  self.config       = worker_config
  @impl             = worker_config.create_worker
end

#join(timeout = nil) ⇒ Object



# File 'lib/qwirk/worker.rb', line 148

def join(timeout=nil)
  time_str = timeout ? "#{timeout} seconds" : 'indefinitely'
  Qwirk.logger.info "#{self}: Waiting #{time_str} for worker to stop"
  # See http://jira.codehaus.org/browse/JRUBY-6896
  if timeout && !self.thread.join(timeout)
    Qwirk.logger.warn "#{self}: WARNING - worker stop commanded but thread did not exit gracefully in #{timeout} seconds"
  else
    self.thread.join
  end
  Qwirk.logger.info "#{self}: worker stop complete"
end
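
The timeout branch above relies on the return value of Ruby's `Thread#join`: with a limit, it returns nil if the thread is still running when the limit expires, and the thread itself otherwise. A minimal sketch, with thread bodies and durations chosen purely for illustration:

```ruby
# Thread#join(limit) returns nil if the thread is still running after
# `limit` seconds, and returns the thread itself once it has finished.
slow = Thread.new { sleep 0.5 }
fast = Thread.new { :done }

timed_out = slow.join(0.05).nil?  # slow is still sleeping, so join gives nil
finished  = fast.join(1)          # fast has ended, so join gives the thread

slow.join                         # wait without a limit before exiting
puts "timed out: #{timed_out}"
```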

#log_backtrace(e) ⇒ Object

Allows backtrace logging to be overridden in case the client doesn't want to get spammed with it (maybe just config instead?).



# File 'lib/qwirk/worker.rb', line 169

def log_backtrace(e)
  Qwirk.logger.error "\t#{e.backtrace.join("\n\t")}"
end

#on_exception(e) ⇒ Object



# File 'lib/qwirk/worker.rb', line 216

def on_exception(e)
  Qwirk.logger.error "#{self}: Messaging Exception: #{e.message}"
  log_backtrace(e)
  @impl.handle_failure(message, e, @fail_queue_name) if @fail_queue_name
rescue Exception => e
  Qwirk.logger.error "#{self}: Exception in exception reply: #{e.message}"
  log_backtrace(e)
end

#on_message(message) ⇒ Object



# File 'lib/qwirk/worker.rb', line 204

def on_message(message)
  # TBD - Is it necessary to provide underlying message to worker?  Should we generically provide access to message attributes?  Do filters somehow fit in here?
  @message = message
  object = @impl.message_to_object(message)
  Qwirk.logger.debug {"#{self}: Received Object: #{object}"}
  perform(object)
rescue Exception => e
  on_exception(e)
ensure
  Qwirk.logger.debug {"#{self}: Finished processing message"}
end
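
The interplay between on_message and on_exception can be sketched without a messaging backend. `SketchWorker`, its `log` and `failures` arrays, and the `'FooFail'` name are all hypothetical; the sketch also rescues StandardError rather than Exception, which differs from the listing above.

```ruby
# Standalone sketch; SketchWorker and its arrays are hypothetical and
# only imitate the rescue/ensure flow of on_message and on_exception.
class SketchWorker
  attr_reader :failures, :log

  def initialize(fail_queue_name = nil)
    @fail_queue_name = fail_queue_name
    @failures        = []
    @log             = []
  end

  def perform(object)
    raise ArgumentError, 'bad payload' if object.nil?
    @log << "performed #{object}"
  end

  def on_message(object)
    perform(object)
  rescue StandardError => e
    on_exception(object, e)
  ensure
    @log << 'finished'  # runs whether or not perform raised
  end

  def on_exception(object, error)
    @log << "error: #{error.message}"
    # Route to the failure destination only when one is configured.
    @failures << [@fail_queue_name, object, error.message] if @fail_queue_name
  end
end

worker = SketchWorker.new('FooFail')
worker.on_message('order-1')
worker.on_message(nil)
puts worker.log.inspect
puts worker.failures.inspect
```

Because the error is rescued inside on_message, a failing message does not propagate to the caller, so a surrounding loop would carry on to the next message.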

#perform(object) ⇒ Object



# File 'lib/qwirk/worker.rb', line 160

def perform(object)
  raise "#{self}: Need to override perform method in #{self.class.name} in order to act on #{object}"
end

#start ⇒ Object



# File 'lib/qwirk/worker.rb', line 124

def start
  @thread = Thread.new do
    java.lang.Thread.current_thread.name = "Qwirk worker: #{self}" if RUBY_PLATFORM == 'jruby'
    #Qwirk.logger.debug "#{worker}: Started thread with priority #{Thread.current.priority}"
    event_loop
  end
end

#stop ⇒ Object

Workers start and stop on an as-needed basis, so when a worker receives a stop command it should clean up any resources it holds. We don't want to clobber resources while a message is being processed, so processing_mutex surrounds both message processing and worker closing. From a JMS perspective: stop all workers (close the consumer and session), then stop the config. From an InMemory perspective, we don't want the workers stopping until all messages in the queue have been processed. Therefore we want to stop the



# File 'lib/qwirk/worker.rb', line 138

def stop
  Qwirk.logger.debug "#{self}: In worker stop"
  @status  = 'Stopping'
  @stopped = true
  @processing_mutex.synchronize do
    # This should interrupt @impl.receive_message above and cause it to return nil
    @impl.stop
  end
end
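
The effect of sharing one mutex between processing and closing can be seen in the following minimal sketch. The `events` array, the `started` queue, and the sleep duration are illustrative assumptions; the sketch only shows that the closing step waits for in-flight work.

```ruby
# Sketch: a shared mutex makes the "close" step wait for in-flight work.
mutex   = Mutex.new
events  = []
started = Queue.new

processor = Thread.new do
  mutex.synchronize do
    started << true       # signal that processing has begun
    sleep 0.1             # stands in for on_message plus acknowledge
    events << :processed
  end
end

started.pop               # wait until the processor holds the mutex
mutex.synchronize do      # blocks until processing has finished
  events << :closed       # stands in for @impl.stop
end
processor.join

puts events.inspect       # prints [:processed, :closed]
```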

#to_s ⇒ Object



# File 'lib/qwirk/worker.rb', line 164

def to_s
  "#{config.name}:#{index}"
end