Class: QueueMap::Consumer
- Inherits: Object
- Ancestors: Object → QueueMap::Consumer
- Defined in:
- lib/queue_map/consumer.rb
Defined Under Namespace
Modules: ForkStrategy, ThreadStrategy
Classes: Configurator
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(name, options = { }) ⇒ Consumer
Returns a new instance of Consumer.
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/queue_map/consumer.rb', line 42
# Builds a consumer and mixes in the behaviour for the requested strategy.
#
# @param name [Object] stored in +@name+
# @param options [Hash] only the +:strategy+ key is read here
# @option options [Symbol] :strategy +:fork+ extends the instance with
#   ForkStrategy, +:thread+ with ThreadStrategy, +:test+ with nothing
# @raise [RuntimeError] for any other +:strategy+ value, including a missing one
def initialize(name, options = { })
  @name = name
  strategy = options[:strategy]
  if strategy == :fork
    extend(ForkStrategy)
  elsif strategy == :thread
    extend(ThreadStrategy)
  elsif strategy != :test
    raise "Invalid strategy: #{strategy}"
  end
end
|
Instance Attribute Details
#count_workers ⇒ Object
Returns the value of attribute count_workers.
2
3
4
|
# File 'lib/queue_map/consumer.rb', line 2
# Reader for the +@count_workers+ instance variable.
# NOTE(review): presumably the number of worker processes/threads to run;
# nothing in this section assigns or reads it, so confirm against the strategies.
#
# @return [Object, nil] the stored value, or nil if it was never assigned
def count_workers
@count_workers
end
|
#log_file ⇒ Object
72
73
74
|
# File 'lib/queue_map/consumer.rb', line 72
# Log destination handed to Logger.new by #logger.
#
# @return [Object] the value already held in +@log_file+, or, when that is
#   unset, the default "<name>_consumer.log" (which is then remembered)
def log_file
  return @log_file if @log_file

  @log_file = "#{name}_consumer.log"
end
|
#master_pid ⇒ Object
Returns the value of attribute master_pid.
3
4
5
|
# File 'lib/queue_map/consumer.rb', line 3
# Reader for the +@master_pid+ instance variable.
# NOTE(review): presumably the PID of the master process under the fork
# strategy; it is not assigned anywhere in this section, so confirm.
#
# @return [Object, nil] the stored value, or nil if it was never assigned
def master_pid
@master_pid
end
|
#name ⇒ Object
Returns the value of attribute name.
3
4
5
|
# File 'lib/queue_map/consumer.rb', line 3
# Reader for +@name+, which the constructor sets from its +name+ argument.
# The value is interpolated into the default log/pid file names and, via
# +to_s+, used as the queue name in #run_consumer.
#
# @return [Object] the consumer's name (a Symbol when built by .from_file)
def name
@name
end
|
#on_exception_proc ⇒ Object
Returns the value of attribute on_exception_proc.
2
3
4
|
# File 'lib/queue_map/consumer.rb', line 2
# Reader for the +@on_exception_proc+ instance variable. When it is set,
# #run_consumer calls it with the exception raised while handling a message
# instead of writing the error to the logger.
#
# @return [#call, nil] the stored handler, or nil if none was assigned
def on_exception_proc
@on_exception_proc
end
|
#pid_file ⇒ Object
68
69
70
|
# File 'lib/queue_map/consumer.rb', line 68
# Path of the consumer's pid file.
#
# @return [Object] the value already held in +@pid_file+, or, when that is
#   unset, the default "<name>_consumer.pid" (which is then remembered)
def pid_file
  @pid_file = "#{name}_consumer.pid" unless @pid_file
  @pid_file
end
|
#worker_proc ⇒ Object
Returns the value of attribute worker_proc.
2
3
4
|
# File 'lib/queue_map/consumer.rb', line 2
# Reader for the +@worker_proc+ instance variable. #run_consumer calls it with
# the +:input+ value of each unmarshalled message and publishes what it returns.
#
# @return [#call, nil] the stored worker, or nil if none was assigned
def worker_proc
@worker_proc
end
|
Class Method Details
.from_file(consumer_path, options = { }) ⇒ Object
35
36
37
38
39
40
|
# File 'lib/queue_map/consumer.rb', line 35
# Builds a consumer from a definition file.
#
# The consumer's name is the file's basename with a trailing "_consumer.rb"
# removed, converted to a Symbol. The file's contents are then evaluated in the
# context of a Configurator wrapping the new consumer, with the file path and
# line 1 passed along for backtraces.
#
# @param consumer_path [String] path of the consumer definition file
# @param options [Hash] passed unchanged to .new
# @return [Consumer] the new consumer, after the file has been evaluated
def self.from_file(consumer_path, options = { })
  file_name = File.basename(consumer_path)
  consumer = new(file_name.gsub(/_consumer\.rb$/, '').to_sym, options)
  configurator = Configurator.new(consumer)
  configurator.instance_eval(File.read(consumer_path), consumer_path, 1)
  consumer
end
|
.new_from_block(name, options = { }, &block) ⇒ Object
29
30
31
32
33
|
# File 'lib/queue_map/consumer.rb', line 29
# Builds a consumer and configures it from a block.
#
# The block is evaluated in the context of a Configurator wrapping the new
# consumer, so inside it +self+ is the configurator.
#
# @param name [Object] passed unchanged to .new
# @param options [Hash] passed unchanged to .new
# @return [Consumer] the new consumer, after the block has been evaluated
def self.new_from_block(name, options = { }, &block)
  new(name, options).tap do |consumer|
    Configurator.new(consumer).instance_eval(&block)
  end
end
|
Instance Method Details
#after_fork_procs ⇒ Object
56
57
58
|
# File 'lib/queue_map/consumer.rb', line 56
# List held in +@after_fork_procs+, created empty on first access.
# NOTE(review): presumably callbacks run after a worker is forked; the code
# that fills and runs them is outside this section.
#
# @return [Array] the same array on every call, so callers may append to it
def after_fork_procs
  return @after_fork_procs if @after_fork_procs

  @after_fork_procs = []
end
|
#before_fork_procs ⇒ Object
60
61
62
|
# File 'lib/queue_map/consumer.rb', line 60
# List held in +@before_fork_procs+, created empty on first access.
# NOTE(review): presumably callbacks run before workers are forked; the code
# that fills and runs them is outside this section.
#
# @return [Array] the same array on every call, so callers may append to it
def before_fork_procs
  return @before_fork_procs if @before_fork_procs

  @before_fork_procs = []
end
|
#between_responses_procs ⇒ Object
64
65
66
|
# File 'lib/queue_map/consumer.rb', line 64
# Callbacks that #run_consumer calls, in order and without arguments, after
# each response has been published. The list is created empty on first access.
#
# @return [Array] the same array on every call, so callers may append to it
def between_responses_procs
  return @between_responses_procs if @between_responses_procs

  @between_responses_procs = []
end
|
#logger ⇒ Object
76
77
78
|
# File 'lib/queue_map/consumer.rb', line 76
# Logger used by #run_consumer to report errors when no exception handler is
# set. Built from #log_file on first access and reused afterwards.
#
# @return [Logger] the value already held in +@logger+, or a new Logger
#   writing to #log_file
def logger
  return @logger if @logger

  @logger = Logger.new(log_file)
end
|
#run_consumer ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/queue_map/consumer.rb', line 88
# Pops messages from the queue named after this consumer and answers each one,
# until the consumer is shutting down or the queue stops yielding messages.
#
# Each payload is unmarshalled into a Hash. +worker_proc+ is called with its
# +:input+ value, and the result is published (marshalled together with the
# message's +:index+) to the queue named by +:response_queue+. Afterwards every
# entry of +between_responses_procs+ is called.
#
# An error raised while handling a message is passed to +on_exception_proc+
# when one is set, otherwise its message and backtrace are logged; the loop
# then carries on with the next message. SystemExit, SignalException and
# NoMemoryError are never swallowed, so exit and interrupt requests still stop
# the consumer.
#
# @return [nil, Object] nil when stopped because +@shutting_down+ is set,
#   otherwise whatever the QueueMap.with_bunny call returns
def run_consumer
  QueueMap.with_bunny do |bunny|
    q = bunny.queue(name.to_s, :durable => false, :auto_delete => true)

    # The shutdown flag is checked before every pop, never after one: a message
    # that has been popped but not processed would simply be lost.
    return if @shutting_down

    while (msg = q.pop)
      if msg == :queue_empty
        sleep 0.05 # back off briefly instead of spinning on an empty queue
      else
        begin
          # NOTE(review): Marshal.load can instantiate arbitrary objects, so
          # this is only safe while every publisher to the queue is trusted.
          request = Marshal.load(msg)
          result = worker_proc.call(request[:input])
          response = Marshal.dump({ :result => result, :index => request[:index] })
          bunny.queue(request[:response_queue]).publish(response)
          between_responses_procs.each { |callback| callback.call }
        rescue SystemExit, SignalException, NoMemoryError
          # Process-control exceptions must reach the caller.
          raise
        rescue Exception => e
          if on_exception_proc
            on_exception_proc.call(e)
          else
            logger.error e.message
            logger.error e.backtrace
          end
        end
      end

      return if @shutting_down
    end
  end
end
|
#start ⇒ Object
80
81
82
|
# File 'lib/queue_map/consumer.rb', line 80
# Placeholder that always fails. The constructor extends the instance with
# ForkStrategy or ThreadStrategy for the +:fork+ / +:thread+ strategies;
# NOTE(review): presumably those modules supply the real #start — confirm.
#
# @raise [RuntimeError] always
def start
  raise "Called start on Consumer without strategy"
end
|
#stop ⇒ Object
84
85
86
|
# File 'lib/queue_map/consumer.rb', line 84
# Placeholder that always fails. The constructor extends the instance with
# ForkStrategy or ThreadStrategy for the +:fork+ / +:thread+ strategies;
# NOTE(review): presumably those modules supply the real #stop — confirm.
#
# @raise [RuntimeError] always
def stop
  raise "Called stop on Consumer without strategy"
end
|