Module: ModernTimes::JMS::Worker

Includes:
Base::Worker
Included in:
RequestWorker
Defined in:
lib/modern_times/jms/worker.rb

Overview

Base worker module, to be included by any class that will be processing messages from topics or queues. By default, it will consume messages from a queue named after the class, minus the Worker postfix. For example, the queue call below is unnecessary, as the queue name would default to 'Foo' anyway:

# Example worker that consumes from the 'Foo' queue.
class FooWorker
  # ModernTimes::JMS::Worker is a module, so it is mixed in rather than
  # subclassed (a module cannot be used as a superclass).
  include ModernTimes::JMS::Worker

  queue 'Foo'

  # Override to act on each received object.
  def perform(obj)
    # Perform work on obj
  end
end

A topic can be specified using virtual_topic as follows (ActiveMQ only). Multiple separate workers can subscribe to the same topic (under ActiveMQ - see activemq.apache.org/virtual-destinations.html):

# Example worker that subscribes to the 'Zulu' virtual topic (ActiveMQ only).
class FooWorker
  # ModernTimes::JMS::Worker is a module, so it is mixed in rather than
  # subclassed (a module cannot be used as a superclass).
  include ModernTimes::JMS::Worker

  virtual_topic 'Zulu'

  # Override to act on each received object.
  def perform(obj)
    # Perform work on obj
  end
end

Filters can also be specified within the class:

# Example worker that declares a filter on the messages it consumes.
class FooWorker
  # ModernTimes::JMS::Worker is a module, so it is mixed in rather than
  # subclassed (a module cannot be used as a superclass).
  include ModernTimes::JMS::Worker

  # NOTE(review): presumably a JMS message selector expression - confirm.
  filter 'age > 30'

  # Override to act on each received object.
  def perform(obj)
    # Perform work on obj
  end
end

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary collapse

Attributes included from Base::Worker

#index, #name, #options, #thread

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Base::Worker

#join, #setup

Instance Attribute Details

#destination_optionsObject (readonly)

Returns the value of attribute destination_options.



37
38
39
# File 'lib/modern_times/jms/worker.rb', line 37

# Destination options for this worker as assembled in #initialize: a copy of
# the class-level dest_options, with :queue_name defaulted to the worker name
# when none of :virtual_topic_name, :queue_name or :destination was given.
def destination_options
  @destination_options
end

#error_countObject (readonly)

Returns the value of attribute error_count.



37
38
39
# File 'lib/modern_times/jms/worker.rb', line 37

# Error counter for this worker. #initialize sets it to 0; the code that
# increments it is not part of this listing.
def error_count
  @error_count
end

#messageObject

Returns the value of attribute message.



38
39
40
# File 'lib/modern_times/jms/worker.rb', line 38

# Returns @message.
# NOTE(review): nothing in this listing assigns @message; presumably it is the
# message currently being handled - confirm against the code that sets it.
def message
  @message
end

#sessionObject (readonly)

Returns the value of attribute session.



37
38
39
# File 'lib/modern_times/jms/worker.rb', line 37

# The consumer session assigned by #start (from
# Connection.create_consumer_session) and closed by #stop. #initialize does
# not assign it.
def session
  @session
end

#time_trackObject (readonly)

Returns the value of attribute time_track.



37
38
39
# File 'lib/modern_times/jms/worker.rb', line 37

# The ModernTimes::TimeTrack created in #initialize. #start runs the handling
# of each message inside its #perform block, and #message_count / #status read
# its total_count.
def time_track
  @time_track
end

Class Method Details

.included(base) ⇒ Object



81
82
83
84
# File 'lib/modern_times/jms/worker.rb', line 81

# Hook invoked when this module is included into +base+.
# Extends +base+ with the Base::Worker class methods first and this module's
# ClassMethods second, so the latter win when both define the same method.
def self.included(base)
  [ModernTimes::Base::Worker::ClassMethods, ClassMethods].each do |class_methods|
    base.extend(class_methods)
  end
end

Instance Method Details

#initialize(opts = {}) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/modern_times/jms/worker.rb', line 86

# Sets up worker state, resolves the destination to consume from, and decides
# on the fail queue name.
#
# opts[:fail_queue] may be true (use "<name>Fail"), false (no fail queue) or a
# String (explicit queue name). When it is nil, the class-level
# fail_queue_target and then default_fail_queue_target are consulted.
# Raises a RuntimeError for any other value.
def initialize(opts={})
  super
  @message_mutex = Mutex.new
  @time_track    = ModernTimes::TimeTrack.new
  @error_count   = 0
  @status        = 'initialized'
  @stopped       = false

  # Default the queue name to the worker name if a destination hasn't been specified
  @destination_options = self.class.dest_options.dup
  destination_given = [:virtual_topic_name, :queue_name, :destination].any? do |key|
    @destination_options.key?(key)
  end
  @destination_options[:queue_name] = self.name unless destination_given

  # A virtual topic is consumed through its per-consumer queue
  @real_destination_options = @destination_options.dup
  topic = @real_destination_options.delete(:virtual_topic_name)
  if topic
    @real_destination_options[:queue_name] = "Consumer.#{name}.VirtualTopic.#{topic}"
  end

  setting = opts[:fail_queue]
  setting = self.class.fail_queue_target if setting.nil?
  setting = self.class.default_fail_queue_target if setting.nil?
  @fail_queue_name =
    case setting
    when true   then "#{name}Fail"
    when false  then nil
    when String then setting
    else raise "Invalid fail queue: #{setting}"
    end
end

#log_backtrace(e) ⇒ Object



170
171
172
# File 'lib/modern_times/jms/worker.rb', line 170

# Logs the backtrace of +e+ at error level, one tab-indented frame per line.
#
# Exception#backtrace is nil for an exception that was never raised, so it is
# coerced to an array first; in that case a line containing only a tab is
# logged instead of failing with NoMethodError.
def log_backtrace(e)
  frames = Array(e.backtrace)
  ModernTimes.logger.error "\t#{frames.join("\n\t")}"
end

#message_countObject



118
119
120
# File 'lib/modern_times/jms/worker.rb', line 118

# Returns @time_track.total_count.
# NOTE(review): presumably the number of messages handled so far, since #start
# wraps each message in @time_track.perform - confirm against TimeTrack.
def message_count
  @time_track.total_count
end

#perform(object) ⇒ Object



162
163
164
# File 'lib/modern_times/jms/worker.rb', line 162

# Placeholder that including classes are expected to override.
# Always raises a RuntimeError naming the worker, its class and +object+.
def perform(object)
  complaint = "#{self}: Need to override perform method in #{self.class.name} in order to act on #{object}"
  raise complaint
end

#startObject

Start the event loop for handling messages off the queue



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/modern_times/jms/worker.rb', line 127

# Start the event loop for handling messages off the queue.
#
# Creates a consumer session and a consumer for @real_destination_options,
# then keeps calling @consumer.receive until @stopped is set (see #stop) or
# receive returns a falsy value. Any exception ends the loop; it is logged and
# recorded in @status rather than re-raised.
def start
  @session = Connection.create_consumer_session
  @consumer = @session.consumer(@real_destination_options)
  @session.start

  ModernTimes.logger.debug "#{self}: Starting receive loop"
  # With @status nil, #status reports "Processing message N" instead
  @status = nil
  while !@stopped && msg = @consumer.receive
    # Run inside @time_track.perform so last_time can be reported below
    @time_track.perform do
      # Same mutex that #stop takes before closing the consumer and session,
      # so they are not closed while a message is being handled and acknowledged
      @message_mutex.synchronize do
        on_message(msg)
        msg.acknowledge
      end
    end
    # NOTE(review): last_time is scaled by 1000 and labelled ms, so it is presumably in seconds - confirm
    ModernTimes.logger.info {"#{self}::on_message (#{('%.1f' % (@time_track.last_time*1000.0))}ms)"} if ModernTimes::JMS::Connection.log_times?
    ModernTimes.logger.flush if ModernTimes.logger.respond_to?(:flush)
  end
  @status = 'Exited'
  ModernTimes.logger.info "#{self}: Exiting"
rescue Exception => e
  # Rescues Exception (not only StandardError) and does not re-raise: the
  # failure is visible only through @status and the error log
  @status = "Exited with exception #{e.message}"
  ModernTimes.logger.error "#{self}: Exception, thread terminating: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
ensure
  # Flush on every exit path, normal or exceptional, when the logger supports it
  ModernTimes.logger.flush if ModernTimes.logger.respond_to?(:flush)
end

#statusObject



122
123
124
# File 'lib/modern_times/jms/worker.rb', line 122

# Human-readable worker state: the explicit @status when one is set, otherwise
# a "Processing message N" string built from @time_track.total_count.
def status
  return @status if @status

  "Processing message #{@time_track.total_count}"
end

#stopObject



153
154
155
156
157
158
159
160
# File 'lib/modern_times/jms/worker.rb', line 153

# Flags the receive loop to finish, then closes the consumer and the session
# (in that order), skipping whichever has not been created.
def stop
  @stopped = true
  # Don't clobber the session before a reply: wait for the mutex that #start
  # holds while a message is being handled
  @message_mutex.synchronize do
    [@consumer, @session].each do |resource|
      resource.close if resource
    end
  end
end

#to_sObject



166
167
168
# File 'lib/modern_times/jms/worker.rb', line 166

# Label for this worker, used in its log messages: the entries of
# @real_destination_options joined with '=>', then ':' and the worker index.
# All keys and values are joined with the same '=>' separator, so several
# options render as "k1=>v1=>k2=>v2".
def to_s
  destination = @real_destination_options.to_a.join('=>')
  "#{destination}:#{index}"
end