27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/action_subscriber/bunny/subscriber.rb', line 27
def start_subscribers!
subscriptions.each do |subscription|
route = subscription[:route]
queue = subscription[:queue]
channel = queue.channel
threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name)
channel.prefetch(route.prefetch) if route.acknowledgements?
consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?)
consumer.on_delivery do |delivery_info, properties, encoded_payload|
::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
properties = {
:action => route.action,
:channel => queue.channel,
:content_type => properties.content_type,
:delivery_tag => delivery_info.delivery_tag,
:exchange => delivery_info.exchange,
:headers => properties.,
:message_id => properties.message_id,
:routing_key => delivery_info.routing_key,
:queue => queue.name,
:uses_acknowledgements => route.acknowledgements?,
}
env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
run_env(env, threadpool)
end
bunny_consumers << consumer
queue.subscribe_with(consumer)
end
end
|