Class: Dalli::PipelinedGetter
- Inherits:
-
Object
- Object
- Dalli::PipelinedGetter
- Defined in:
- lib/dalli/pipelined_getter.rb
Overview
Contains logic for the pipelined gets implemented by the client.
Instance Method Summary collapse
-
#abort_with_timeout(servers) ⇒ Object
Swallows Dalli::NetworkError.
-
#abort_without_timeout(servers) ⇒ Object
Swallows Dalli::NetworkError.
- #fetch_responses(servers, start_time, timeout, &block) ⇒ Object
-
#finish_queries(servers) ⇒ Object
This loops through the servers that have keys in our set, sending the noop to terminate the set of queries.
- #finish_query_for_server(server) ⇒ Object
- #groups_for_keys(*keys) ⇒ Object
-
#initialize(ring, key_manager) ⇒ PipelinedGetter
constructor
A new instance of PipelinedGetter.
-
#make_getkq_requests(groups) ⇒ Object
Loop through the server-grouped sets of keys, writing the corresponding getkq requests to the appropriate servers.
-
#process(keys, &block) ⇒ Object
Yields, one at a time, keys and their values+attributes.
-
#process_server(server) ⇒ Object
Processes responses from a server.
- #remaining_time(start, timeout) ⇒ Object
- #servers_with_response(servers, timeout) ⇒ Object
- #setup_requests(keys) ⇒ Object
Constructor Details
#initialize(ring, key_manager) ⇒ PipelinedGetter
Returns a new instance of PipelinedGetter.
8 9 10 11 |
# File 'lib/dalli/pipelined_getter.rb', line 8

# Creates a getter that holds on to the two collaborators it is given.
#
# @param ring [Object] kept in @ring; the other methods of this class call
#   lock, socket_timeout and keys_grouped_by_server on it
# @param key_manager [Object] kept in @key_manager; the other methods of this
#   class call validate_key and key_without_namespace on it
def initialize(ring, key_manager)
  @key_manager = key_manager
  @ring = ring
end
Instance Method Details
#abort_with_timeout(servers) ⇒ Object
Swallows Dalli::NetworkError
128 129 130 131 132 133 134 135 |
# File 'lib/dalli/pipelined_getter.rb', line 128

# Aborts the pipelined request on every given server, then logs at debug
# level that each of them failed to answer within the timeout.
#
# @param servers [Array] servers to abort; each must respond to name
# @return [true] always, so that callers can use the result directly
def abort_with_timeout(servers)
  abort_without_timeout(servers)
  servers.each do |server|
    Dalli.logger.debug { "memcached at #{server.name} did not respond within timeout" }
  end

  true # Required to simplify caller
end
#abort_without_timeout(servers) ⇒ Object
Swallows Dalli::NetworkError
91 92 93 |
# File 'lib/dalli/pipelined_getter.rb', line 91

# Calls pipeline_abort on each of the given servers, in order.
#
# NOTE(review): the published summary says this swallows Dalli::NetworkError.
# Nothing in this method rescues anything, so that would have to happen inside
# pipeline_abort - confirm against the server implementation.
#
# @param servers [Array] servers whose pipelined request should be aborted
# @return [Array] the same servers collection that was passed in
def abort_without_timeout(servers)
  servers.each do |server|
    server.pipeline_abort
  end
end
#fetch_responses(servers, start_time, timeout, &block) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/dalli/pipelined_getter.rb', line 95

# Performs one round of response collection for the outstanding servers.
#
# Disconnected servers are dropped from +servers+ (the array is mutated in
# place). The method then waits, for at most the time still left of +timeout+,
# until at least one server is readable. If none becomes readable, every
# remaining server is aborted and an empty list is returned. Otherwise each
# readable server is processed and removed from +servers+ once
# process_server reports it as finished.
#
# @param servers [Array] servers with outstanding pipelined requests
# @param start_time [Time] when the overall operation started
# @param timeout [Numeric] overall time budget, counted from +start_time+
# @yield forwarded to process_server for every key/value received
# @return [Array] the servers that still have responses pending
# @raise [NetworkError] re-raised after aborting all remaining servers
def fetch_responses(servers, start_time, timeout, &block)
  # Only servers that are still connected can deliver anything.
  servers.keep_if(&:connected?)
  return [] if servers.empty?

  ready = servers_with_response(servers, remaining_time(start_time, timeout))
  if ready.empty?
    abort_with_timeout(servers)
    return []
  end

  # Drain every readable server; those that are done leave the pending list.
  ready.each do |server|
    finished = process_server(server, &block)
    servers.delete(server) if finished
  end
  servers
rescue NetworkError
  # Abort whatever is left and propagate; the rescue in process retries.
  abort_without_timeout(servers)
  raise
end
#finish_queries(servers) ⇒ Object
This loops through the servers that have keys in our set, sending the noop to terminate the set of queries.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/dalli/pipelined_getter.rb', line 59

# Walks the given servers and, for each connected one, calls
# finish_query_for_server to complete the set of queries sent to it.
#
# A server whose call raises a Dalli::DalliError other than
# Dalli::NetworkError is removed from +servers+ (mutated in place). A
# Dalli::NetworkError aborts the pipelined request on all servers and
# propagates to the caller.
#
# @param servers [Array] servers that were sent pipelined requests
# @return [Array] +servers+, minus those removed as described above
# @raise [Dalli::NetworkError] after aborting every server in +servers+
def finish_queries(servers)
  failed = []

  servers.each do |server|
    next unless server.connected?

    begin
      finish_query_for_server(server)
    rescue Dalli::NetworkError
      # Listed first so the broader DalliError clause below does not
      # handle it (presumably NetworkError is a DalliError - confirm).
      raise
    rescue Dalli::DalliError
      failed << server
    end
  end

  servers.delete_if { |server| failed.include?(server) }
rescue Dalli::NetworkError
  abort_without_timeout(servers)
  raise
end
#finish_query_for_server(server) ⇒ Object
80 81 82 83 84 85 86 87 88 |
# File 'lib/dalli/pipelined_getter.rb', line 80

# Calls pipeline_response_setup on a single server.
#
# Dalli::NetworkError passes through untouched. Any other Dalli::DalliError
# is logged at debug level, together with a note that this server's results
# will be missing, and is then re-raised.
#
# @param server [Object] must respond to pipeline_response_setup and name
# @return [Object] whatever pipeline_response_setup returns
# @raise [Dalli::NetworkError, Dalli::DalliError] re-raised to the caller
def finish_query_for_server(server)
  begin
    server.pipeline_response_setup
  rescue Dalli::NetworkError
    raise
  rescue Dalli::DalliError => error
    Dalli.logger.debug { error.inspect }
    Dalli.logger.debug { "Results from server: #{server.name} will be missing from the results" }
    raise
  end
end
#groups_for_keys(*keys) ⇒ Object
164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/dalli/pipelined_getter.rb', line 164

# Groups the given keys by server.
#
# Arguments are flattened, converted to strings and passed through
# @key_manager.validate_key before @ring.keys_grouped_by_server groups them.
# An entry under the nil key (keys for which no server was found) is removed
# from the result and reported at debug level.
#
# @param keys [Array] keys, possibly nested in arrays
# @return [Object] the grouping returned by the ring, without its nil entry
def groups_for_keys(*keys)
  validated = keys.flatten.map { |key| @key_manager.validate_key(key.to_s) }
  groups = @ring.keys_grouped_by_server(validated)

  orphaned = groups.delete(nil)
  if orphaned
    Dalli.logger.debug do
      "unable to get keys for #{orphaned.length} keys because no matching server was found"
    end
  end

  groups
end
#make_getkq_requests(groups) ⇒ Object
Loop through the server-grouped sets of keys, writing the corresponding getkq requests to the appropriate servers.
It’s worth noting that we could potentially reduce bytes on the wire by switching from getkq to getq, and using the opaque value to match requests to responses.
46 47 48 49 50 51 52 53 |
# File 'lib/dalli/pipelined_getter.rb', line 46

# Sends a :pipelined_get request to each server for the keys grouped under
# it.
#
# A DalliError or NetworkError raised for one server is logged at debug
# level and does not stop the remaining servers from being contacted.
#
# @param groups [Hash] server => keys for that server
# @return [Hash] +groups+ itself
def make_getkq_requests(groups)
  groups.each do |server, server_keys|
    begin
      server.request(:pipelined_get, server_keys)
    rescue DalliError, NetworkError => error
      Dalli.logger.debug { error.inspect }
      Dalli.logger.debug { "unable to get keys for server #{server.name}" }
    end
  end
end
#process(keys, &block) ⇒ Object
Yields, one at a time, keys and their values+attributes.
16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/dalli/pipelined_getter.rb', line 16

# Yields, one at a time, keys and their values+attributes.
#
# While holding @ring.lock, the requests are set up and fetch_responses is
# called repeatedly until no server has responses pending.
#
# On NetworkError the error is logged at debug level and the whole method
# body is run again via retry. No retry limit is enforced in this method.
#
# @param keys [Array] keys to fetch
# @yield forwarded to fetch_responses for every key/value received
# @return [Hash] an empty hash when +keys+ is empty; otherwise whatever
#   @ring.lock returns
def process(keys, &block)
  return {} if keys.empty?

  @ring.lock do
    pending = setup_requests(keys)
    started_at = Time.now
    until pending.empty?
      pending = fetch_responses(pending, started_at, @ring.socket_timeout, &block)
    end
  end
rescue NetworkError => error
  Dalli.logger.debug { error.inspect }
  Dalli.logger.debug { 'retrying pipelined gets because of timeout' }
  retry
end
#process_server(server) ⇒ Object
Processes responses from a server. Returns true if there are no additional responses from this server.
139 140 141 142 143 144 145 |
# File 'lib/dalli/pipelined_getter.rb', line 139

# Processes the responses currently available from one server.
#
# Every key returned by pipeline_next_responses is passed through
# @key_manager.key_without_namespace and yielded with its value list.
#
# @param server [Object] must respond to pipeline_next_responses and
#   pipeline_complete?
# @yield [key, value_list] once per response received
# @return [Object] the result of server.pipeline_complete?, which callers
#   treat as "no more responses expected from this server"
def process_server(server)
  responses = server.pipeline_next_responses
  responses.each_pair do |namespaced_key, value_list|
    bare_key = @key_manager.key_without_namespace(namespaced_key)
    yield bare_key, value_list
  end

  server.pipeline_complete?
end
#remaining_time(start, timeout) ⇒ Object
120 121 122 123 124 125 |
# File 'lib/dalli/pipelined_getter.rb', line 120

# Computes how much of +timeout+ is left, measured from +start+.
#
# Elapsed time is taken from the wall clock (Time.now), which can step
# backwards. In that case the elapsed time is negative, so the result is
# capped at +timeout+ to keep callers from waiting longer than the budget.
#
# @param start [Time] when the operation started
# @param timeout [Numeric] total time budget in seconds
# @return [Numeric] seconds left, never below 0 and never above +timeout+
def remaining_time(start, timeout)
  elapsed = Time.now - start
  [[timeout - elapsed, timeout].min, 0].max
end
#servers_with_response(servers, timeout) ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/dalli/pipelined_getter.rb', line 147

# Waits up to +timeout+ seconds and reports which servers have data to read.
#
# @param servers [Array] servers to watch; each must respond to sock
# @param timeout [Numeric] maximum wait handed to IO.select
# @return [Array] the servers whose sockets IO.select reported as readable;
#   empty when +servers+ is empty or nothing became readable in time
def servers_with_response(servers, timeout)
  return [] if servers.empty?

  # TODO: - This is a bit challenging.  Essentially the PipelinedGetter
  # is a reactor, but without the benefit of a Fiber or separate thread.
  # My suspicion is that we may want to try and push this down into the
  # individual servers, but I'm not sure.  For now, we keep the
  # mapping between the alerted object (the socket) and the
  # corresponding server here.
  by_socket = servers.map { |server| [server.sock, server] }.to_h

  # IO.select returns nil when the timeout expires with nothing ready.
  ready = IO.select(by_socket.keys, nil, nil, timeout)
  return [] unless ready

  ready.first.map { |sock| by_socket[sock] }
end
#setup_requests(keys) ⇒ Object
30 31 32 33 34 35 36 |
# File 'lib/dalli/pipelined_getter.rb', line 30

# Groups the keys by server, sends the pipelined get requests and then
# finishes the queries on every server that received keys.
#
# @param keys [Array] keys to fetch
# @return [Array] the servers returned by finish_queries
def setup_requests(keys)
  keys_by_server = groups_for_keys(keys)
  make_getkq_requests(keys_by_server)

  # TODO: How does this exit on a NetworkError
  finish_queries(keys_by_server.keys)
end