Class: Dalli::PipelinedGetter

Inherits:
Object
  • Object
show all
Defined in:
lib/dalli/pipelined_getter.rb

Overview

Contains logic for the pipelined gets implemented by the client.

Instance Method Summary collapse

Constructor Details

#initialize(ring, key_manager) ⇒ PipelinedGetter

Returns a new instance of PipelinedGetter.



8
9
10
11
# File 'lib/dalli/pipelined_getter.rb', line 8

# Builds a getter bound to a server ring and a key manager.
#
# @param ring [Object] kept in @ring; the other methods here use it to group
#   keys by server, to take the lock and to read the socket timeout
# @param key_manager [Object] kept in @key_manager; used to validate keys and
#   to strip the namespace from returned keys
def initialize(ring, key_manager)
  @ring, @key_manager = ring, key_manager
end

Instance Method Details

#abort_with_timeout(servers) ⇒ Object

Swallows Dalli::NetworkError



128
129
130
131
132
133
134
135
# File 'lib/dalli/pipelined_getter.rb', line 128

# Aborts the pipelined request on every given server, then emits one debug
# line per server recording that it did not answer in time.
#
# NOTE(review): the log text reads "did not response"; it is left untouched
# here because it is emitted at runtime.
#
# @param servers [Array] servers responding to +name+
# @return [true] always, so the caller can use the result directly
def abort_with_timeout(servers)
  abort_without_timeout(servers)

  servers.each { |slow| Dalli.logger.debug { "memcached at #{slow.name} did not response within timeout" } }

  true
end

#abort_without_timeout(servers) ⇒ Object

Swallows Dalli::NetworkError



91
92
93
# File 'lib/dalli/pipelined_getter.rb', line 91

# Calls +pipeline_abort+ on each of the given servers.
#
# Nothing is rescued in this method; the existing documentation says
# Dalli::NetworkError is swallowed, which would have to happen inside
# +pipeline_abort+ itself (not visible here).
#
# @param servers [Array] servers responding to +pipeline_abort+
# @return [Array] the same +servers+ collection
def abort_without_timeout(servers)
  servers.each { |server| server.pipeline_abort }
end

#fetch_responses(servers, start_time, timeout, &block) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/dalli/pipelined_getter.rb', line 95

# Performs one round of response reading across the outstanding servers.
#
# Disconnected servers are dropped from +servers+ (the array is mutated in
# place). If no remaining server becomes readable within the time left, all
# of them are aborted and an empty list is returned. Otherwise each readable
# server is processed and removed from +servers+ once it reports completion.
#
# @param servers [Array] servers with outstanding pipelined requests
# @param start_time [Time] when the overall operation started
# @param timeout [Numeric] overall time budget, in seconds
# @yield forwarded to +process_server+ for every response read
# @return [Array] servers that still have responses outstanding
# @raise [NetworkError] re-raised after aborting the servers still listed
def fetch_responses(servers, start_time, timeout, &block)
  # Only connected servers can deliver anything.
  servers.keep_if(&:connected?)
  return [] if servers.empty?

  ready = servers_with_response(servers, remaining_time(start_time, timeout))

  if ready.empty?
    # Nothing became readable in the time we had left: give up on the rest.
    abort_with_timeout(servers)
    []
  else
    # Remove each server as soon as it finishes, so that an error on a later
    # server only aborts the ones still outstanding.
    ready.each do |server|
      finished = process_server(server, &block)
      servers.delete(server) if finished
    end
    servers
  end
rescue NetworkError
  # Abort and re-raise; the caller at the top level retries on this error.
  abort_without_timeout(servers)
  raise
end

#finish_queries(servers) ⇒ Object

This loops through the servers that have keys in our set, sending the noop to terminate the set of queries.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/dalli/pipelined_getter.rb', line 59

# Finishes the query on every live server in +servers+ and removes from the
# list those whose finishing step failed with a Dalli::DalliError.
#
# Servers that are not alive are skipped and stay in the list. +servers+ is
# mutated in place.
#
# @param servers [Array] servers that were sent requests
# @return [Array] the same +servers+ array, minus the failed servers
# @raise [Dalli::NetworkError] re-raised after aborting every listed server
def finish_queries(servers)
  failed = servers.select do |server|
    next false unless server.alive?

    begin
      finish_query_for_server(server)
      false
    rescue Dalli::NetworkError
      # Listed first so it is never treated as a plain DalliError below.
      raise
    rescue Dalli::DalliError
      true
    end
  end

  # Removal happens only after the whole pass, so a network error part-way
  # through still aborts the complete, unmodified list.
  servers.delete_if { |server| failed.include?(server) }
rescue Dalli::NetworkError
  abort_without_timeout(servers)
  raise
end

#finish_query_for_server(server) ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'lib/dalli/pipelined_getter.rb', line 80

# Calls +pipeline_response_setup+ on a single server.
#
# A Dalli::NetworkError passes straight through. Any other Dalli::DalliError
# is logged at debug level (the error itself, then a note that this server's
# results will be missing) and re-raised.
#
# @param server [Object] responds to +pipeline_response_setup+ and +name+
# @return [Object] whatever +pipeline_response_setup+ returns
# @raise [Dalli::NetworkError, Dalli::DalliError] re-raised unchanged
def finish_query_for_server(server)
  server.pipeline_response_setup
rescue Dalli::DalliError => e
  # Network errors are handled by the callers; do not log them here.
  raise if e.is_a?(Dalli::NetworkError)

  Dalli.logger.debug { e.inspect }
  Dalli.logger.debug { "Results from server: #{server.name} will be missing from the results" }
  raise
end

#groups_for_keys(*keys) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/dalli/pipelined_getter.rb', line 164

# Validates the given keys and groups them by the server responsible for them.
#
# Keys may be given individually or as (nested) arrays. Each one is
# stringified and passed through the key manager's +validate_key+ before the
# ring groups them. Keys grouped under +nil+ (no matching server) are dropped
# from the result and only their count is logged at debug level.
#
# @param keys [Array] keys, or arrays of keys
# @return [Hash] server => validated keys, without the +nil+ group
def groups_for_keys(*keys)
  validated = keys.flatten.map { |key| @key_manager.validate_key(key.to_s) }
  groups = @ring.keys_grouped_by_server(validated)

  unfound_keys = groups.delete(nil)
  return groups unless unfound_keys

  Dalli.logger.debug do
    "unable to get keys for #{unfound_keys.length} keys because no matching server was found"
  end
  groups
end

#make_getkq_requests(groups) ⇒ Object

Loop through the server-grouped sets of keys, writing the corresponding getkq requests to the appropriate servers.

It’s worth noting that we could potentially reduce bytes on the wire by switching from getkq to getq, and using the opaque value to match requests to responses.



46
47
48
49
50
51
52
53
# File 'lib/dalli/pipelined_getter.rb', line 46

# Issues a +:pipelined_get+ request to each server for its share of the keys.
#
# A DalliError or NetworkError raised for one server is logged at debug
# level and does not stop the remaining servers from being contacted.
#
# @param groups [Hash] server => keys for that server
# @return [Hash] the same +groups+ hash
def make_getkq_requests(groups)
  groups.each do |target, wanted_keys|
    begin
      target.request(:pipelined_get, wanted_keys)
    rescue DalliError, NetworkError => e
      # Best effort: record the failure and carry on with the next server.
      Dalli.logger.debug { e.inspect }
      Dalli.logger.debug { "unable to get keys for server #{target.name}" }
    end
  end
end

#process(keys, &block) ⇒ Object

Yields, one at a time, keys and their values+attributes.



16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/dalli/pipelined_getter.rb', line 16

# Runs a pipelined get for +keys+, yielding each key and its value list to
# the block as responses arrive.
#
# While holding the ring lock, the requests are set up and responses are
# fetched in rounds until no server has responses outstanding. A
# NetworkError is logged at debug level and the whole method is retried.
#
# NOTE(review): the retry is unbounded; termination appears to rely on the
# lower layers eventually raising something other than NetworkError (or
# succeeding) - confirm.
#
# @param keys [Array] keys to fetch
# @yield forwarded to +fetch_responses+
# @return [Hash, Object] +{}+ when +keys+ is empty, otherwise whatever
#   +@ring.lock+ returns
def process(keys, &block)
  return {} if keys.empty?

  @ring.lock do
    pending = setup_requests(keys)
    started_at = Time.now
    until pending.empty?
      pending = fetch_responses(pending, started_at, @ring.socket_timeout, &block)
    end
  end
rescue NetworkError => e
  Dalli.logger.debug { e.inspect }
  Dalli.logger.debug { 'retrying pipelined gets because of timeout' }
  retry
end

#process_server(server) ⇒ Object

Processes responses from a server. Returns true if there are no additional responses from this server.



139
140
141
142
143
144
145
# File 'lib/dalli/pipelined_getter.rb', line 139

# Reads the next batch of responses from +server+ and yields each one.
#
# Every key is passed through the key manager's +key_without_namespace+
# before being yielded together with its value list.
#
# @param server [Object] responds to +pipeline_next_responses+ and
#   +pipeline_complete?+
# @yieldparam key [Object] the key as returned by +key_without_namespace+
# @yieldparam value_list [Object] the value stored under that key in the batch
# @return [Object] the result of +server.pipeline_complete?+
def process_server(server)
  batch = server.pipeline_next_responses
  batch.each_pair do |namespaced_key, value_list|
    yield(@key_manager.key_without_namespace(namespaced_key), value_list)
  end

  server.pipeline_complete?
end

#remaining_time(start, timeout) ⇒ Object



120
121
122
123
124
125
# File 'lib/dalli/pipelined_getter.rb', line 120

# Computes how much of +timeout+ is left, given when the operation started.
#
# Elapsed time is measured with the wall clock (+Time.now+), which can be
# stepped backwards. In that case the elapsed time is negative and a plain
# subtraction would report more time than the whole budget, so the result is
# capped at +timeout+.
#
# @param start [Time] when the operation started
# @param timeout [Numeric] total time budget, in seconds
# @return [Numeric] seconds remaining, always between 0 and +timeout+
def remaining_time(start, timeout)
  elapsed = Time.now - start
  # Clock moved backwards since +start+: never grant more than the budget.
  return timeout if elapsed < 0
  return 0 if elapsed > timeout

  timeout - elapsed
end

#servers_with_response(servers, timeout) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/dalli/pipelined_getter.rb', line 147

# Waits up to +timeout+ seconds for any server's socket to become readable.
#
# @param servers [Array] servers responding to +sock+
# @param timeout [Numeric] maximum time to wait, passed to IO.select
# @return [Array] the servers whose sockets are readable; empty when
#   +servers+ is empty or the wait timed out
def servers_with_response(servers, timeout)
  return [] if servers.empty?

  # TODO: - This is a bit challenging.  Essentially the PipelinedGetter
  # is a reactor, but without the benefit of a Fiber or separate thread.
  # It may be worth pushing this down into the individual servers.  For
  # now, the mapping between the alerted object (the socket) and the
  # corresponding server is kept here.
  by_socket = {}
  servers.each { |server| by_socket[server.sock] = server }

  # IO.select returns nil when the timeout elapses with nothing ready.
  ready = IO.select(by_socket.keys, nil, nil, timeout)
  return [] unless ready

  ready.first.map { |sock| by_socket[sock] }
end

#setup_requests(keys) ⇒ Object



30
31
32
33
34
35
36
# File 'lib/dalli/pipelined_getter.rb', line 30

# Sends the pipelined get requests for +keys+ and finishes the query on each
# targeted server.
#
# @param keys [Array] keys to fetch
# @return [Array] the result of +finish_queries+ for the targeted servers
def setup_requests(keys)
  server_groups = groups_for_keys(keys)
  make_getkq_requests(server_groups)

  # TODO: How does this exit on a NetworkError
  targeted_servers = server_groups.keys
  finish_queries(targeted_servers)
end