Class: Pgq::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/pgq/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(h) ⇒ Worker

Returns a new instance of Worker.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/pgq/worker.rb', line 28

# Builds a worker and instantiates one consumer per requested queue.
#
# @param h [Hash] options:
#   * :logger      - logger to use; otherwise Rails.logger when Rails is
#                    defined, otherwise a Logger writing to STDOUT
#   * :queues      - Array of queue names, a comma-separated String, or
#                    'all' / ['all'] to read the names from a YAML file
#   * :queues_list - path of the YAML file to read when :queues is 'all'
#   * :watch_file  - stored in @watch_file
#   * :sleep_time  - stored in @sleep_time (defaults to 0.5)
# @raise [RuntimeError] when no queue is given, when 'all' is requested but no
#   queues list file can be found, or when a queue name has no consumer class
def initialize(h)
  @logger = h[:logger] || (defined?(Rails) && Rails.logger) || Logger.new(STDOUT)
  @consumers = []

  @queues = h[:queues]
  raise "Queue not selected" if @queues.blank?

  if @queues == ['all'] || @queues == 'all'
    # File.exist? is used on purpose: File.exists? was removed in Ruby 3.2.
    if h[:queues_list]
      @queues = YAML.load_file(h[:queues_list])
    elsif defined?(Rails) && File.exist?(Rails.root + "config/queues_list.yml")
      @queues = YAML.load_file(Rails.root + "config/queues_list.yml")
    else
      raise "You should create config/queues_list.yml for all queues"
    end
  end

  # A comma-separated string ("a,b") is accepted as shorthand for an array.
  @queues = @queues.split(',') if @queues.is_a?(String)

  @queues.each do |queue|
    klass = Pgq::Worker.predict_queue_class(queue)
    raise "Unknown queue: #{queue}" unless klass

    @consumers << klass.new(@logger, queue)
  end

  @watch_file = h[:watch_file]
  @sleep_time = h[:sleep_time] || 0.5
end

Instance Attribute Details

#consumers ⇒ Object (readonly)

Returns the value of attribute consumers.



4
5
6
# File 'lib/pgq/worker.rb', line 4

# Read-only accessor for @consumers, the array that #initialize fills with one
# consumer instance per queue.
#
# @return [Array] the consumer instances
def consumers
  @consumers
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



4
5
6
# File 'lib/pgq/worker.rb', line 4

# Read-only accessor for @logger, the logger chosen in #initialize (the
# :logger option, else Rails.logger, else a Logger on STDOUT).
#
# @return [Object] the logger
def logger
  @logger
end

#queues ⇒ Object (readonly)

Returns the value of attribute queues.



4
5
6
# File 'lib/pgq/worker.rb', line 4

# Read-only accessor for @queues, the queue names resolved in #initialize.
#
# @return [Object] the queue names (an Array once a comma-separated String has
#   been split)
def queues
  @queues
end

#sleep_time ⇒ Object (readonly)

Returns the value of attribute sleep_time.



4
5
6
# File 'lib/pgq/worker.rb', line 4

# Read-only accessor for @sleep_time, the value #run passes to sleep after a
# pass that processed nothing (the :sleep_time option, default 0.5).
#
# @return [Numeric] the sleep interval
def sleep_time
  @sleep_time
end

#watch_file ⇒ Object (readonly)

Returns the value of attribute watch_file.



4
5
6
# File 'lib/pgq/worker.rb', line 4

# Read-only accessor for @watch_file, the path #process_batch checks for
# after each consumer (the :watch_file option; nil when not given).
#
# @return [String, nil] the watch file path
def watch_file
  @watch_file
end

Class Method Details

.connection(queue) ⇒ Object



19
20
21
22
23
24
25
26
# File 'lib/pgq/worker.rb', line 19

# Looks up the consumer class for +queue+ and returns that class's connection.
#
# @param queue [#to_s] queue name, resolved through predict_queue_class
# @return [Object] whatever the consumer class's .connection returns
# @raise [RuntimeError] when no consumer class can be resolved for the queue
def self.connection(queue)
  consumer_class = predict_queue_class(queue)
  raise "can't find klass for queue #{queue}" unless consumer_class

  consumer_class.connection
end

.predict_queue_class(queue) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/pgq/worker.rb', line 6

# Maps a queue name to its consumer class.
#
# Takes the first run of letters/underscores in the name, drops one trailing
# underscore, prefixes "pgq_" when it is missing, then camelizes and
# constantizes the result (so "mailer_2" is looked up via "pgq_mailer").
#
# @param queue [#to_s] queue name
# @return [Class, nil] the resolved class; nil when the lookup raises or the
#   resolved constant is not a Class
def self.predict_queue_class(queue)
  base_name = queue.to_s[/([a-z_]+)/i, 1].to_s
  base_name = base_name.chomp('_')
  base_name = "pgq_" + base_name unless base_name.start_with?("pgq_")

  candidate =
    begin
      base_name.camelize.constantize
    rescue StandardError
      # Any lookup failure means "no class for this queue".
      nil
    end

  candidate.is_a?(Class) ? candidate : nil
end

Instance Method Details

#process_batch ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/pgq/worker.rb', line 60

# Runs one batch on every consumer, in order, and totals what they processed.
#
# After each consumer, if @watch_file is set and that file exists, the file is
# deleted and the pass stops early, returning the count accumulated so far.
#
# NOTE(review): this method only ends the current pass. It previously
# referenced an undefined local here, so the resulting NameError was what
# actually terminated the worker. If the watch file is meant to stop the
# worker, the caller needs an explicit stop signal — confirm intended behavior.
#
# @return [Integer] sum of the values returned by each consumer's perform_batch
def process_batch
  process_count = 0

  @consumers.each do |consumer|
    process_count += consumer.perform_batch

    # File.exist? is used on purpose: File.exists? was removed in Ruby 3.2.
    next unless @watch_file && File.exist?(@watch_file)

    logger.info "Found file #{@watch_file}, exiting!"
    File.unlink(@watch_file)
    return process_count
  end

  process_count
end

#run ⇒ Object



76
77
78
79
80
81
82
83
# File 'lib/pgq/worker.rb', line 76

# Main worker loop: logs the queue list once, then calls process_batch
# repeatedly, sleeping for @sleep_time after every pass that processed nothing.
#
# The loop has no exit condition of its own; it ends only when something
# raised inside it propagates (Kernel#loop itself stops on StopIteration).
def run
  queue_names = @queues.join(",")
  logger.info "Worker for (#{queue_names}) started"

  loop do
    idle = process_batch == 0
    sleep(@sleep_time) if idle
  end
end