Class: Dalli::Protocol::Meta::ResponseProcessor
- Inherits:
-
Object
- Object
- Dalli::Protocol::Meta::ResponseProcessor
- Defined in:
- lib/dalli/protocol/meta/response_processor.rb
Overview
Class that encapsulates logic for processing meta protocol responses from memcached. Includes logic for pulling data from an IO source and parsing into local values. Handles errors on unexpected values.
Constant Summary collapse
- EN =
'EN'
- END_TOKEN =
'END'
- EX =
'EX'
- HD =
'HD'
- MN =
'MN'
- NF =
'NF'
- NS =
'NS'
- OK =
'OK'
- RESET =
'RESET'
- STAT =
'STAT'
- VA =
'VA'
- VERSION =
'VERSION'
Instance Method Summary collapse
- #bitflags_from_tokens(tokens) ⇒ Object
- #body_len_from_tokens(tokens) ⇒ Object
- #cas_from_tokens(tokens) ⇒ Object
- #consume_all_responses_until_mn ⇒ Object
- #contains_header?(buf) ⇒ Boolean
- #decr_incr ⇒ Object
- #error_on_unexpected!(expected_codes) ⇒ Object
- #flush ⇒ Object
- #full_response_from_buffer(tokens, body, resp_size) ⇒ Object
-
#getk_response_from_buffer(buf) ⇒ Object
This method returns an array of values used in a pipelined getk process.
- #header_from_buffer(buf) ⇒ Object
-
#initialize(io_source, value_marshaller) ⇒ ResponseProcessor
constructor
A new instance of ResponseProcessor.
- #key_from_tokens(tokens) ⇒ Object
- #meta_delete ⇒ Object
- #meta_get_with_value(cache_nils: false) ⇒ Object
- #meta_get_with_value_and_cas ⇒ Object
- #meta_get_without_value ⇒ Object
- #meta_set_append_prepend ⇒ Object
- #meta_set_with_cas ⇒ Object
- #next_line_to_tokens ⇒ Object
- #read_line ⇒ Object
- #reset ⇒ Object
- #stats ⇒ Object
- #tokens_from_header_buffer(buf) ⇒ Object
- #value_from_tokens(tokens, flag) ⇒ Object
- #version ⇒ Object
Constructor Details
#initialize(io_source, value_marshaller) ⇒ ResponseProcessor
Returns a new instance of ResponseProcessor.
25 26 27 28 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 25 def initialize(io_source, value_marshaller) @io_source = io_source @value_marshaller = value_marshaller end |
Instance Method Details
#bitflags_from_tokens(tokens) ⇒ Object
175 176 177 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 175 def bitflags_from_tokens(tokens) value_from_tokens(tokens, 'f')&.to_i end |
#body_len_from_tokens(tokens) ⇒ Object
189 190 191 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 189 def body_len_from_tokens(tokens) value_from_tokens(tokens, 's')&.to_i end |
#cas_from_tokens(tokens) ⇒ Object
179 180 181 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 179 def cas_from_tokens(tokens) value_from_tokens(tokens, 'c')&.to_i end |
#consume_all_responses_until_mn ⇒ Object
107 108 109 110 111 112 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 107 def consume_all_responses_until_mn tokens = next_line_to_tokens tokens = next_line_to_tokens while tokens.first != MN true end |
#contains_header?(buf) ⇒ Boolean
160 161 162 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 160 def contains_header?(buf) buf.include?(TERMINATOR) end |
#decr_incr ⇒ Object
72 73 74 75 76 77 78 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 72 def decr_incr tokens = error_on_unexpected!([VA, NF, NS, EX]) return false if [NS, EX].include?(tokens.first) return nil if tokens.first == NF read_line.to_i end |
#error_on_unexpected!(expected_codes) ⇒ Object
168 169 170 171 172 173 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 168 def error_on_unexpected!(expected_codes) tokens = next_line_to_tokens raise Dalli::DalliError, "Response error: #{tokens.first}" unless expected_codes.include?(tokens.first) tokens end |
#flush ⇒ Object
90 91 92 93 94 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 90 def flush error_on_unexpected!([OK]) true end |
#full_response_from_buffer(tokens, body, resp_size) ⇒ Object
122 123 124 125 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 122 def full_response_from_buffer(tokens, body, resp_size) value = @value_marshaller.retrieve(body, bitflags_from_tokens(tokens)) [resp_size, tokens.first == VA, cas_from_tokens(tokens), key_from_tokens(tokens), value] end |
#getk_response_from_buffer(buf) ⇒ Object
This method returns an array of values used in a pipelined getk process. The first value is the number of bytes by which to advance the pointer in the buffer. If the complete response is found in the buffer, this will be the response size. Otherwise it is zero.
The remaining three values in the array are the ResponseHeader, key, and value.
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 137 def getk_response_from_buffer(buf) # There's no header in the buffer, so don't advance return [0, nil, nil, nil, nil] unless contains_header?(buf) tokens, header_len, body_len = tokens_from_header_buffer(buf) # We have a complete response that has no body. # This is either the response to the terminating # noop or, if the status is not MN, an intermediate # error response that needs to be discarded. return [header_len, true, nil, nil, nil] if body_len.zero? resp_size = header_len + body_len + TERMINATOR.length # The header is in the buffer, but the body is not. As we don't have # a complete response, don't advance the buffer return [0, nil, nil, nil, nil] unless buf.bytesize >= resp_size # The full response is in our buffer, so parse it and return # the values body = buf.slice(header_len, body_len) full_response_from_buffer(tokens, body, resp_size) end |
#header_from_buffer(buf) ⇒ Object
164 165 166 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 164 def header_from_buffer(buf) buf.split(TERMINATOR, 2).first end |
#key_from_tokens(tokens) ⇒ Object
183 184 185 186 187 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 183 def key_from_tokens(tokens) encoded_key = value_from_tokens(tokens, 'k') base64_encoded = tokens.any?('b') KeyRegularizer.decode(encoded_key, base64_encoded) end |
#meta_delete ⇒ Object
67 68 69 70 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 67 def tokens = error_on_unexpected!([HD, NF, EX]) tokens.first == HD end |
#meta_get_with_value(cache_nils: false) ⇒ Object
30 31 32 33 34 35 36 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 30 def (cache_nils: false) tokens = error_on_unexpected!([VA, EN, HD]) return cache_nils ? ::Dalli::NOT_FOUND : nil if tokens.first == EN return true unless tokens.first == VA @value_marshaller.retrieve(read_line, bitflags_from_tokens(tokens)) end |
#meta_get_with_value_and_cas ⇒ Object
38 39 40 41 42 43 44 45 46 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 38 def tokens = error_on_unexpected!([VA, EN, HD]) return [nil, 0] if tokens.first == EN cas = cas_from_tokens(tokens) return [nil, cas] unless tokens.first == VA [@value_marshaller.retrieve(read_line, bitflags_from_tokens(tokens)), cas] end |
#meta_get_without_value ⇒ Object
48 49 50 51 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 48 def tokens = error_on_unexpected!([EN, HD]) tokens.first == EN ? nil : true end |
#meta_set_append_prepend ⇒ Object
60 61 62 63 64 65 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 60 def tokens = error_on_unexpected!([HD, NS, NF, EX]) return false unless tokens.first == HD true end |
#meta_set_with_cas ⇒ Object
53 54 55 56 57 58 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 53 def tokens = error_on_unexpected!([HD, NS, NF, EX]) return false unless tokens.first == HD cas_from_tokens(tokens) end |
#next_line_to_tokens ⇒ Object
204 205 206 207 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 204 def next_line_to_tokens line = read_line line&.split || [] end |
#read_line ⇒ Object
200 201 202 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 200 def read_line @io_source.read_line&.chomp!(TERMINATOR) end |
#reset ⇒ Object
96 97 98 99 100 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 96 def reset error_on_unexpected!([RESET]) true end |
#stats ⇒ Object
80 81 82 83 84 85 86 87 88 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 80 def stats tokens = error_on_unexpected!([END_TOKEN, STAT]) values = {} while tokens.first != END_TOKEN values[tokens[1]] = tokens[2] tokens = next_line_to_tokens end values end |
#tokens_from_header_buffer(buf) ⇒ Object
114 115 116 117 118 119 120 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 114 def tokens_from_header_buffer(buf) header = header_from_buffer(buf) tokens = header.split header_len = header.bytesize + TERMINATOR.length body_len = body_len_from_tokens(tokens) [tokens, header_len, body_len] end |
#value_from_tokens(tokens, flag) ⇒ Object
193 194 195 196 197 198 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 193 def value_from_tokens(tokens, flag) bitflags_token = tokens.find { |t| t.start_with?(flag) } return 0 unless bitflags_token bitflags_token[1..] end |
#version ⇒ Object
102 103 104 105 |
# File 'lib/dalli/protocol/meta/response_processor.rb', line 102 def version tokens = error_on_unexpected!([VERSION]) tokens.last end |