27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 27
# Attaches a consumer to the queue of every entry in +subscriptions+.
#
# For each entry the channel prefetch is set from the route (only when the
# route uses acknowledgements), the route's threadpool is looked up by name,
# and the queue is subscribed with the route's subscription options. Each
# delivered message is instrumented, wrapped in a Middleware::Env together
# with its delivery properties, and passed to +run_env+ along with that
# threadpool. The value returned by +queue.subscribe+ is appended to
# +march_hare_consumers+.
#
# @return whatever +subscriptions.each+ returns
def start_subscribers!
  subscriptions.each do |entry|
    route = entry[:route]
    queue = entry[:queue]

    # Prefetch is only configured for routes that acknowledge messages.
    if route.acknowledgements?
      queue.channel.prefetch = route.prefetch
    end

    # fetch raises if no threadpool is registered under the route's name.
    pool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name)

    subscription_handle = queue.subscribe(route.queue_subscription_options) do |metadata, encoded_payload|
      ::ActiveSupport::Notifications.instrument(
        "received_event.action_subscriber",
        payload_size: encoded_payload.bytesize,
        queue: queue.name
      )

      delivery_properties = {
        action: route.action,
        channel: queue.channel,
        content_type: metadata.content_type,
        delivery_tag: metadata.delivery_tag,
        exchange: metadata.exchange,
        # NOTE(review): the entire metadata object is passed as :headers rather
        # than a headers value derived from it - confirm this is intended.
        headers: metadata,
        message_id: metadata.message_id,
        routing_key: metadata.routing_key,
        queue: queue.name,
        uses_acknowledgements: route.acknowledgements?,
      }

      message_env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, delivery_properties)
      run_env(message_env, pool)
    end

    march_hare_consumers << subscription_handle
  end
end
|