11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/amqparty/request.rb', line 11
# Sends the request through Rack::AMQP::Client and wraps the reply in a
# Net::HTTPResponse so the usual response handling can run on it.
#
# Options read from +options+:
# * +:amqp_client_options+ - passed to Rack::AMQP::Client.with_client
# * +:async+               - coerced to a boolean and forwarded as +async+
# * +:body+                - request payload; a Hash is serialised with
#                            HTTParty::HashConversions.to_params
# * +:headers+             - request headers (defaults to an empty Hash)
# * +:request_timeout+     - forwarded as +timeout+
#
# @yield forwarded unchanged to +handle_response+
# @return whatever +handle_response+ returns
# @raise [UnsupportedURISchemeError] when the URI scheme is not "amqp"
def perform(&block)
  unless %w[amqp].include?(uri.scheme.to_s.downcase)
    raise UnsupportedURISchemeError
  end
  validate
  setup_raw_request
  chunked_body = nil

  # The request target is "<host><path>[?<query>]"; the scheme is dropped.
  path = "#{uri.host}#{uri.path}"
  path = "#{path}?#{uri.query}" if uri.query
  connection_options = options[:amqp_client_options]
  async = options[:async]

  Rack::AMQP::Client.with_client(connection_options) do |client|
    # e.g. Net::HTTP::Post -> "POST"
    method_name = http_method.name.split(/::/).last.upcase
    body = options[:body] || ""
    body = HTTParty::HashConversions.to_params(body) if body.is_a?(Hash)
    headers = options[:headers] || {}
    timeout = options[:request_timeout]

    response = client.request(path, {
      body: body,
      http_method: method_name,
      headers: headers,
      timeout: timeout,
      async: !!async
    })

    # Build a Net::HTTPResponse of the class matching the status code.
    # `response_class` is private on Net::HTTPResponse, hence the `send`.
    response_code = response.response_code
    klass = Net::HTTPResponse.send(:response_class, response_code.to_s)
    http_response = klass.new("1.1", response_code, "Found")
    # NOTE(review): assumes the client response exposes `headers` as a
    # Hash-like object responding to `each_pair` -- confirm against the client.
    response.headers.each_pair do |key, value|
      http_response.add_field key, value
    end
    http_response.body = response.payload
    # Net::HTTPResponse#body normally reads from a socket; there is none
    # here, so return the payload assigned above instead.
    http_response.define_singleton_method(:body) { @body }
    self.last_response = http_response
  end

  handle_deflation unless http_method == Net::HTTP::Head
  handle_response(chunked_body, &block)
end
|