Class: Pandemic::ClientSide::ClusterConnection

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/pandemic/client_side/cluster_connection.rb

Defined Under Namespace

Classes: LostConnectionToNode, NoNodesAvailable, NodeTimedOut, NotEnoughConnectionsTimeout, RequestFailed

Instance Method Summary collapse

Methods included from Util

#host_port, #logger, #with_mutex

Constructor Details

#initialize ⇒ ClusterConnection

Returns a new instance of ClusterConnection.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/pandemic/client_side/cluster_connection.rb', line 11

# Sets up the connection bookkeeping for every configured server.
#
# Calls Config.load, initializes the (grouped) connection and availability
# collections, registers one ConnectionProxy per entry in Config.servers
# (keyed by that entry's index), calls add_connection_for_key
# Config.min_connections_per_server times for each key, and finally invokes
# maintain_minimum_connections!.
def initialize
  Config.load
  @connections = []
  @available = []
  # Buckets are created lazily the first time a key is looked up.
  @grouped_connections = Hash.new { |hash, key| hash[key] = [] }
  @grouped_available = Hash.new { |hash, key| hash[key] = [] }
  @mutex = Monitor.new
  @connection_proxies = {}
  @queue = @mutex.new_cond # TODO: there should be a queue for each group

  # A missing or non-positive timeout is normalized to nil. The value is
  # later handed to IO.select, where nil means "wait indefinitely".
  timeout = Config.response_timeout
  @response_timeout = (timeout && timeout > 0) ? timeout : nil

  # Only the index is needed here; the address itself is not used by this method.
  Config.servers.each_with_index do |_server_addr, key|
    @connection_proxies[key] = ConnectionProxy.new(key, self)
    Config.min_connections_per_server.times do
      add_connection_for_key(key)
    end
  end

  maintain_minimum_connections!
end

Instance Method Details

#[](key) ⇒ Object



36
37
38
# File 'lib/pandemic/client_side/cluster_connection.rb', line 36

# Looks up the connection proxy for +key+.
#
# The key is reduced modulo the number of registered proxies before the
# lookup, so keys outside 0...size wrap around.
def [](key)
  proxy_count = @connection_proxies.size
  wrapped_key = key % proxy_count
  @connection_proxies[wrapped_key]
end

#request(body, key = nil, options = {}) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/pandemic/client_side/cluster_connection.rb', line 40

# Sends +body+ over a connection obtained from with_connection and, unless
# the request is asynchronous, reads the reply.
#
# Outgoing frame: "<byte length of body>[ a]\n<body>", where the " a" flag
# is added for asynchronous requests. A reply is expected as a size line
# followed by that many bytes; a negative size signals a failed request.
#
# @param body [String] payload to send
# @param key [Object, nil] forwarded to with_connection; if a Hash is given
#   here it is treated as +options+ and the key becomes nil
# @param options [Hash] set :async to a truthy value to skip waiting for a reply
# @return the value of the block as returned by with_connection: the reply
#   body for synchronous requests, nil for asynchronous ones
# @raise [LostConnectionToNode] when no socket is available, the peer closes
#   or resets the connection, or the reply is cut short
# @raise [NodeTimedOut] when no reply arrives within @response_timeout
# @raise [RequestFailed] when the reply carries a negative size
def request(body, key = nil, options = {})
  key, options = nil, key if key.is_a?(Hash)
  with_connection(key) do |socket|
    begin
      raise LostConnectionToNode if socket.nil?
      flags = options[:async] ? " a" : ""

      # The header must carry the byte length: the payload travels as bytes
      # and replies are read back with a byte count, so a character count
      # would mis-frame any body containing multibyte characters.
      socket.write("#{body.bytesize}#{flags}\n#{body}")
      socket.flush

      unless options[:async]
        is_ready = IO.select([socket], nil, nil, @response_timeout)
        raise NodeTimedOut if is_ready.nil?

        response_size = socket.gets
        # nil size line: the peer closed the connection before replying.
        raise LostConnectionToNode if response_size.nil?

        size = response_size.to_i
        raise RequestFailed if size < 0

        response = socket.read(size)
        # read returns nil or a short string when EOF arrives early; treat a
        # truncated reply as a lost connection rather than returning it.
        raise LostConnectionToNode if response.nil? || response.bytesize < size
        response
      end
    rescue Errno::ECONNRESET, Errno::EPIPE
      raise LostConnectionToNode
    end
  end
end

#shutdown ⇒ Object



73
74
75
76
# File 'lib/pandemic/client_side/cluster_connection.rb', line 73

# Stops the maintenance thread (if one was started) and closes the socket
# of every connection that reports itself alive.
#
# The thread is killed before any socket is closed so that it cannot act on
# the connection list while the sockets are being torn down.
#
# @return the maintenance thread, or nil when there was none
def shutdown
  maintenance_thread = @maintain_minimum_connections_thread
  maintenance_thread.kill if maintenance_thread

  @connections.each { |c| c.socket.close if c.alive? }
  maintenance_thread
end