Module: RabbitJobs::Publisher
Instance Method Summary
collapse
amqp_with_exchange, amqp_with_queue, make_queue
Instance Method Details
#publish(klass, opts = {}, *params) ⇒ Object
12
13
14
15
|
# File 'lib/rabbit_jobs/publisher.rb', line 12
# Publishes a job using the first routing key listed in
# RabbitJobs.config.routing_keys.
#
# @param klass  job class, forwarded to #publish_to
# @param opts   options forwarded to #publish_to (defaults to an empty Hash)
# @param params remaining arguments, forwarded to #publish_to unchanged
# @return whatever #publish_to returns
def publish(klass, opts = {}, *params)
  publish_to(RabbitJobs.config.routing_keys.first, klass, opts, *params)
end
|
#publish_job_to(routing_key, job) ⇒ Object
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/rabbit_jobs/publisher.rb', line 27
# Publishes an already-built job to the exchange under +routing_key+.
#
# Before publishing, job.opts['created_at'] is set to the current time
# (Time.now.to_s). Once the publish callback fires, the connection is closed
# and the EventMachine reactor is stopped.
#
# @param routing_key routing key used both for make_queue and for the publish
# @param job         object responding to #opts (Hash-like) and #payload
# @raise [ArgumentError] if +routing_key+ or +job+ is nil/false
# @return whatever amqp_with_exchange returns
def publish_job_to(routing_key, job)
  # Same guard style as #publish_to / #purge_queue: fail before connecting.
  raise ArgumentError unless routing_key && job

  amqp_with_exchange do |connection, exchange|
    # Called for its effect only; the returned queue is not used here.
    # NOTE(review): presumably declares/binds the queue so the message is
    # routable -- confirm against make_queue.
    make_queue(exchange, routing_key)

    # Stamp first so the timestamp is present when the payload is built.
    job.opts['created_at'] = Time.now.to_s
    message_params = Configuration::DEFAULT_MESSAGE_PARAMS.merge(routing_key: routing_key)

    # The payload is built exactly once and handed straight to the exchange.
    exchange.publish(job.payload, message_params) do
      connection.close { EM.stop }
    end
  end
end
|
#publish_to(routing_key, klass, opts = {}, *params) ⇒ Object
17
18
19
20
21
22
23
24
25
|
# File 'lib/rabbit_jobs/publisher.rb', line 17
# Builds a job of +klass+ from +params+, attaches +opts+ to it and hands it
# to #publish_job_to under +routing_key+.
#
# @param routing_key routing key to publish under
# @param klass       job class; instantiated as klass.new(*params)
# @param opts        assigned to job.opts; an explicit nil/false becomes {}
# @param params      constructor arguments for +klass+
# @raise [ArgumentError] if +klass+ or +routing_key+ is nil/false
# @return whatever #publish_job_to returns
def publish_to(routing_key, klass, opts = {}, *params)
  raise ArgumentError if !routing_key || !klass

  new_job = klass.new(*params)
  new_job.opts = opts || {}
  publish_job_to(routing_key, new_job)
end
|
#purge_queue(routing_key) ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/rabbit_jobs/publisher.rb', line 41
# Purges the queue for +routing_key+ and reports how many messages
# queue.status said it held just before the purge.
#
# After the purge callback fires, the connection is closed and the
# EventMachine reactor is stopped.
#
# @param routing_key routing key passed to amqp_with_queue
# @raise [ArgumentError] if +routing_key+ is nil/false
# @return the message count yielded by queue.status, or nil if the
#   status/purge callbacks never ran
def purge_queue(routing_key)
  raise ArgumentError unless routing_key

  # The count is captured in a local instead of using `return` inside the
  # callbacks: a `return` there is a non-local jump out of the reactor
  # callback chain, which skips whatever amqp_with_queue does after yielding
  # and raises LocalJumpError if the callback fires once this method has
  # already returned.
  # NOTE(review): assumes amqp_with_queue blocks until EM.stop takes effect,
  # so the local is populated by the time we return it -- confirm.
  purged_count = nil

  amqp_with_queue(routing_key) do |connection, queue|
    queue.status do |number_of_messages, _number_of_consumers|
      queue.purge do
        purged_count = number_of_messages
        connection.close { EM.stop }
      end
    end
  end

  purged_count
end
|