Module: QueueMap
Defined Under Namespace
Classes: Consumer
Constant Summary collapse
- BUNNY_MUTEX =
Mutex.new
- DEFAULT_ON_TIMEOUT =
lambda { |r| nil }
- VERSION =
"0.4"
Instance Attribute Summary collapse
-
#connection_info ⇒ Object
Returns the value of attribute connection_info.
- #consumer_base_path ⇒ Object
-
#consumer_path ⇒ Object
Returns the value of attribute consumer_path.
-
#mode ⇒ Object
Returns the value of attribute mode.
Instance Method Summary collapse
- #consumer(name) ⇒ Object
- #consumers ⇒ Object
- #map(collection, name, options = {}) ⇒ Object
- #queue_map_internal(collection, name, *args) ⇒ Object
- #response_queue_name(name) ⇒ Object
- #unique_name ⇒ Object
- #with_bunny(&block) ⇒ Object
Instance Attribute Details
#connection_info ⇒ Object
Returns the value of attribute connection_info.
12 13 14 |
# File 'lib/queue_map.rb', line 12 def connection_info @connection_info end |
#consumer_base_path ⇒ Object
20 21 22 |
# File 'lib/queue_map.rb', line 20 def consumer_base_path @consumer_base_path ||= "lib/consumers" end |
#consumer_path ⇒ Object
Returns the value of attribute consumer_path.
9 10 11 |
# File 'lib/queue_map.rb', line 9 def consumer_path @consumer_path end |
#mode ⇒ Object
Returns the value of attribute mode.
9 10 11 |
# File 'lib/queue_map.rb', line 9 def mode @mode end |
Instance Method Details
#consumer(name) ⇒ Object
73 74 75 |
# File 'lib/queue_map.rb', line 73 def consumer(name) consumers[name] ||= QueueMap::Consumer.from_file(consumer_path[name], :strategy => mode || :thread) end |
#consumers ⇒ Object
69 70 71 |
# File 'lib/queue_map.rb', line 69 def consumers @consumers ||= { } end |
#map(collection, name, options = {}) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/queue_map.rb', line 35 def map(collection, name, = {}) return queue_map_internal(collection, name) if mode == :test with_bunny do |bunny| q = bunny.queue(name.to_s) response_queue_name = response_queue_name(name) response_queue = bunny.queue(response_queue_name, :durable => false, :exclusive => true, :auto_delete => true) (0..(collection.length - 1)).each do |i| q.publish(Marshal.dump(:input => collection[i], :index => i, :response_queue => response_queue_name)) end results = {} begin Timeout.timeout([:timeout] || 5) do collection.length.times do sleep 0.05 while (next_response = response_queue.pop) == :queue_empty response = Marshal.load(next_response) results[response[:index]] = response[:result] end end rescue Timeout::Error => e end (0..(collection.length - 1)).map do |i| results[i] || ([:on_timeout] || DEFAULT_ON_TIMEOUT).call(collection[i]) end end end |
#queue_map_internal(collection, name, *args) ⇒ Object
65 66 67 |
# File 'lib/queue_map.rb', line 65 def queue_map_internal(collection, name, *args) collection.map(&consumer(name).worker_proc) end |
#response_queue_name(name) ⇒ Object
30 31 32 33 |
# File 'lib/queue_map.rb', line 30 def response_queue_name(name) @inc ||= 0 "#{name}_response_#{unique_name}_#{@inc += 1}" end |
#unique_name ⇒ Object
16 17 18 |
# File 'lib/queue_map.rb', line 16 def unique_name @unique_name ||= "#{`hostname`.chomp}-#{Process.pid}-#{Time.now.usec}" end |
#with_bunny(&block) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/queue_map.rb', line 77 def with_bunny(&block) bunny = nil BUNNY_MUTEX.synchronize do bunny = Bunny.new((@connection_info || { }).merge(:spec => '08')) bunny.start end begin yield bunny ensure (bunny.close_connection unless bunny.status == :not_connected) rescue nil end end |