Class: RelpClient
Overview
This is only used by the tests; any problems here are not as important as elsewhere
Constant Summary
Constants inherited from Relp
Relp::RelpSoftware, Relp::RelpVersion
Instance Method Summary collapse
-
#close ⇒ Object
TODO: have a way to get back unacked messages on close.
-
#initialize(host, port, required_commands = [], buffer_size = 128, retransmission_timeout = 10) ⇒ RelpClient
constructor
A new instance of RelpClient.
- #nexttxnr ⇒ Object
- #syslog_write(logline) ⇒ Object
Methods inherited from Relp
#frame_read, #frame_write, #server?, #valid_command?
Constructor Details
#initialize(host, port, required_commands = [], buffer_size = 128, retransmission_timeout = 10) ⇒ RelpClient
Returns a new instance of RelpClient.
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 |
# File 'lib/logstash/util/relp.rb', line 213 def initialize(host,port,required_commands = [],buffer_size = 128, retransmission_timeout=10) @logger = Cabin::Channel.get(LogStash) @logger.info? and @logger.info("Starting RELP client", :host => host, :port => port) @server = false @buffer = Hash.new @buffer_size = buffer_size @retransmission_timeout = retransmission_timeout #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) @basic_relp_commands = ['serverclose','rsp']#TODO: check for others #These are extra commands that we require, otherwise refuse the connection @required_relp_commands = required_commands @socket=TCPSocket.new(host,port) #This'll start the automatic frame numbering @lasttxnr = 0 offer=Hash.new offer['command'] = 'open' offer['message'] = 'relp_version=' + RelpVersion + "\n" offer['message'] += 'relp_software=' + RelpSoftware + "\n" offer['message'] += 'commands=' + @required_relp_commands.join(',')#TODO: add optional ones self.frame_write(@socket, offer) response_frame = self.frame_read(@socket) if response_frame['message'][0,3] != '200' raise RelpError,response_frame['message'] end response=Hash[*response_frame['message'][7..-1].scan(/^(.*)=(.*)$/).flatten] if response['relp_version'].nil? #if no version specified, relp spec says we must close connection self.close() raise RelpError, 'No relp_version specified; offer: ' + response_frame['message'][6..-1].scan(/^(.*)=(.*)$/).flatten #subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer elsif ! (@required_relp_commands - response['commands'].split(',')).empty? #if it can't receive syslog it's useless to us; close the connection self.close() raise InsufficientCommands, response['commands'] + ' offered, require ' + @required_relp_commands.join(',') end #If we've got this far with no problems, we're good to go @logger.info? and @logger.info("Connection establish with server") #This thread deals with responses that come back reader = Thread.start do loop do f = self.frame_read(@socket) if f['command'] == 'rsp' && f['message'] == '200 OK' @buffer.delete(f['txnr']) elsif f['command'] == 'rsp' && f['message'][0,1] == '5' #TODO: What if we get an error for something we're already retransmitted due to timeout? new_txnr = self.frame_write(@socket, @buffer[f['txnr']]) @buffer[new_txnr] = @buffer[f['txnr']] @buffer.delete(f['txnr']) elsif f['command'] == 'serverclose' || f['txnr'] == @close_txnr break else #Don't know what's going on if we get here, but it can't be good raise RelpError#TODO: raising errors like this makes no sense end end end #While this one deals with frames for which we get no reply Thread.start do old_buffer = Hash.new loop do #This returns old txnrs that are still present (@buffer.keys & old_buffer.keys).each do |txnr| new_txnr = self.frame_write(@socket, @buffer[txnr]) @buffer[new_txnr] = @buffer[txnr] @buffer.delete(txnr) end old_buffer = @buffer sleep @retransmission_timeout end end end |
Instance Method Details
#close ⇒ Object
TODO: have a way to get back unacked messages on close
299 300 301 302 303 304 305 306 307 |
# File 'lib/logstash/util/relp.rb', line 299 def close frame = Hash.new frame['command'] = 'close' @close_txnr=self.frame_write(@socket, frame) #TODO: ought to properly wait for a reply etc. The serverclose will make it work though sleep @retransmission_timeout @socket.close#TODO: shutdown? return @buffer end |
#nexttxnr ⇒ Object
322 323 324 |
# File 'lib/logstash/util/relp.rb', line 322 def nexttxnr @lasttxnr += 1 end |
#syslog_write(logline) ⇒ Object
309 310 311 312 313 314 315 316 317 318 319 320 |
# File 'lib/logstash/util/relp.rb', line 309 def syslog_write(logline) #If the buffer is already full, wait until a gap opens up sleep 0.1 until @buffer.length<@buffer_size frame = Hash.new frame['command'] = 'syslog' frame['message'] = logline txnr = self.frame_write(@socket, frame) @buffer[txnr] = frame end |