Class: Lumberjack::Beats::Parser
- Inherits:
-
Object
- Object
- Lumberjack::Beats::Parser
- Defined in:
- lib/lumberjack/beats/server.rb
Overview
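Incremental parser for Lumberjack/Beats protocol frames. Bytes are supplied through #feed, and each decoded frame is yielded to the block passed to #feed.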
Constant Summary collapse
- PROTOCOL_VERSION_1 =
"1".ord
- PROTOCOL_VERSION_2 =
"2".ord
- SUPPORTED_PROTOCOLS =
[PROTOCOL_VERSION_1, PROTOCOL_VERSION_2]
- FRAME_WINDOW =
Frame type byte ("W") that introduces a window-size frame.
"W".ord
- FRAME_DATA =
"D".ord
- FRAME_JSON_DATA =
"J".ord
- FRAME_COMPRESSED =
"C".ord
Instance Method Summary collapse
-
#compressed_lead(&block) ⇒ Object
Reads the 4-byte length prefix of a compressed frame and waits for that many payload bytes.
- #compressed_payload(&block) ⇒ Object
- #data_field_key(&block) ⇒ Object
- #data_field_key_len(&block) ⇒ Object
- #data_field_value(&block) ⇒ Object
- #data_field_value_len(&block) ⇒ Object
- #data_lead(&block) ⇒ Object
-
#feed(data, &block) ⇒ String?
Feed data to this parser.
-
#get(length = nil) ⇒ Object
Get the next ‘length’ bytes from the buffer as a string (defaults to the number of bytes currently needed).
- #handle_version(version, &block) ⇒ Object
-
#have?(length) ⇒ Boolean
Do we have at least ‘length’ bytes in the buffer?.
- #header(&block) ⇒ Object
-
#initialize ⇒ Parser
constructor
A new instance of Parser.
-
#json_data_lead(&block) ⇒ Object
Reads the sequence number and payload size of a JSON data frame, then waits for the payload.
- #json_data_payload {|:json, @sequence, Lumberjack::Beats::json.load(payload)| ... } ⇒ Object
-
#need(length) ⇒ Object
Set the minimum number of bytes we need in the buffer for the next read.
- #supported_protocol?(version) ⇒ Boolean
-
#transition(state, next_length) ⇒ Object
Switches the parser to the given state and records how many bytes that state needs.
- #window_size {|:window_size, @window_size| ... } ⇒ Object
Constructor Details
#initialize ⇒ Parser
Returns a new instance of Parser.
136 137 138 139 140 141 |
# File 'lib/lumberjack/beats/server.rb', line 136
# Create an empty parser: a binary-encoded receive buffer, a read cursor at
# zero, and the state machine waiting for a 2-byte frame header.
#
# @return [Parser] a new instance of Parser
def initialize
  @buffer = String.new.force_encoding("BINARY")
  @buffer_offset = 0
  transition(:header, 2)
end
Instance Method Details
#compressed_lead(&block) ⇒ Object
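Reads the 4-byte length prefix of a compressed frame and transitions to :compressed_payload, waiting for that many bytes.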
286 287 288 289 |
# File 'lib/lumberjack/beats/server.rb', line 286
# Decode the length prefix of a compressed frame (one 32-bit unsigned
# big-endian integer) and wait for that many payload bytes.
#
# @return [Object] whatever #transition returns
def compressed_lead(&block)
  compressed_size, = get.unpack("N")
  transition(:compressed_payload, compressed_size)
end
#compressed_payload(&block) ⇒ Object
291 292 293 294 295 296 297 298 |
# File 'lib/lumberjack/beats/server.rb', line 291
# Inflate a compressed frame and parse its contents with a fresh parser of
# the same class, forwarding every decoded event to the caller's block.
#
# This parser is switched back to :header before the nested parse starts.
#
# NOTE(review): the payload is inflated in one call with no size limit; if
# the peer is untrusted, a small frame could expand to a very large string.
#
# @return [nil] the result of the nested parser's #feed
def compressed_payload(&block)
  inflated = Zlib::Inflate.inflate(get)
  transition(:header, 2)

  # The inflated bytes are handed to a separate parser instance.
  self.class.new.feed(inflated, &block)
end
#data_field_key(&block) ⇒ Object
261 262 263 264 |
# File 'lib/lumberjack/beats/server.rb', line 261
# Store the bytes of the current field's key, then wait for the 4-byte
# length of the matching value.
#
# @return [Object] whatever #transition returns
def data_field_key(&block)
  @key = get(nil)
  transition(:data_field_value_len, 4)
end
#data_field_key_len(&block) ⇒ Object
256 257 258 259 |
# File 'lib/lumberjack/beats/server.rb', line 256
# Decode the 32-bit unsigned big-endian length of the next field key and
# wait for that many bytes.
#
# @return [Object] whatever #transition returns
def data_field_key_len(&block)
  key_size, = get.unpack("N")
  transition(:data_field_key, key_size)
end
#data_field_value(&block) ⇒ Object
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/lumberjack/beats/server.rb', line 270
# Store the value for the current key in the event map. If more pairs are
# outstanding, continue with the next key length; otherwise yield the
# completed map and return to header parsing.
#
# @yield [:data, sequence, data] once the last pair of the frame is read
# @return [Object] whatever #transition returns
def data_field_value(&block)
  @value = get
  @data_count -= 1
  @data[@key] = @value

  # More key/value pairs remain in this frame.
  return transition(:data_field_key_len, 4) if @data_count > 0

  # That was the final pair: hand the whole map to the caller.
  yield :data, @sequence, @data
  transition(:header, 2)
end
#data_field_value_len(&block) ⇒ Object
266 267 268 |
# File 'lib/lumberjack/beats/server.rb', line 266
# Decode the 32-bit unsigned big-endian length of the next field value and
# wait for that many bytes.
#
# @return [Object] whatever #transition returns
def data_field_value_len(&block)
  value_size, = get.unpack("N")
  transition(:data_field_value, value_size)
end
#data_lead(&block) ⇒ Object
250 251 252 253 254 |
# File 'lib/lumberjack/beats/server.rb', line 250
# Decode the lead of a data frame: two 32-bit unsigned big-endian integers
# holding the sequence number and the number of key/value pairs that follow.
#
# A frame that declares zero pairs has no field bytes after the lead, so the
# empty map is emitted immediately and parsing resumes at the next header.
# Without this, the following frame's bytes would be read as a key length.
#
# @yield [:data, sequence, data] only when the frame declares zero pairs
# @return [Object] whatever #transition returns
def data_lead(&block)
  @sequence, @data_count = get.unpack("NN")
  @data = {}

  if @data_count == 0
    yield :data, @sequence, @data
    transition(:header, 2)
  else
    transition(:data_field_key_len, 4)
  end
end
#feed(data, &block) ⇒ String?
Feed data to this parser.
Appends the data to the internal buffer and runs the state machine for as long as enough bytes are buffered for the current state. Decoded frames are yielded to the given block; the method itself always returns nil.
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/lumberjack/beats/server.rb', line 159
# Append bytes to the receive buffer and run the state machine for as long
# as the buffer holds at least the number of bytes the current state needs.
#
# Each step dispatches to the method named by @state and forwards the block
# to it, so decoded events reach the caller through the block.
#
# @param data [String] bytes to append to the buffer
# @return [nil] always
def feed(data, &block)
  @buffer << data
  send(@state, &block) while have?(@need)
  nil
end
#get(length = nil) ⇒ Object
Get the next ‘length’ bytes from the buffer as a string (defaults to the number of bytes currently needed).
186 187 188 189 190 191 192 193 194 195 |
# File 'lib/lumberjack/beats/server.rb', line 186
# Read the next `length` bytes from the buffer and advance the read cursor.
#
# Consumed bytes are kept in the buffer until the cursor passes 16384; at
# that point the consumed prefix is dropped and the cursor resets to zero.
#
# @param length [Integer, nil] byte count to read; nil means use @need
# @return [String] the slice starting at the cursor
def get(length=nil)
  length = length.nil? ? @need : length

  slice_end = @buffer_offset + length
  chunk = @buffer[@buffer_offset...slice_end]
  @buffer_offset = slice_end

  # Compact only once the cursor has moved far enough into the buffer.
  if @buffer_offset > 16384
    @buffer = @buffer[@buffer_offset..-1]
    @buffer_offset = 0
  end

  chunk
end
#handle_version(version, &block) ⇒ Object
221 222 223 224 225 226 227 |
# File 'lib/lumberjack/beats/server.rb', line 221
# Report the protocol version of the current frame to the caller, rejecting
# versions that #supported_protocol? does not accept.
#
# @param version [Integer] version byte taken from the frame header
# @yield [:version, version] when the version is supported
# @raise [RuntimeError] when the version is not supported
def handle_version(version, &block)
  raise "unsupported protocol #{version}" unless supported_protocol?(version)

  yield :version, version
end
#have?(length) ⇒ Boolean
Do we have at least ‘length’ bytes in the buffer?
181 182 183 |
# File 'lib/lumberjack/beats/server.rb', line 181
# Check whether at least `length` unread bytes are buffered.
#
# @param length [Integer] number of bytes required
# @return [Boolean] true when the unread part of the buffer is long enough
def have?(length)
  unread = @buffer.size - @buffer_offset
  length <= unread
end
#header(&block) ⇒ Object
206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/lumberjack/beats/server.rb', line 206
# Parse a frame header: the first byte is the protocol version and the
# second is the frame type. The version goes to #handle_version, then the
# frame type selects the next state and the byte count it needs.
#
# @raise [RuntimeError] when the frame type is not one of the FRAME_* values
# @return [Object] whatever #transition returns
def header(&block)
  version, frame_type = get.bytes.first(2)
  version ||= PROTOCOL_VERSION_1

  handle_version(version, &block)

  next_state, next_length =
    case frame_type
    when FRAME_WINDOW     then [:window_size, 4]
    when FRAME_DATA       then [:data_lead, 8]
    when FRAME_JSON_DATA  then [:json_data_lead, 8]
    when FRAME_COMPRESSED then [:compressed_lead, 4]
    else raise "Unknown frame type: `#{frame_type}`"
    end

  transition(next_state, next_length)
end
#json_data_lead(&block) ⇒ Object
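Reads the sequence number and payload size of a JSON data frame and transitions to :json_data_payload, waiting for that many bytes.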
239 240 241 242 |
# File 'lib/lumberjack/beats/server.rb', line 239
# Decode the lead of a JSON data frame: two 32-bit unsigned big-endian
# integers holding the sequence number and the payload size in bytes.
#
# @return [Object] whatever #transition returns
def json_data_lead(&block)
  lead = get.unpack("NN")
  @sequence = lead[0]
  transition(:json_data_payload, lead[1])
end
#json_data_payload {|:json, @sequence, Lumberjack::Beats::json.load(payload)| ... } ⇒ Object
244 245 246 247 248 |
# File 'lib/lumberjack/beats/server.rb', line 244
# Decode the payload of a JSON data frame using the loader exposed by
# Lumberjack::Beats.json, yield the result, then return to header parsing.
#
# The state changes only after the block returns.
#
# @yield [:json, sequence, decoded] the decoded payload with its sequence number
# @return [Object] whatever #transition returns
def json_data_payload(&block)
  decoded = Lumberjack::Beats.json.load(get)
  yield :json, @sequence, decoded
  transition(:header, 2)
end
#need(length) ⇒ Object
Set the minimum number of bytes we need in the buffer for the next read.
198 199 200 |
# File 'lib/lumberjack/beats/server.rb', line 198
# Record how many bytes must be buffered before the current state can run.
#
# @param length [Integer] minimum number of unread bytes required
# @return [Integer] the value just stored
def need(length)
  @need = length
end
#supported_protocol?(version) ⇒ Boolean
229 230 231 |
# File 'lib/lumberjack/beats/server.rb', line 229
# Check whether the version byte is listed in SUPPORTED_PROTOCOLS.
#
# @param version [Integer] version byte taken from a frame header
# @return [Boolean]
def supported_protocol?(version)
  SUPPORTED_PROTOCOLS.member?(version)
end
#transition(state, next_length) ⇒ Object
Switches the parser to the given state and records how many bytes must be buffered before that state runs.
143 144 145 146 147 148 149 150 |
# File 'lib/lumberjack/beats/server.rb', line 143
# Move the state machine to `state` and record the number of bytes that
# state needs before it can run.
#
# Neither argument is validated here.
# TODO(sissel): assert that self.respond_to?(state)
# TODO(sissel): assert that state is in STATES
# TODO(sissel): assert that next_length is a number
#
# @param state [Symbol] name of the method that handles the next bytes
# @param next_length [Integer] bytes required before that method is invoked
# @return [Integer] the value returned by #need
def transition(state, next_length)
  @state = state
  need(next_length)
end
#window_size {|:window_size, @window_size| ... } ⇒ Object
233 234 235 236 237 |
# File 'lib/lumberjack/beats/server.rb', line 233
# Decode a window-size frame (one 32-bit unsigned big-endian integer),
# return to header parsing, and report the value to the caller.
#
# The state is switched back to :header before the block is invoked.
#
# @yield [:window_size, window_size] the decoded window size
# @return [Object] whatever the block returns
def window_size(&block)
  @window_size, = get.unpack("N")
  transition(:header, 2)
  yield :window_size, @window_size
end