Class: Dalli::Protocol::Meta::ResponseProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/dalli/protocol/response_processor.rb

Overview

Class that encapsulates logic for processing meta protocol responses from memcached. Includes logic for pulling data from an IO source and parsing into local values. Handles errors on unexpected values.

Constant Summary collapse

EN =
'EN'
END_TOKEN =
'END'
EX =
'EX'
HD =
'HD'
MN =
'MN'
NF =
'NF'
NS =
'NS'
OK =
'OK'
RESET =
'RESET'
STAT =
'STAT'
VA =
'VA'
VERSION =
'VERSION'
SERVER_ERROR =
'SERVER_ERROR'

Instance Method Summary collapse

Constructor Details

#initialize(io_source, value_marshaller) ⇒ ResponseProcessor

Returns a new instance of ResponseProcessor.



26
27
28
29
# File 'lib/dalli/protocol/response_processor.rb', line 26

def initialize(io_source, value_marshaller)
  @io_source = io_source
  @value_marshaller = value_marshaller
end

Instance Method Details

#bitflags_from_tokens(tokens) ⇒ Object



202
203
204
# File 'lib/dalli/protocol/response_processor.rb', line 202

def bitflags_from_tokens(tokens)
  value_from_tokens(tokens, 'f')&.to_i
end

#body_len_from_tokens(tokens) ⇒ Object



231
232
233
# File 'lib/dalli/protocol/response_processor.rb', line 231

def body_len_from_tokens(tokens)
  value_from_tokens(tokens, 's')&.to_i
end

#build_metadata_result(tokens) ⇒ Object



74
75
76
77
78
79
80
# File 'lib/dalli/protocol/response_processor.rb', line 74

def (tokens)
  {
    value: nil, cas: cas_from_tokens(tokens),
    won_recache: tokens.include?('W'), stale: tokens.include?('X'),
    lost_recache: tokens.include?('Z')
  }
end

#cas_from_tokens(tokens) ⇒ Object



206
207
208
# File 'lib/dalli/protocol/response_processor.rb', line 206

def cas_from_tokens(tokens)
  value_from_tokens(tokens, 'c')&.to_i
end

#consume_all_responses_until_mnObject



143
144
145
146
147
148
# File 'lib/dalli/protocol/response_processor.rb', line 143

def consume_all_responses_until_mn
  tokens = next_line_to_tokens

  tokens = next_line_to_tokens while tokens.first != MN
  true
end

#decr_incrObject



108
109
110
111
112
113
114
# File 'lib/dalli/protocol/response_processor.rb', line 108

def decr_incr
  tokens = error_on_unexpected!([VA, NF, NS, EX])
  return false if [NS, EX].include?(tokens.first)
  return nil if tokens.first == NF

  read_line.to_i
end

#error_on_unexpected!(expected_codes) ⇒ Object

Raises:



192
193
194
195
196
197
198
199
200
# File 'lib/dalli/protocol/response_processor.rb', line 192

def error_on_unexpected!(expected_codes)
  tokens = next_line_to_tokens

  return tokens if expected_codes.include?(tokens.first)

  raise Dalli::ServerError, tokens.join(' ').to_s if tokens.first == SERVER_ERROR

  raise Dalli::DalliError, "Response error: #{tokens.first}"
end

#flushObject



126
127
128
129
130
# File 'lib/dalli/protocol/response_processor.rb', line 126

def flush
  error_on_unexpected!([OK])

  true
end

#full_response_from_buffer(tokens, body, resp_size) ⇒ Object



150
151
152
153
# File 'lib/dalli/protocol/response_processor.rb', line 150

def full_response_from_buffer(tokens, body, resp_size)
  value = @value_marshaller.retrieve(body, bitflags_from_tokens(tokens))
  [resp_size, tokens.first == VA, cas_from_tokens(tokens), key_from_tokens(tokens), value]
end

#getk_response_from_buffer(buf, offset = 0) ⇒ Object

This method returns an array of values used in a pipelined getk process. The first value is the number of bytes by which to advance the pointer in the buffer. If the complete response is found in the buffer, this will be the response size. Otherwise it is zero.

The remaining three values in the array are the ResponseHeader, key, and value.



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/dalli/protocol/response_processor.rb', line 165

def getk_response_from_buffer(buf, offset = 0)
  # Find the header terminator starting from offset
  term_idx = buf.index(TERMINATOR, offset)
  return [0, nil, nil, nil, nil] unless term_idx

  header = buf.byteslice(offset, term_idx - offset)
  tokens = header.split
  header_len = header.bytesize + TERMINATOR.length
  body_len = body_len_from_tokens(tokens)

  # We have a complete response that has no body.
  # This is either the response to the terminating
  # noop or, if the status is not MN, an intermediate
  # error response that needs to be discarded.
  return [header_len, true, nil, nil, nil] if body_len.zero?

  resp_size = header_len + body_len + TERMINATOR.length
  # The header is in the buffer, but the body is not.  As we don't have
  # a complete response, don't advance the buffer
  return [0, nil, nil, nil, nil] unless buf.bytesize >= offset + resp_size

  # The full response is in our buffer, so parse it and return
  # the values
  body = buf.byteslice(offset + header_len, body_len)
  full_response_from_buffer(tokens, body, resp_size)
end

#hit_status_from_tokens(tokens) ⇒ Object

Returns true if item was previously hit, false if first access, nil if not requested The h flag returns h0 (first access) or h1 (previously accessed)



218
219
220
221
222
223
# File 'lib/dalli/protocol/response_processor.rb', line 218

def hit_status_from_tokens(tokens)
  hit_token = tokens.find { |t| t.start_with?('h') && t.length == 2 }
  return nil unless hit_token

  hit_token[1] == '1'
end

#key_from_tokens(tokens) ⇒ Object



210
211
212
213
214
# File 'lib/dalli/protocol/response_processor.rb', line 210

def key_from_tokens(tokens)
  encoded_key = value_from_tokens(tokens, 'k')
  base64_encoded = tokens.any?('b')
  KeyRegularizer.decode(encoded_key, base64_encoded)
end

#last_access_from_tokens(tokens) ⇒ Object

Returns seconds since last access, or nil if not requested The l flag returns l<seconds>



227
228
229
# File 'lib/dalli/protocol/response_processor.rb', line 227

def last_access_from_tokens(tokens)
  value_from_tokens(tokens, 'l')&.to_i
end

#meta_deleteObject



103
104
105
106
# File 'lib/dalli/protocol/response_processor.rb', line 103

def meta_delete
  tokens = error_on_unexpected!([HD, NF, EX])
  tokens.first == HD
end

#meta_get_with_metadata(cache_nils: false, return_hit_status: false, return_last_access: false) ⇒ Object

Returns a hash with all requested metadata:

  • :value - the cached value (or nil if miss)

  • :cas - the CAS value (if return_cas was requested)

  • :won_recache - true if client won the right to recache (W flag)

  • :stale - true if the item is stale (X flag)

  • :lost_recache - true if another client is already recaching (Z flag)

  • :hit_before - true/false if item was previously accessed (h flag, if requested)

  • :last_access - seconds since last access (l flag, if requested)

Used by meta_get for comprehensive metadata retrieval. Supports thundering herd protection (N/R flags) and metadata flags (h/l/u).



65
66
67
68
69
70
71
72
# File 'lib/dalli/protocol/response_processor.rb', line 65

def (cache_nils: false, return_hit_status: false, return_last_access: false)
  tokens = error_on_unexpected!([VA, EN, HD])
  result = (tokens)
  result[:hit_before] = hit_status_from_tokens(tokens) if return_hit_status
  result[:last_access] = last_access_from_tokens(tokens) if return_last_access
  result[:value] = parse_value_from_tokens(tokens, cache_nils)
  result
end

#meta_get_with_value(cache_nils: false) ⇒ Object



31
32
33
34
35
36
37
# File 'lib/dalli/protocol/response_processor.rb', line 31

def meta_get_with_value(cache_nils: false)
  tokens = error_on_unexpected!([VA, EN, HD])
  return cache_nils ? ::Dalli::NOT_FOUND : nil if tokens.first == EN
  return true unless tokens.first == VA

  @value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens))
end

#meta_get_with_value_and_casObject



39
40
41
42
43
44
45
46
47
# File 'lib/dalli/protocol/response_processor.rb', line 39

def meta_get_with_value_and_cas
  tokens = error_on_unexpected!([VA, EN, HD])
  return [nil, 0] if tokens.first == EN

  cas = cas_from_tokens(tokens)
  return [nil, cas] unless tokens.first == VA

  [@value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens)), cas]
end

#meta_get_without_valueObject



49
50
51
52
# File 'lib/dalli/protocol/response_processor.rb', line 49

def meta_get_without_value
  tokens = error_on_unexpected!([EN, HD])
  tokens.first == EN ? nil : true
end

#meta_set_append_prependObject



96
97
98
99
100
101
# File 'lib/dalli/protocol/response_processor.rb', line 96

def meta_set_append_prepend
  tokens = error_on_unexpected!([HD, NS, NF, EX])
  return false unless tokens.first == HD

  true
end

#meta_set_with_casObject



89
90
91
92
93
94
# File 'lib/dalli/protocol/response_processor.rb', line 89

def meta_set_with_cas
  tokens = error_on_unexpected!([HD, NS, NF, EX])
  return false unless tokens.first == HD

  cas_from_tokens(tokens)
end

#next_line_to_tokensObject



246
247
248
249
# File 'lib/dalli/protocol/response_processor.rb', line 246

def next_line_to_tokens
  line = read_line
  line&.split || []
end

#parse_value_from_tokens(tokens, cache_nils) ⇒ Object



82
83
84
85
86
87
# File 'lib/dalli/protocol/response_processor.rb', line 82

def parse_value_from_tokens(tokens, cache_nils)
  return cache_nils ? ::Dalli::NOT_FOUND : nil if tokens.first == EN
  return unless tokens.first == VA

  @value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens))
end

#read_data(data_size) ⇒ Object



251
252
253
# File 'lib/dalli/protocol/response_processor.rb', line 251

def read_data(data_size)
  @io_source.read(data_size + TERMINATOR.bytesize)&.chomp!(TERMINATOR)
end

#read_lineObject



242
243
244
# File 'lib/dalli/protocol/response_processor.rb', line 242

def read_line
  @io_source.read_line&.chomp!(TERMINATOR)
end

#resetObject



132
133
134
135
136
# File 'lib/dalli/protocol/response_processor.rb', line 132

def reset
  error_on_unexpected!([RESET])

  true
end

#statsObject



116
117
118
119
120
121
122
123
124
# File 'lib/dalli/protocol/response_processor.rb', line 116

def stats
  tokens = error_on_unexpected!([END_TOKEN, STAT])
  values = {}
  while tokens.first != END_TOKEN
    values[tokens[1]] = tokens[2]
    tokens = next_line_to_tokens
  end
  values
end

#value_from_tokens(tokens, flag) ⇒ Object



235
236
237
238
239
240
# File 'lib/dalli/protocol/response_processor.rb', line 235

def value_from_tokens(tokens, flag)
  bitflags_token = tokens.find { |t| t.start_with?(flag) }
  return 0 unless bitflags_token

  bitflags_token[1..]
end

#versionObject



138
139
140
141
# File 'lib/dalli/protocol/response_processor.rb', line 138

def version
  tokens = error_on_unexpected!([VERSION])
  tokens.last
end