Module: QueueMap
Defined Under Namespace
Classes: Consumer
Constant Summary collapse
- BUNNY_MUTEX =
Mutex.new
- DEFAULT_ON_TIMEOUT =
lambda { |r| nil }
- VERSION =
"0.7"
Instance Attribute Summary collapse
-
#connection_info ⇒ Object
Returns the value of attribute connection_info.
- #consumer_base_path ⇒ Object
-
#consumer_path ⇒ Object
Returns the value of attribute consumer_path.
-
#mode ⇒ Object
Returns the value of attribute mode.
Instance Method Summary collapse
- #consumer(name) ⇒ Object
- #consumers ⇒ Object
- #map(collection, name, options = {}) ⇒ Object
- #new_bunny_connection ⇒ Object
- #queue_map_internal(collection, name, *args) ⇒ Object
- #response_queue_name(name) ⇒ Object
- #unique_name ⇒ Object
- #with_bunny(&block) ⇒ Object
Instance Attribute Details
#connection_info ⇒ Object
Returns the value of attribute connection_info.
13 14 15 |
# File 'lib/queue_map.rb', line 13 def connection_info @connection_info end |
#consumer_base_path ⇒ Object
21 22 23 |
# File 'lib/queue_map.rb', line 21 def consumer_base_path @consumer_base_path ||= "lib/consumers" end |
#consumer_path ⇒ Object
Returns the value of attribute consumer_path.
10 11 12 |
# File 'lib/queue_map.rb', line 10 def consumer_path @consumer_path end |
#mode ⇒ Object
Returns the value of attribute mode.
10 11 12 |
# File 'lib/queue_map.rb', line 10 def mode @mode end |
Instance Method Details
#consumer(name) ⇒ Object
73 74 75 |
# File 'lib/queue_map.rb', line 73 def consumer(name) consumers[name] ||= QueueMap::Consumer.from_file(consumer_path[name], :strategy => mode || :fork) end |
#consumers ⇒ Object
69 70 71 |
# File 'lib/queue_map.rb', line 69 def consumers @consumers ||= { } end |
#map(collection, name, options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/queue_map.rb', line 36 def map(collection, name, = {}) return queue_map_internal(collection, name) if mode == :test with_bunny do |bunny| q = bunny.queue(name.to_s) response_queue_name = response_queue_name(name) response_queue = bunny.queue(response_queue_name, :durable => false, :exclusive => true, :auto_delete => true) (0..(collection.length - 1)).each do |i| q.publish(Marshal.dump(:input => collection[i], :index => i, :response_queue => response_queue_name)) end results = {} begin Timeout.timeout([:timeout] || 5) do response_queue.subscribe(:message_max => collection.length) do |msg| response = Marshal.load(msg) results[response[:index]] = response[:result] end end rescue Timeout::Error => e end (0..(collection.length - 1)).map do |i| results[i] || ([:on_timeout] || DEFAULT_ON_TIMEOUT).call(collection[i]) end end end |
#new_bunny_connection ⇒ Object
77 78 79 80 81 82 83 |
# File 'lib/queue_map.rb', line 77 def new_bunny_connection BUNNY_MUTEX.synchronize do bunny = Bunny.new((@connection_info || { }).merge(:spec => '08')) bunny.start bunny end end |
#queue_map_internal(collection, name, *args) ⇒ Object
65 66 67 |
# File 'lib/queue_map.rb', line 65 def queue_map_internal(collection, name, *args) collection.map(&consumer(name).worker_proc) end |
#response_queue_name(name) ⇒ Object
31 32 33 34 |
# File 'lib/queue_map.rb', line 31 def response_queue_name(name) @inc ||= 0 "#{name}_response_#{unique_name}_#{@inc += 1}" end |
#unique_name ⇒ Object
17 18 19 |
# File 'lib/queue_map.rb', line 17 def unique_name @unique_name ||= "#{`hostname`.chomp}-#{Process.pid}-#{Time.now.usec}" end |
#with_bunny(&block) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/queue_map.rb', line 85 def with_bunny(&block) bunny = new_bunny_connection begin yield bunny ensure BUNNY_MUTEX.synchronize do bunny.stop rescue nil bunny.close_connection rescue nil end end end |