Module: QueueMap

Extended by:
QueueMap
Included in:
QueueMap
Defined in:
lib/queue_map.rb,
lib/queue_map/version.rb

Defined Under Namespace

Classes: Consumer

Constant Summary collapse

BUNNY_MUTEX =
Mutex.new
DEFAULT_ON_TIMEOUT =
lambda { |r| nil }
VERSION =
"0.4"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connection_info ⇒ Object

Returns the value of attribute connection_info.



12
13
14
# File 'lib/queue_map.rb', line 12

# Reader for @connection_info.
#
# with_bunny merges this value (or an empty hash when it is nil/false) with
# :spec => '08' and hands the result to Bunny.new.
def connection_info
  @connection_info
end

#consumer_base_path ⇒ Object



20
21
22
# File 'lib/queue_map.rb', line 20

# Returns the consumer base path, first assigning the default
# "lib/consumers" when no truthy value is currently stored.
def consumer_base_path
  @consumer_base_path = "lib/consumers" unless @consumer_base_path
  @consumer_base_path
end

#consumer_path ⇒ Object

Returns the value of attribute consumer_path.



9
10
11
# File 'lib/queue_map.rb', line 9

# Reader for @consumer_path.
#
# #consumer indexes the returned object with a consumer name
# (consumer_path[name]) and passes the result to
# QueueMap::Consumer.from_file.
# NOTE(review): presumably a Hash-like mapping of name => file path; confirm
# against wherever @consumer_path is assigned.
def consumer_path
  @consumer_path
end

#mode ⇒ Object

Returns the value of attribute mode.



9
10
11
# File 'lib/queue_map.rb', line 9

# Reader for @mode.
#
# #map bypasses the message queue when this is :test, and #consumer passes it
# as the :strategy option (falling back to :thread when it is nil/false).
def mode
  @mode
end

Instance Method Details

#consumer(name) ⇒ Object



73
74
75
# File 'lib/queue_map.rb', line 73

# Returns the consumer registered under +name+, building and caching it on
# first use.
#
# A new consumer is loaded with QueueMap::Consumer.from_file from
# consumer_path[name], using the current mode as :strategy (or :thread when
# no mode is set).
def consumer(name)
  registry = consumers
  existing = registry[name]
  return existing if existing

  strategy = mode || :thread
  registry[name] = QueueMap::Consumer.from_file(consumer_path[name], :strategy => strategy)
end

#consumers ⇒ Object



69
70
71
# File 'lib/queue_map.rb', line 69

# Returns the hash used as the consumer cache, creating an empty one the
# first time it is requested.
def consumers
  return @consumers if @consumers

  @consumers = {}
end

#map(collection, name, options = {}) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/queue_map.rb', line 35

# Maps +collection+ through the consumer listening on queue +name+ and
# returns the results in input order.
#
# In :test mode the work is done in-process via queue_map_internal. Otherwise
# every element is published to the +name+ queue together with its index and
# the name of a per-call response queue, and responses are collected until all
# have arrived or the timeout elapses.
#
# @param collection [#length, #[]] items to process (indexed 0...length)
# @param name [#to_s] name of the work queue
# @param options [Hash]
#   :timeout    - seconds to wait for all responses (default 5)
#   :on_timeout - callable invoked with the input element for each item whose
#                 response did not arrive (default DEFAULT_ON_TIMEOUT)
# @return [Array] one entry per input element; the worker's result when a
#   response was received (including nil/false results), otherwise the
#   on_timeout value
def map(collection, name, options = {})
  return queue_map_internal(collection, name) if mode == :test

  on_timeout = options[:on_timeout] || DEFAULT_ON_TIMEOUT

  with_bunny do |bunny|
    work_queue = bunny.queue(name.to_s)
    reply_queue_name = response_queue_name(name)
    reply_queue = bunny.queue(reply_queue_name, :durable => false, :exclusive => true, :auto_delete => true)

    collection.length.times do |i|
      work_queue.publish(Marshal.dump(:input => collection[i], :index => i, :response_queue => reply_queue_name))
    end

    results = {}
    begin
      Timeout.timeout(options[:timeout] || 5) do
        collection.length.times do
          # Poll until a response shows up; the surrounding timeout bounds the wait.
          sleep 0.05 while (next_response = reply_queue.pop) == :queue_empty
          # NOTE(review): Marshal.load deserializes whatever is on the response
          # queue; this is only safe when the broker and consumers are trusted.
          response = Marshal.load(next_response)
          results[response[:index]] = response[:result]
        end
      end
    rescue Timeout::Error
      # Deliberate: missing responses are filled in by on_timeout below.
    end

    (0...collection.length).map do |i|
      # key? rather than truthiness, so a legitimate nil/false result is not
      # mistaken for a missing response.
      results.key?(i) ? results[i] : on_timeout.call(collection[i])
    end
  end
end

#queue_map_internal(collection, name, *args) ⇒ Object



65
66
67
# File 'lib/queue_map.rb', line 65

# Runs +collection+ through the worker proc of the +name+ consumer in the
# current process and returns the mapped results. Extra arguments are
# accepted and ignored.
def queue_map_internal(collection, name, *args)
  worker = consumer(name).worker_proc
  collection.map(&worker)
end

#response_queue_name(name) ⇒ Object



30
31
32
33
# File 'lib/queue_map.rb', line 30

# Builds a fresh response-queue name for +name+ of the form
# "<name>_response_<unique_name>_<n>", where n is a counter stored in @inc
# that starts at 1 and grows by one per call.
def response_queue_name(name)
  @inc = 0 unless @inc
  prefix = "#{name}_response_#{unique_name}"
  @inc += 1
  "#{prefix}_#{@inc}"
end

#unique_name ⇒ Object



16
17
18
# File 'lib/queue_map.rb', line 16

# Returns an identifier of the form "<hostname>-<pid>-<usec>", computed once
# and then reused. The hostname comes from running the `hostname` command and
# the last part is the microsecond component of the time of the first call.
def unique_name
  return @unique_name if @unique_name

  host = `hostname`.chomp
  @unique_name = "#{host}-#{Process.pid}-#{Time.now.usec}"
end

#with_bunny(&block) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/queue_map.rb', line 77

# Opens a Bunny connection, yields it to the block, and closes it afterwards.
#
# Creating and starting the client happens while holding BUNNY_MUTEX. The
# client is built from @connection_info (or an empty hash) merged with
# :spec => '08'. After the block finishes or raises, the connection is closed
# unless its status is already :not_connected; any StandardError raised
# during that cleanup is discarded.
#
# @yield [bunny] the started Bunny client
# @return [Object] the block's return value
def with_bunny(&block)
  client = BUNNY_MUTEX.synchronize do
    settings = (@connection_info || {}).merge(:spec => '08')
    connection = Bunny.new(settings)
    connection.start
    connection
  end

  begin
    yield client
  ensure
    begin
      client.close_connection unless client.status == :not_connected
    rescue StandardError
      # Best-effort cleanup: a failing close must not mask the block's outcome.
      nil
    end
  end
end