Class: Pandemic::ClientSide::ClusterConnection
- Inherits:
-
Object
- Object
- Pandemic::ClientSide::ClusterConnection
- Includes:
- Util
- Defined in:
- lib/pandemic/client_side/cluster_connection.rb
Defined Under Namespace
Classes: LostConnectionToNode, NoNodesAvailable, NodeTimedOut, NotEnoughConnectionsTimeout, RequestFailed
Instance Method Summary
- #[](key) ⇒ Object
-
#initialize ⇒ ClusterConnection
constructor
A new instance of ClusterConnection.
- #request(body, key = nil, options = {}) ⇒ Object
- #shutdown ⇒ Object
Methods included from Util
#host_port, #logger, #with_mutex
Constructor Details
#initialize ⇒ ClusterConnection
Returns a new instance of ClusterConnection.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/pandemic/client_side/cluster_connection.rb', line 11
#
# Loads the configuration, sets up the empty connection bookkeeping, creates
# one ConnectionProxy per configured server and opens the configured minimum
# number of connections to each of them.
def initialize
  Config.load

  # Flat and per-key views of all connections and of the available ones.
  @connections = []
  @available = []
  @grouped_connections = Hash.new { |hash, key| hash[key] = [] }
  @grouped_available = Hash.new { |hash, key| hash[key] = [] }

  @mutex = Monitor.new
  @connection_proxies = {}
  @queue = @mutex.new_cond # TODO: there should be a queue for each group

  # A non-positive configured timeout is stored as nil.
  configured_timeout = Config.response_timeout
  @response_timeout = configured_timeout <= 0 ? nil : configured_timeout

  Config.servers.each_with_index do |server_addr, key|
    @connection_proxies[key] = ConnectionProxy.new(key, self)
    # The parsed host/port pair is not used any further inside this method.
    _host, _port = host_port(server_addr)
    Config.min_connections_per_server.times { add_connection_for_key(key) }
  end

  maintain_minimum_connections!
end
Instance Method Details
#[](key) ⇒ Object
36 37 38 |
# File 'lib/pandemic/client_side/cluster_connection.rb', line 36
#
# Returns the connection proxy stored under +key+ taken modulo the number of
# registered proxies, so keys beyond the proxy count wrap around.
def [](key)
  slot = key % @connection_proxies.size
  @connection_proxies[slot]
end
#request(body, key = nil, options = {}) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/pandemic/client_side/cluster_connection.rb', line 40
#
# Writes +body+ to a connection obtained through with_connection and, unless
# the request is asynchronous, reads the response back from the same socket.
#
# The request is framed as "<body.size>[ <flags>]\n<body>"; the only flag set
# here is "a" for asynchronous requests. The response is expected as a size
# line followed by that many bytes of payload.
#
# @param body [String] payload to send
# @param key [Integer, Hash, nil] handed to with_connection; when a Hash is
#   given in this position it is treated as +options+ and the key becomes nil
# @param options [Hash] :async => truthy sends the request without waiting
#   for (or reading) a response
# @return [Object] the value with_connection returns for the block — for a
#   synchronous request the block yields the response payload, for an
#   asynchronous one nil (presumably passed through; confirm in with_connection)
# @raise [LostConnectionToNode] when no socket is supplied, the connection is
#   reset/broken, or no size line could be read
# @raise [NodeTimedOut] when nothing is readable within @response_timeout
# @raise [RequestFailed] when the node reports a negative response size
def request(body, key = nil, options = {})
  # Support the request(body, options) call form.
  key, options = nil, key if key.is_a?(Hash)

  with_connection(key) do |socket|
    begin
      raise LostConnectionToNode if socket.nil?

      flags = []
      flags << "a" if options[:async]
      flags = flags.empty? ? "" : " #{flags.join("")}"

      # NOTE(review): the header uses body.size (characters) while the reply
      # is read with a byte count — confirm bodies are single-byte/binary.
      socket.write("#{body.size}#{flags}\n#{body}")
      socket.flush

      unless options[:async]
        # A nil @response_timeout makes IO.select wait indefinitely.
        is_ready = IO.select([socket], nil, nil, @response_timeout)
        raise NodeTimedOut if is_ready.nil?

        response_size = socket.gets
        if response_size && response_size.to_i >= 0
          socket.read(response_size.to_i)
        elsif response_size && response_size.to_i < 0
          raise RequestFailed
        else
          # nil response size
          raise LostConnectionToNode
        end
      end
    rescue Errno::ECONNRESET, Errno::EPIPE
      raise LostConnectionToNode
    end
  end
end
#shutdown ⇒ Object
73 74 75 76 |
# File 'lib/pandemic/client_side/cluster_connection.rb', line 73
#
# Closes the socket of every connection that reports itself alive, then kills
# the minimum-connections maintenance thread if one has been stored.
def shutdown
  @connections.each do |connection|
    next unless connection.alive?

    connection.socket.close
  end

  maintenance_thread = @maintain_minimum_connections_thread
  maintenance_thread.kill if maintenance_thread
end