Class: QueueMap::Consumer
- Inherits:
-
Object
- Object
- QueueMap::Consumer
show all
- Defined in:
- lib/queue_map/consumer.rb
Defined Under Namespace
Modules: ForkStrategy
Classes: Configurator
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(name, options = { }) ⇒ Consumer
Returns a new instance of Consumer.
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/queue_map/consumer.rb', line 45
def initialize(name, options = { })
@name = name
case options[:strategy]
when :fork
extend(ForkStrategy)
when :test
nil
else
raise "Invalid strategy: #{options[:strategy]}"
end
end
|
Instance Attribute Details
#count_workers ⇒ Object
Returns the value of attribute count_workers.
2
3
4
|
# File 'lib/queue_map/consumer.rb', line 2
def count_workers
@count_workers
end
|
#idle_proc ⇒ Object
93
94
95
|
# File 'lib/queue_map/consumer.rb', line 93
def idle_proc
@idle_proc ||= lambda { sleep 0.05 }
end
|
#job_timeout ⇒ Object
Returns the value of attribute job_timeout.
2
3
4
|
# File 'lib/queue_map/consumer.rb', line 2
def job_timeout
@job_timeout
end
|
#log_file ⇒ Object
77
78
79
|
# File 'lib/queue_map/consumer.rb', line 77
def log_file
@log_file ||= "#{name}_consumer.log"
end
|
#master_pid ⇒ Object
Returns the value of attribute master_pid.
3
4
5
|
# File 'lib/queue_map/consumer.rb', line 3
def master_pid
@master_pid
end
|
#name ⇒ Object
Returns the value of attribute name.
3
4
5
|
# File 'lib/queue_map/consumer.rb', line 3
def name
@name
end
|
#on_exception_proc ⇒ Object
Returns the value of attribute on_exception_proc.
2
3
4
|
# File 'lib/queue_map/consumer.rb', line 2
def on_exception_proc
@on_exception_proc
end
|
#pid_file ⇒ Object
73
74
75
|
# File 'lib/queue_map/consumer.rb', line 73
def pid_file
@pid_file ||= "#{name}_consumer.pid"
end
|
#worker_proc ⇒ Object
Returns the value of attribute worker_proc.
2
3
4
|
# File 'lib/queue_map/consumer.rb', line 2
def worker_proc
@worker_proc
end
|
Class Method Details
.from_file(consumer_path, options = { }) ⇒ Object
38
39
40
41
42
43
|
# File 'lib/queue_map/consumer.rb', line 38
def self.from_file(consumer_path, options = { })
name = File.basename(consumer_path).gsub(/_consumer\.rb$/, '').to_sym
consumer = new(name, options)
Configurator.new(consumer).instance_eval(File.read(consumer_path), consumer_path, 1)
consumer
end
|
.new_from_block(name, options = { }, &block) ⇒ Object
32
33
34
35
36
|
# File 'lib/queue_map/consumer.rb', line 32
def self.new_from_block(name, options = { }, &block)
consumer = new(name, options)
Configurator.new(consumer).instance_eval(&block)
consumer
end
|
Instance Method Details
#after_fork_procs ⇒ Object
57
58
59
|
# File 'lib/queue_map/consumer.rb', line 57
def after_fork_procs
@after_fork_procs ||= []
end
|
#after_response_procs ⇒ Object
65
66
67
|
# File 'lib/queue_map/consumer.rb', line 65
def after_response_procs
@after_response_procs ||= []
end
|
#before_fork_procs ⇒ Object
61
62
63
|
# File 'lib/queue_map/consumer.rb', line 61
def before_fork_procs
@before_fork_procs ||= []
end
|
#before_job_procs ⇒ Object
69
70
71
|
# File 'lib/queue_map/consumer.rb', line 69
def before_job_procs
@before_job_procs ||= []
end
|
#logger ⇒ Object
81
82
83
|
# File 'lib/queue_map/consumer.rb', line 81
def logger
@logger ||= Logger.new(log_file)
end
|
#run_consumer ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
# File 'lib/queue_map/consumer.rb', line 97
def run_consumer
begin
QueueMap.with_bunny do |bunny|
q = bunny.queue(name.to_s, :durable => false, :auto_delete => false, :ack => true)
logger.info "Process #{Process.pid} is listening on #{name.to_s}"
begin
msg = q.pop
(idle_proc.call; next) if msg == :queue_empty
before_job_procs.each { |p| p.call }
begin
Timeout.timeout(job_timeout) do
msg = Marshal.load(msg)
result = worker_proc.call(msg[:input])
bunny.queue(msg[:response_queue]).publish(Marshal.dump(:result => result, :index => msg[:index]))
end
ensure
after_response_procs.each { |p| p.call }
end
rescue Qrack::ClientTimeout
rescue Timeout::Error
logger.info "Job took longer than #{job_timeout} seconds to complete. Aborting"
end while ! @shutting_down
end
rescue Exception => e if on_exception_proc
on_exception_proc.call(e)
else
logger.info e.class
logger.error e.message
logger.error e.backtrace
end
sleep 0.2
end while ! @shutting_down
logger.info "Done."
end
|
#start ⇒ Object
85
86
87
|
# File 'lib/queue_map/consumer.rb', line 85
def start
raise RuntimeError, "Called start on Consumer without strategy"
end
|
#stop ⇒ Object
89
90
91
|
# File 'lib/queue_map/consumer.rb', line 89
def stop
raise RuntimeError, "Called stop on Consumer without strategy"
end
|