Class: QueueMap::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_map/consumer.rb

Defined Under Namespace

Modules: ForkStrategy, ThreadStrategy Classes: Configurator

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, options = { }) ⇒ Consumer

Returns a new instance of Consumer.



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/queue_map/consumer.rb', line 42

# Creates a consumer called +name+ and mixes in the behaviour chosen by
# options[:strategy].
#
# @param name [Object] stored as-is in @name
# @param options [Hash] only the :strategy key is read here:
#   :fork   extends this instance with ForkStrategy,
#   :thread extends this instance with ThreadStrategy,
#   :test   leaves the instance unextended.
# @raise [RuntimeError] for any other (or missing) strategy
def initialize(name, options = { })
  @name = name
  strategy = options[:strategy]

  if strategy == :fork
    extend(ForkStrategy)
  elsif strategy == :thread
    extend(ThreadStrategy)
  elsif strategy != :test
    raise "Invalid strategy: #{strategy}"
  end
end

Instance Attribute Details

#count_workers ⇒ Object

Returns the value of attribute count_workers.



2
3
4
# File 'lib/queue_map/consumer.rb', line 2

# Reader for the count_workers attribute.
#
# @return [Object, nil] whatever is stored in @count_workers (nil when unset)
def count_workers
  instance_variable_get(:@count_workers)
end

#log_file ⇒ Object



72
73
74
# File 'lib/queue_map/consumer.rb', line 72

# Path of the consumer's log file.
#
# Returns the value already held in @log_file when there is one; otherwise
# builds "<name>_consumer.log", remembers it, and returns it.
#
# @return [Object] the stored or freshly derived log file name
def log_file
  return @log_file if @log_file

  @log_file = "#{name}_consumer.log"
end

#master_pid ⇒ Object (readonly)

Returns the value of attribute master_pid.



3
4
5
# File 'lib/queue_map/consumer.rb', line 3

# Reader for the master_pid attribute.
#
# @return [Object, nil] whatever is stored in @master_pid (nil when unset)
def master_pid
  instance_variable_get(:@master_pid)
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



3
4
5
# File 'lib/queue_map/consumer.rb', line 3

# Reader for the name attribute.
#
# @return [Object, nil] whatever is stored in @name (nil when unset)
def name
  instance_variable_get(:@name)
end

#on_exception_proc ⇒ Object

Returns the value of attribute on_exception_proc.



2
3
4
# File 'lib/queue_map/consumer.rb', line 2

# Reader for the on_exception_proc attribute.
#
# @return [Object, nil] whatever is stored in @on_exception_proc (nil when unset)
def on_exception_proc
  instance_variable_get(:@on_exception_proc)
end

#pid_file ⇒ Object



68
69
70
# File 'lib/queue_map/consumer.rb', line 68

# Path of the consumer's pid file.
#
# Returns the value already held in @pid_file when there is one; otherwise
# builds "<name>_consumer.pid", remembers it, and returns it.
#
# @return [Object] the stored or freshly derived pid file name
def pid_file
  return @pid_file if @pid_file

  @pid_file = "#{name}_consumer.pid"
end

#worker_proc ⇒ Object

Returns the value of attribute worker_proc.



2
3
4
# File 'lib/queue_map/consumer.rb', line 2

# Reader for the worker_proc attribute.
#
# @return [Object, nil] whatever is stored in @worker_proc (nil when unset)
def worker_proc
  instance_variable_get(:@worker_proc)
end

Class Method Details

.from_file(consumer_path, options = { }) ⇒ Object



35
36
37
38
39
40
# File 'lib/queue_map/consumer.rb', line 35

# Builds a consumer whose configuration lives in a Ruby file.
#
# The consumer name is the file's basename with a trailing "_consumer.rb"
# removed, converted to a Symbol. The file's contents are then evaluated
# against a Configurator wrapping the new consumer.
#
# @param consumer_path [String] path of the consumer definition file
# @param options [Hash] passed straight through to .new
# @return [Object] the newly built consumer
def self.from_file(consumer_path, options = { })
  name = File.basename(consumer_path).gsub(/_consumer\.rb$/, '').to_sym
  consumer = new(name, options)
  # The path and line number 1 are handed to instance_eval so that errors
  # raised by the evaluated source are reported against the real file.
  # NOTE(review): string instance_eval may expose this method's local
  # variables to the evaluated source - confirm before renaming any of them.
  Configurator.new(consumer).instance_eval(File.read(consumer_path), consumer_path, 1)
  consumer
end

.new_from_block(name, options = { }, &block) ⇒ Object



29
30
31
32
33
# File 'lib/queue_map/consumer.rb', line 29

# Builds a consumer and configures it from the given block.
#
# The block is evaluated with instance_eval against a Configurator that
# wraps the freshly created consumer.
#
# @param name [Object] passed straight through to .new
# @param options [Hash] passed straight through to .new
# @return [Object] the newly built consumer
def self.new_from_block(name, options = { }, &block)
  new(name, options).tap do |instance|
    Configurator.new(instance).instance_eval(&block)
  end
end

Instance Method Details

#after_fork_procs ⇒ Object



56
57
58
# File 'lib/queue_map/consumer.rb', line 56

# Lazily created list held in @after_fork_procs.
#
# @return [Object] the existing value of @after_fork_procs, or a new empty
#   Array that is stored on first access
def after_fork_procs
  return @after_fork_procs if @after_fork_procs

  @after_fork_procs = []
end

#before_fork_procs ⇒ Object



60
61
62
# File 'lib/queue_map/consumer.rb', line 60

# Lazily created list held in @before_fork_procs.
#
# @return [Object] the existing value of @before_fork_procs, or a new empty
#   Array that is stored on first access
def before_fork_procs
  return @before_fork_procs if @before_fork_procs

  @before_fork_procs = []
end

#between_responses_procs ⇒ Object



64
65
66
# File 'lib/queue_map/consumer.rb', line 64

# Lazily created list held in @between_responses_procs.
#
# @return [Object] the existing value of @between_responses_procs, or a new
#   empty Array that is stored on first access
def between_responses_procs
  return @between_responses_procs if @between_responses_procs

  @between_responses_procs = []
end

#logger ⇒ Object



76
77
78
# File 'lib/queue_map/consumer.rb', line 76

# Logger used by this consumer.
#
# Returns the value already held in @logger when there is one; otherwise
# creates a Logger writing to #log_file, remembers it, and returns it.
#
# @return [Object] the stored or freshly created logger
def logger
  return @logger if @logger

  @logger = Logger.new(log_file)
end

#run_consumer ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/queue_map/consumer.rb', line 88

# Worker loop: pops messages from this consumer's queue, runs worker_proc on
# each one and publishes the result to the message's response queue.
#
# The queue is named after #name and is declared non-durable and
# auto-deleting. For every message popped:
#   * the method returns immediately if @shutting_down is set;
#   * a :queue_empty marker causes a 0.05s pause before the next pop;
#   * otherwise the payload is unmarshalled, worker_proc is called with
#     msg[:input], and a marshalled {:result, :index} hash is published to
#     the queue named by msg[:response_queue]; every proc in
#     between_responses_procs is then called.
#
# Exceptions raised while handling a message are passed to on_exception_proc
# when one is set, and otherwise written to #logger. SystemExit and
# SignalException (which includes Interrupt) are re-raised instead, so that
# `exit` and termination signals are not silently swallowed by the loop.
#
# The loop also ends when q.pop returns nil or false.
def run_consumer
  QueueMap.with_bunny do |bunny|
    q = bunny.queue(name.to_s, :durable => false, :auto_delete => true)
    while (msg = q.pop)
      # STDOUT << "[#{Thread.current[:id]}]"
      return if @shutting_down
      begin
        (sleep 0.05; next) if msg == :queue_empty
        # SECURITY: Marshal.load can instantiate arbitrary objects; this is
        # only safe while every publisher on this queue is trusted.
        msg = Marshal.load(msg)
        result = worker_proc.call(msg[:input])
        bunny.queue(msg[:response_queue]).publish(Marshal.dump(:result => result, :index => msg[:index]))
        between_responses_procs.each { |p| p.call }
      rescue SystemExit, SignalException
        # Process-termination requests must reach the caller untouched.
        raise
      rescue Exception => e
        # Deliberately broad: a failing job must not take the consumer down.
        if on_exception_proc
          on_exception_proc.call(e)
        else
          logger.error e.message
          logger.error e.backtrace
        end
      end
    end
  end
end

#start ⇒ Object

Raises:

  • (RuntimeError)


80
81
82
# File 'lib/queue_map/consumer.rb', line 80

# Placeholder implementation of start: always fails.
#
# @raise [RuntimeError] unconditionally (a bare-string raise produces a
#   RuntimeError)
def start
  raise "Called start on Consumer without strategy"
end

#stop ⇒ Object

Raises:

  • (RuntimeError)


84
85
86
# File 'lib/queue_map/consumer.rb', line 84

# Placeholder implementation of stop: always fails.
#
# @raise [RuntimeError] unconditionally (a bare-string raise produces a
#   RuntimeError)
def stop
  raise "Called stop on Consumer without strategy"
end