Class: PredictionIO::Connection
- Inherits:
- Object
- Defined in:
- lib/predictionio/connection.rb
Overview
This class handles multithreading and asynchronous requests transparently for the REST client.
Instance Attribute Summary
- #connections ⇒ Object (readonly)
  Number of connections active.
- #packages ⇒ Object (readonly)
  Queue of pending asynchronous request and response packages.
- #timeout ⇒ Object (readonly)
  Timeout in seconds.
Instance Method Summary
- #adelete(areq) ⇒ Object
  Shortcut that creates an asynchronous DELETE request and returns the response object.
- #aget(areq) ⇒ Object
  Shortcut that creates an asynchronous GET request and returns the response object.
- #apost(areq) ⇒ Object
  Shortcut that creates an asynchronous POST request and returns the response object.
- #initialize(uri, threads = 1, timeout = 60) ⇒ Connection (constructor)
  Spawns a number of threads, each with a persistent HTTP connection to the specified URI.
- #request(method, request) ⇒ Object
  Creates an asynchronous request and response package, puts it in the pending queue, and returns the response object.
Constructor Details
#initialize(uri, threads = 1, timeout = 60) ⇒ Connection
Spawns a number of threads, each with a persistent HTTP connection to the specified URI. The timeout defaults to 60 seconds and is applied as the read timeout of each connection.
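As a small illustration of the connection settings described above, the sketch below configures a Net::HTTP object from a URI without opening a socket. The build_http helper and the localhost address are assumptions made for this example and are not part of the library; the library itself calls Net::HTTP.start inside each worker thread.

```ruby
require 'net/http'
require 'uri'

# Hypothetical helper (not part of PredictionIO): builds an unconnected
# Net::HTTP object with the same SSL and read-timeout choices that the
# constructor applies to each worker connection.
def build_http(uri, timeout = 60)
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = (uri.scheme == 'https') # SSL only for https URIs
  http.read_timeout = timeout            # seconds to wait for a response
  http
end

uri = URI('http://localhost:8000/')      # placeholder address (assumption)
http = build_http(uri)
puts http.read_timeout
puts http.use_ssl?
```

Because the timeout is a read timeout, it bounds how long a worker waits for a response on an open connection; it is not shown to limit connection setup.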
    # File 'lib/predictionio/connection.rb', line 17
    def initialize(uri, threads = 1, timeout = 60)
      @packages = Queue.new
      @counter_lock = Mutex.new
      @connections = 0
      @timeout = timeout
      threads.times do
        Thread.new do
          begin
            Net::HTTP.start(uri.host, uri.port, :use_ssl => uri.scheme == 'https') do |http|
              @counter_lock.synchronize do
                @connections += 1
              end
              catch(:exit) do
                http.read_timeout = @timeout
                loop do
                  package = @packages.pop
                  request = package[:request]
                  response = package[:response]
                  case package[:method]
                  when 'get'
                    http_req = Net::HTTP::Get.new("#{uri.path}#{request.qpath}")
                    begin
                      response.set(http.request(http_req))
                    rescue Exception => details
                      response.set(details)
                    end
                  when 'post'
                    if request.params.is_a?(Hash)
                      http_req = Net::HTTP::Post.new("#{uri.path}#{request.path}")
                      http_req.set_form_data(request.params)
                    else
                      http_req = Net::HTTP::Post.new("#{uri.path}#{request.path}", initheader = {'Content-Type' => 'application/json'})
                      http_req.body = request.params
                    end
                    begin
                      response.set(http.request(http_req))
                    rescue Exception => details
                      response.set(details)
                    end
                  when 'delete'
                    http_req = Net::HTTP::Delete.new("#{uri.path}#{request.qpath}")
                    begin
                      response.set(http.request(http_req))
                    rescue Exception => details
                      response.set(details)
                    end
                  when 'exit'
                    @counter_lock.synchronize do
                      @connections -= 1
                    end
                    throw :exit
                  end
                end
              end
            end
          rescue Exception => detail
            @counter_lock.synchronize do
              if @connections == 0 then
                # Use non-blocking pop to avoid dead-locking the current
                # thread when there is no request, and give it a chance to re-connect.
                begin
                  package = @packages.pop(true)
                  response = package[:response]
                  response.set(detail)
                rescue Exception
                end
              end
            end
            sleep(1)
            retry
          end
        end
      end
    end
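The worker loop above can be hard to follow in full, so here is a reduced sketch of the same pattern using only the standard library and no network access. ToyResponse is a hypothetical stand-in for the response object, the string reply replaces the real HTTP call, and pushing one 'exit' package per worker is an assumption about how such a pool would be stopped; the source above only shows how a worker reacts to that package.

```ruby
require 'thread'

# Toy stand-in for the response object (hypothetical, not the library's class).
class ToyResponse
  def initialize
    @slot = Queue.new
  end

  # Called by a worker once the result is known.
  def set(value)
    @slot.push(value)
  end

  # Blocks the caller until a worker has called #set.
  def get
    @slot.pop
  end
end

packages = Queue.new
lock = Mutex.new
active = 0

workers = 2.times.map do
  Thread.new do
    lock.synchronize { active += 1 }
    catch(:exit) do
      loop do
        package = packages.pop # blocks until a package arrives
        case package[:method]
        when 'get'
          # A real worker would perform the HTTP request here.
          package[:response].set("handled #{package[:request]}")
        when 'exit'
          lock.synchronize { active -= 1 }
          throw :exit
        end
      end
    end
  end
end

response = ToyResponse.new
packages.push(:method => 'get', :request => '/ping', :response => response)
result = response.get

# Assumed shutdown: one 'exit' package per worker, then wait for the threads.
workers.size.times { packages.push(:method => 'exit') }
workers.each(&:join)
puts result
puts active
```

The counter is only touched inside the mutex, mirroring how @connections is guarded by @counter_lock in the constructor.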
Instance Attribute Details
#connections ⇒ Object (readonly)
Number of connections active.
    # File 'lib/predictionio/connection.rb', line 10
    def connections
      @connections
    end
#packages ⇒ Object (readonly)
Queue of pending asynchronous request and response packages.
    # File 'lib/predictionio/connection.rb', line 7
    def packages
      @packages
    end
#timeout ⇒ Object (readonly)
Timeout in seconds.
    # File 'lib/predictionio/connection.rb', line 13
    def timeout
      @timeout
    end
Instance Method Details
#adelete(areq) ⇒ Object
Shortcut that creates an asynchronous DELETE request and returns the response object.
    # File 'lib/predictionio/connection.rb', line 113
    def adelete(areq)
      request('delete', areq)
    end
#aget(areq) ⇒ Object
Shortcut that creates an asynchronous GET request and returns the response object.
    # File 'lib/predictionio/connection.rb', line 103
    def aget(areq)
      request('get', areq)
    end
#apost(areq) ⇒ Object
Shortcut that creates an asynchronous POST request and returns the response object.
    # File 'lib/predictionio/connection.rb', line 108
    def apost(areq)
      request('post', areq)
    end
#request(method, request) ⇒ Object
Creates an asynchronous request and response package, puts it in the pending queue, and returns the response object.
    # File 'lib/predictionio/connection.rb', line 94
    def request(method, request)
      response = AsyncResponse.new(request)
      @packages.push(:method => method, :request => request, :response => response)
      response
    end
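To make the hand-off in request concrete, the following sketch mirrors it with stand-in classes and no worker threads, so the queued package can be inspected directly. ToyConnection and ToyAsyncResponse are hypothetical names introduced for this example; the real AsyncResponse is defined elsewhere in the library and is not reproduced here.

```ruby
require 'thread'

# Hypothetical stand-in for AsyncResponse: it only remembers the request
# and leaves room for a value that a worker would fill in later.
ToyAsyncResponse = Struct.new(:request, :value)

# Hypothetical stand-in for Connection without any worker threads.
class ToyConnection
  attr_reader :packages

  def initialize
    @packages = Queue.new
  end

  # Same shape as the request method shown above.
  def request(method, request)
    response = ToyAsyncResponse.new(request)
    @packages.push(:method => method, :request => request, :response => response)
    response
  end

  def aget(areq)
    request('get', areq)
  end
end

conn = ToyConnection.new
response = conn.aget('/items/1')   # returns immediately
pending = conn.packages.size       # the package is waiting in the queue

package = conn.packages.pop(true)  # non-blocking pop of the queued package
same = package[:response].equal?(response)

# A non-blocking pop on an empty queue raises ThreadError instead of waiting.
begin
  conn.packages.pop(true)
  empty_error = nil
rescue ThreadError => e
  empty_error = e.class
end

puts pending
puts same
puts empty_error
```

The caller and the worker share the same response object, which is why the caller can hold on to the returned value while the request is still pending. The ThreadError raised by the non-blocking pop is what the rescue branch in the constructor relies on to avoid blocking when no request is queued.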