Class: Rack::AMQP::Client::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/rack/amqp/client/manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(broker_connection_options) ⇒ Manager

Returns a new instance of Manager.



7
8
9
10
11
12
# File 'lib/rack/amqp/client/manager.rb', line 7

# Builds a manager: hands the given options to #connect! and then sets up the
# bookkeeping state that #request relies on.
#
# @param broker_connection_options [Object] forwarded unchanged to #connect!
#   (presumably a Hash of AMQP broker settings -- confirm against #connect!)
def initialize(broker_connection_options)
  # NOTE(review): #connect! is defined outside this view and runs before the
  # state below exists; confirm it does not depend on @mutex or
  # @incomplete_requests before reordering anything here.
  connect!(broker_connection_options)
  # Counter that #request increments to label each outgoing request.
  @correlation_id = 0
  # Requests that #request has registered.
  @incomplete_requests = []
  # Used by #request to serialize appends to @incomplete_requests.
  @mutex = Mutex.new
end

Instance Attribute Details

#amqp_channel ⇒ Object

Returns the value of attribute amqp_channel.



5
6
7
# File 'lib/rack/amqp/client/manager.rb', line 5

# Reader for the @amqp_channel instance variable. #request calls
# +direct('')+ on this value to obtain the exchange it publishes to.
#
# @return [Object] the current value of @amqp_channel (nil if never assigned)
def amqp_channel
  @amqp_channel
end

#amqp_client ⇒ Object

Returns the value of attribute amqp_client.



5
6
7
# File 'lib/rack/amqp/client/manager.rb', line 5

# Reader for the @amqp_client instance variable.
#
# @return [Object] the current value of @amqp_client (nil if never assigned;
#   presumably populated by #connect! -- confirm)
def amqp_client
  @amqp_client
end

#connection_options ⇒ Object

Returns the value of attribute connection_options.



5
6
7
# File 'lib/rack/amqp/client/manager.rb', line 5

# Reader for the @connection_options instance variable.
#
# @return [Object] the current value of @connection_options (nil if never
#   assigned; presumably the options given to the constructor -- confirm
#   against #connect!)
def connection_options
  @connection_options
end

Instance Method Details

#request(uri, options = {}) ⇒ Object

TODO: this method needs to be refactored.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/rack/amqp/client/manager.rb', line 15

# Publishes an HTTP-style request over AMQP and waits for the reply.
#
# A Request is built from the arguments, registered in @incomplete_requests,
# given a freshly created callback queue, published through the default
# direct exchange (+amqp_channel.direct('')+), and then waited on.
#
# @param uri [Object] target passed through to Request.new
# @param options [Hash] optional settings
# @option options [Object] :http_method passed through to Request.new
#   (nil when omitted)
# @option options [Numeric] :timeout value handed to Request#reply_wait;
#   defaults to 5 (presumably seconds -- confirm against Request#reply_wait)
# @option options [String] :body request body; defaults to ""
# @option options [Hash] :headers extra headers; they win over the default
#   Content-Type / Content-Length on key collision. May be omitted.
# @return [Object] whatever Request#reply_wait returns
def request(uri, options = {})
  http_method = options[:http_method]
  timeout = options[:timeout] || 5
  body = options[:body] || ""

  # Content-Length counts bytes, not characters, so prefer bytesize; fall back
  # to length for body objects that do not offer it.
  content_length = body.respond_to?(:bytesize) ? body.bytesize : body.length
  default_headers = {
    'Content-Type' => 'application/x-www-form-urlencoded',
    'Content-Length' => content_length
  }
  # :headers is optional; Hash#merge(nil) would raise TypeError.
  headers = default_headers.merge(options[:headers] || {})

  # Allocate the correlation id under the same lock that guards the pending
  # list so concurrent callers can never be handed the same id.
  request = @mutex.synchronize do
    pending = Request.new((@correlation_id += 1).to_s, http_method, uri, body, headers)
    @incomplete_requests << pending
    pending
  end
  request.callback_queue = create_callback_queue

  amqp_channel.direct('').publish(request.payload, request.publishing_options)

  request.reply_wait(timeout)
end