Class: QProcessor::Processor
- Inherits: Object (Object → QProcessor::Processor)
- Defined in: lib/qprocessor/processor.rb
Overview
This class encapsulates the functionality needed to create a queue processor: it pulls work from a queue and passes each message to an instance of a specified class for ‘handling’.
Constant Summary
- MAX_ERROR_RESTART_INTERVAL = 32
  A constant containing the maximum permitted setting for the inter-error sleep interval, in seconds.
Instance Attribute Summary
- #class_name ⇒ Object (readonly)
  Returns the value of attribute class_name.
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
- #queue_url ⇒ Object (readonly)
  Returns the value of attribute queue_url.
- #settings ⇒ Object (readonly)
  Returns the value of attribute settings.
Instance Method Summary
- #handle(message) ⇒ Object
- #initialize(processor, settings = {}) ⇒ Processor (constructor)
  Constructor for the Processor class.
- #start ⇒ Object
  Starts processing of the queue.
Constructor Details
#initialize(processor, settings = {}) ⇒ Processor
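Constructor for the Processor class. The processor argument is stored as the handler class and its name becomes the processor's name; settings is an optional Hash that is copied into the new instance.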
    # File 'lib/qprocessor/processor.rb', line 20

    def initialize(processor, settings={})
      @instance        = nil
      @name            = processor.name
      @processor_class = processor
      @settings        = {}.merge(settings)
      @terminate       = false
    end
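The attribute handling in the constructor can be tried out with a small stand-in class. This is a minimal sketch, not the gem's class: SketchProcessor and EchoHandler are hypothetical names invented for the illustration, and the constructor body simply mirrors the listing above.

```ruby
# Hypothetical stand-in mirroring the constructor shown above. It is not the
# gem's class and omits everything except the attribute handling.
class SketchProcessor
  attr_reader :class_name, :name, :queue_url, :settings

  def initialize(processor, settings = {})
    @instance        = nil
    @name            = processor.name
    @processor_class = processor
    @settings        = {}.merge(settings) # shallow copy of the caller's Hash
    @terminate       = false
  end
end

# Hypothetical handler class; only its name is used in this sketch.
class EchoHandler; end

caller_settings = { "retries" => 3 }
processor       = SketchProcessor.new(EchoHandler, caller_settings)

# Changing the caller's Hash afterwards does not affect the processor's copy.
caller_settings["retries"] = 10

puts processor.name                # name of the handler class
puts processor.settings["retries"] # value captured at construction time
p processor.class_name             # not assigned by this constructor
```

Because the copy made by merge is shallow, nested objects inside settings would still be shared with the caller. Judging only by the listing, class_name and queue_url are not assigned in the constructor, so their readers would return nil unless they are set elsewhere in the class.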
Instance Attribute Details
#class_name ⇒ Object (readonly)
Returns the value of attribute class_name.
    # File 'lib/qprocessor/processor.rb', line 27

    def class_name
      @class_name
    end
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/qprocessor/processor.rb', line 27

    def name
      @name
    end
#queue_url ⇒ Object (readonly)
Returns the value of attribute queue_url.
    # File 'lib/qprocessor/processor.rb', line 27

    def queue_url
      @queue_url
    end
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
    # File 'lib/qprocessor/processor.rb', line 27

    def settings
      @settings
    end
Instance Method Details
#handle(message) ⇒ Object
    # File 'lib/qprocessor/processor.rb', line 48

    def handle(message)
      begin
        logger.debug "The '#{name}' queue processor received message id #{message.id}."
        process(message)
        message.dispose
        interval = 0
      rescue => error
        message.release
        raise
      end
    end
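The acknowledgement pattern in #handle (dispose on success, release and re-raise on failure) can be sketched in isolation. FakeMessage and handle_sketch are hypothetical names used only here; the real message type comes from the queue source and is not documented on this page, so the meanings given to dispose and release in the comments are assumptions.

```ruby
# Hypothetical message double that records which acknowledgement was made.
FakeMessage = Struct.new(:id, :disposed, :released) do
  def dispose
    self.disposed = true
  end

  def release
    self.released = true
  end
end

# Mirrors the control flow of #handle. The block stands in for process(message).
def handle_sketch(message)
  yield message
  message.dispose # success: acknowledge the message (assumed meaning)
rescue StandardError
  message.release # failure: hand the message back (assumed meaning)
  raise           # let the caller's error handling see the exception
end

ok = FakeMessage.new(1, false, false)
handle_sketch(ok) { |m| m.id }

bad = FakeMessage.new(2, false, false)
begin
  handle_sketch(bad) { |_m| raise ArgumentError, "cannot process" }
rescue ArgumentError => error
  puts "re-raised: #{error.message}"
end

puts "ok:  disposed=#{ok.disposed} released=#{ok.released}"
puts "bad: disposed=#{bad.disposed} released=#{bad.released}"
```

Re-raising after the release is what lets the loop in #start log the failure and apply its sleep interval.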
#start ⇒ Object
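Starts processing of the queue. This method does not return to the caller: it runs a loop that processes queue jobs as they become available. Exceptions are logged and the loop continues. The first error is followed immediately by another attempt; each subsequent error is followed by a sleep whose length doubles (1, 2, 4, ...) up to MAX_ERROR_RESTART_INTERVAL seconds.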
    # File 'lib/qprocessor/processor.rb', line 31

    def start
      queue    = QProcessor::Sources.manufacture!
      interval = 0
      while !@terminate
        begin
          logger.debug("The '#{name}' queue processor is listening for jobs.")
          queue.get {|message| handle(message)}
        rescue => error
          logger.error "The '#{name}' queue processor caught an exception.\nType: #{error.class.name}\n"\
                       "Message: #{error}\nStack Trace:\n#{error.backtrace.join("\n")}"
          sleep(interval) if interval > 0
          interval = interval > 0 ? interval * 2 : 1
          interval = MAX_ERROR_RESTART_INTERVAL if interval > MAX_ERROR_RESTART_INTERVAL
        end
      end
    end
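The interval arithmetic in the rescue branch is easier to follow as a sequence of sleep durations. The sketch below applies the same update rule to a run of consecutive errors; backoff_schedule is a hypothetical helper written for this illustration, and the default cap of 32 is taken from MAX_ERROR_RESTART_INTERVAL.

```ruby
# Returns the sleep durations (in seconds) that follow each of `failures`
# consecutive errors, using the same update rule as the rescue branch in #start.
def backoff_schedule(failures, max_interval = 32)
  interval = 0
  Array.new(failures) do
    slept    = interval # 0 means the sleep is skipped entirely
    interval = interval > 0 ? interval * 2 : 1
    interval = max_interval if interval > max_interval
    slept
  end
end

p backoff_schedule(8) # => [0, 1, 2, 4, 8, 16, 32, 32]
```

As listed, the `interval = 0` assignment in #handle creates a local variable of that method, so it does not appear to reset the interval kept by #start after a successful message.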