Class: PredictionIO::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/predictionio/connection.rb

Overview

This class handles multithreading and asynchronous requests transparently for the REST client.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, threads = 1, timeout = 60) ⇒ Connection

Spawns the given number of worker threads, each holding a persistent HTTP connection to the specified URI. The read timeout of every connection defaults to 60 seconds.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/predictionio/connection.rb', line 17

# Spawns +threads+ worker threads. Each worker opens a persistent HTTP
# connection to +uri+ and then serves packages taken from the shared queue
# until it receives a package whose method is 'exit'.
#
# @param uri [URI] base URI; its host, port, scheme and path are used
# @param threads [Integer] number of worker threads to spawn
# @param timeout [Numeric] read timeout, in seconds, set on every connection
def initialize(uri, threads = 1, timeout = 60)
  @packages = Queue.new
  @counter_lock = Mutex.new
  @connections = 0
  @timeout = timeout
  threads.times do
    Thread.new { run_worker(uri) }
  end
end

# Body of one worker thread: connect, serve packages, and reconnect after a
# failure. The retry is deliberately unbounded so that a worker keeps trying
# to reach the server, pausing one second between attempts.
def run_worker(uri)
  Net::HTTP.start(uri.host, uri.port,
                  :use_ssl => uri.scheme == 'https') do |http|
    http.read_timeout = @timeout
    adjust_connections(1)
    begin
      serve_packages(http, uri)
    ensure
      # Always give the slot back, whether the worker was asked to exit or
      # an error tore the connection down; otherwise the counter would grow
      # by one on every reconnect.
      adjust_connections(-1)
    end
  end
rescue StandardError => detail
  fail_pending_package(detail)
  sleep(1)
  retry
end

# Pops packages off the queue and performs them on +http+ until an 'exit'
# package arrives. Every other package has its response set exactly once,
# either to the HTTP response or to the exception that prevented it.
def serve_packages(http, uri)
  loop do
    package = @packages.pop
    break if package[:method] == 'exit'

    response = package[:response]
    begin
      http_req = build_http_request(uri, package[:method], package[:request])
      response.set(http.request(http_req))
    rescue Exception => details
      # Hand every failure to the response so that it is never left unset,
      # but do not swallow non-StandardError exceptions: those are re-raised
      # and end this worker.
      response.set(details)
      raise unless details.is_a?(StandardError)
    end
  end
end

# Builds the Net::HTTP request object for one package.
#
# @raise [ArgumentError] if +method+ is not 'get', 'post' or 'delete'
def build_http_request(uri, method, request)
  case method
  when 'get'
    Net::HTTP::Get.new("#{uri.path}#{request.qpath}")
  when 'delete'
    Net::HTTP::Delete.new("#{uri.path}#{request.qpath}")
  when 'post'
    target = "#{uri.path}#{request.path}"
    if request.params.is_a?(Hash)
      # Hash parameters are sent as form data.
      http_req = Net::HTTP::Post.new(target)
      http_req.set_form_data(request.params)
    else
      # Anything else is sent verbatim as the body, labelled as JSON.
      http_req = Net::HTTP::Post.new(target, 'Content-Type' => 'application/json')
      http_req.body = request.params
    end
    http_req
  else
    raise ArgumentError, "unsupported request method: #{method.inspect}"
  end
end

# Adds +delta+ to the connection counter while holding the counter lock.
def adjust_connections(delta)
  @counter_lock.synchronize do
    @connections += delta
  end
end

# When no connection is open at all, reports +detail+ to one pending package.
# The pop is non-blocking so that a worker with nothing queued does not
# dead-lock here and gets a chance to re-connect.
def fail_pending_package(detail)
  @counter_lock.synchronize do
    next unless @connections == 0

    package = begin
      @packages.pop(true)
    rescue ThreadError
      nil # nothing is queued
    end
    package[:response].set(detail) if package
  end
end

private :run_worker, :serve_packages, :build_http_request,
        :adjust_connections, :fail_pending_package

Instance Attribute Details

#connections ⇒ Object (readonly)

Number of active connections.



10
11
12
# File 'lib/predictionio/connection.rb', line 10

# Reader for the connection counter.
#
# @return [Integer] the current value of @connections
def connections
  @connections
end

#packages ⇒ Object (readonly)

Queue of pending asynchronous request and response packages.



7
8
9
# File 'lib/predictionio/connection.rb', line 7

# Reader for the pending-package queue.
#
# @return [Object] the object stored in @packages (the queue itself, not a count)
def packages
  @packages
end

#timeout ⇒ Object (readonly)

Timeout in seconds



13
14
15
# File 'lib/predictionio/connection.rb', line 13

# Reader for the configured timeout.
#
# @return [Object] the value stored in @timeout (documented as seconds)
def timeout
  @timeout
end

Instance Method Details

#adelete(areq) ⇒ Object

Shortcut to create an asynchronous DELETE request with the response object returned.



113
114
115
# File 'lib/predictionio/connection.rb', line 113

# Queues +areq+ as an asynchronous DELETE.
#
# @param areq [Object] request object, passed to #request unchanged
# @return [Object] whatever #request returns for the queued package
def adelete(areq)
  http_verb = 'delete'
  request(http_verb, areq)
end

#aget(areq) ⇒ Object

Shortcut to create an asynchronous GET request with the response object returned.



103
104
105
# File 'lib/predictionio/connection.rb', line 103

# Queues +areq+ as an asynchronous GET.
#
# @param areq [Object] request object, passed to #request unchanged
# @return [Object] whatever #request returns for the queued package
def aget(areq)
  http_verb = 'get'
  request(http_verb, areq)
end

#apost(areq) ⇒ Object

Shortcut to create an asynchronous POST request with the response object returned.



108
109
110
# File 'lib/predictionio/connection.rb', line 108

# Queues +areq+ as an asynchronous POST.
#
# @param areq [Object] request object, passed to #request unchanged
# @return [Object] whatever #request returns for the queued package
def apost(areq)
  http_verb = 'post'
  request(http_verb, areq)
end

#request(method, request) ⇒ Object

Create an asynchronous request and response package, put it in the pending queue, and return the response object.



94
95
96
97
98
99
100
# File 'lib/predictionio/connection.rb', line 94

# Wraps +request+ in a fresh AsyncResponse, enqueues the pair together with
# +method+ as one package, and hands the response object back to the caller.
#
# @param method [String] method name stored under the package's :method key
# @param request [Object] request object stored under the package's :request key
# @return [AsyncResponse] the response object that was enqueued with the package
def request(method, request)
  AsyncResponse.new(request).tap do |pending|
    @packages << { :method => method, :request => request, :response => pending }
  end
end