Class: Pgq::Worker
- Inherits:
-
Object
- Object
- Pgq::Worker
- Defined in:
- lib/pgq/worker.rb
Instance Attribute Summary collapse
-
#consumers ⇒ Object
readonly
Returns the value of attribute consumers.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#sleep_time ⇒ Object
readonly
Returns the value of attribute sleep_time.
-
#watch_file ⇒ Object
readonly
Returns the value of attribute watch_file.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(h) ⇒ Worker
constructor
A new instance of Worker.
- #process_batch ⇒ Object
- #run ⇒ Object
Constructor Details
#initialize(h) ⇒ Worker
Returns a new instance of Worker.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/pgq/worker.rb', line 28 def initialize(h) @logger = h[:logger] || (defined?(Rails) && Rails.logger) || Logger.new(STDOUT) @consumers = [] @queues = h[:queues] raise "Queue not selected" if @queues.blank? if @queues == ['all'] || @queues == 'all' if h[:queues_list] @queues = YAML.load_file(h[:queues_list]) elsif defined?(Rails) && File.exists?(Rails.root + "config/queues_list.yml") @queues = YAML.load_file(Rails.root + "config/queues_list.yml") else raise "You shoud create config/queues_list.yml for all queues" end end @queues = @queues.split(',') if @queues.is_a?(String) @queues.each do |queue| klass = Pgq::Worker.predict_queue_class(queue) if klass @consumers << klass.new(@logger, queue) else raise "Unknown queue: #{queue}" end end @watch_file = h[:watch_file] @sleep_time = h[:sleep_time] || 0.5 end |
Instance Attribute Details
#consumers ⇒ Object (readonly)
Returns the value of attribute consumers.
4 5 6 |
# File 'lib/pgq/worker.rb', line 4 def consumers @consumers end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
4 5 6 |
# File 'lib/pgq/worker.rb', line 4 def logger @logger end |
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
4 5 6 |
# File 'lib/pgq/worker.rb', line 4 def queues @queues end |
#sleep_time ⇒ Object (readonly)
Returns the value of attribute sleep_time.
4 5 6 |
# File 'lib/pgq/worker.rb', line 4 def sleep_time @sleep_time end |
#watch_file ⇒ Object (readonly)
Returns the value of attribute watch_file.
4 5 6 |
# File 'lib/pgq/worker.rb', line 4 def watch_file @watch_file end |
Class Method Details
.connection(queue) ⇒ Object
19 20 21 22 23 24 25 26 |
# File 'lib/pgq/worker.rb', line 19 def self.connection(queue) klass = predict_queue_class(queue) if klass klass.connection else raise "can't find klass for queue #{queue}" end end |
.predict_queue_class(queue) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/pgq/worker.rb', line 6 def self.predict_queue_class(queue) klass = nil unless klass queue.to_s.match(/([a-z_]+)/i) klass_s = $1.to_s klass_s.chop! if klass_s.size > 0 && klass_s[-1].chr == '_' klass_s = "pgq_" + klass_s unless klass_s.start_with?("pgq_") klass = klass_s.camelize.constantize rescue nil klass = nil unless klass.is_a?(Class) end klass end |
Instance Method Details
#process_batch ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/pgq/worker.rb', line 60 def process_batch process_count = 0 @consumers.each do |consumer| process_count += consumer.perform_batch if @watch_file && File.exists?(@watch_file) logger.info "Found file #{@watch_file}, exiting!" File.unlink(@watch_file) return processed_count end end process_count end |
#run ⇒ Object
76 77 78 79 80 81 82 83 |
# File 'lib/pgq/worker.rb', line 76 def run logger.info "Worker for (#{@queues.join(",")}) started" loop do processed_count = process_batch sleep(@sleep_time) if processed_count == 0 end end |