Class: RelpClient

Inherits:
Relp
  • Object
show all
Defined in:
lib/logstash/util/relp.rb

Overview

This is only used by the tests; any problems here are not as important as elsewhere

Constant Summary

Constants inherited from Relp

Relp::RelpSoftware, Relp::RelpVersion

Instance Method Summary collapse

Methods inherited from Relp

#frame_read, #frame_write, #server?, #valid_command?

Constructor Details

#initialize(host, port, required_commands = [], buffer_size = 128, retransmission_timeout = 10) ⇒ RelpClient

Returns a new instance of RelpClient.



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/logstash/util/relp.rb', line 213

# Connects to a RELP server, performs the 'open' handshake and starts the
# background threads that process replies and retransmit unanswered frames.
#
# @param host passed straight to TCPSocket.new
# @param port passed straight to TCPSocket.new
# @param required_commands [Array<String>] commands the server must offer;
#   the connection is closed and InsufficientCommands raised if any is missing
# @param buffer_size stored in @buffer_size (upper bound on buffered frames)
# @param retransmission_timeout seconds slept between retransmission passes
# @raise [RelpError] if the reply to 'open' does not start with '200', or
#   carries no relp_version
# @raise [InsufficientCommands] if a required command is not in the offer
def initialize(host,port,required_commands = [],buffer_size = 128,
               retransmission_timeout=10)
  @logger = Cabin::Channel.get(LogStash)
  @logger.info? and @logger.info("Starting RELP client", :host => host, :port => port)
  @server = false
  @buffer = Hash.new

  @buffer_size = buffer_size
  @retransmission_timeout = retransmission_timeout

  #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.)
  @basic_relp_commands = ['serverclose','rsp']#TODO: check for others

  #These are extra commands that we require, otherwise refuse the connection
  @required_relp_commands = required_commands

  @socket=TCPSocket.new(host,port)

  #This'll start the automatic frame numbering
  @lasttxnr = 0

  offer=Hash.new
  offer['command'] = 'open'
  offer['message'] = 'relp_version=' + RelpVersion + "\n"
  offer['message'] += 'relp_software=' + RelpSoftware + "\n"
  offer['message'] += 'commands=' + @required_relp_commands.join(',')#TODO: add optional ones
  self.frame_write(@socket, offer)
  response_frame = self.frame_read(@socket)
  if response_frame['message'][0,3] != '200'
    raise RelpError,response_frame['message']
  end

  #Skip the first 7 characters (presumably the "200 OK\n" status line); to_s
  #guards against a reply that has nothing after the status
  offer_text = response_frame['message'][7..-1].to_s
  response=Hash[*offer_text.scan(/^(.*)=(.*)$/).flatten]
  #A server that sends no commands line is treated as offering none
  offered_commands = response['commands'].to_s

  if response['relp_version'].nil?
    #if no version specified, relp spec says we must close connection
    self.close()
    #The operator must end the line: a leading '+' on the next line would be
    #parsed as a separate statement and the detail silently dropped
    raise RelpError, 'No relp_version specified; offer: ' + offer_text

  #subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer
  elsif ! (@required_relp_commands - offered_commands.split(',')).empty?
    #if it can't receive syslog it's useless to us; close the connection
    self.close()
    raise InsufficientCommands, offered_commands + ' offered, require ' +
        @required_relp_commands.join(',')
  end
  #If we've got this far with no problems, we're good to go
  @logger.info? and @logger.info("Connection establish with server")

  #This thread deals with responses that come back
  Thread.start do
    loop do
      f = self.frame_read(@socket)
      if f['command'] == 'rsp' && f['message'] == '200 OK'
        @buffer.delete(f['txnr'])
      elsif f['command'] == 'rsp' && f['message'][0,1] == '5'
        #The frame may already have left the buffer (e.g. retransmitted under
        #a new txnr after a timeout); only resend when we still hold it
        rejected = @buffer[f['txnr']]
        unless rejected.nil?
          new_txnr = self.frame_write(@socket, rejected)
          @buffer[new_txnr] = rejected
          @buffer.delete(f['txnr'])
        end
      elsif f['command'] == 'serverclose' || f['txnr'] == @close_txnr
        break
      else
        #Don't know what's going on if we get here, but it can't be good
        raise RelpError#TODO: raising errors like this makes no sense
      end
    end
  end

  #While this one deals with frames for which we get no reply
  Thread.start do
    #Snapshot of the txnrs seen on the previous pass. It must be a copy of the
    #keys, not a reference to @buffer itself, otherwise every pending frame
    #would look "old" and be resent on every pass
    old_txnrs = []
    loop do
      #This returns old txnrs that are still present
      (@buffer.keys & old_txnrs).each do |txnr|
        pending = @buffer[txnr]
        #The reader thread may have removed it since the keys were listed
        next if pending.nil?
        new_txnr = self.frame_write(@socket, pending)
        @buffer[new_txnr] = pending
        @buffer.delete(txnr)
      end
      old_txnrs = @buffer.keys
      sleep @retransmission_timeout
    end
  end
end

Instance Method Details

#close ⇒ Object

TODO: have a way to get back unacked messages on close



299
300
301
302
303
304
305
306
307
# File 'lib/logstash/util/relp.rb', line 299

# Sends a 'close' frame, waits for one retransmission timeout and then closes
# the socket.
#
# @return the contents of @buffer at the time of closing
def close
  close_frame = { 'command' => 'close' }
  # The txnr is kept in @close_txnr; the reader loop compares incoming txnrs against it
  @close_txnr = self.frame_write(@socket, close_frame)
  # TODO: ought to properly wait for a reply etc. The serverclose will make it work though
  sleep(@retransmission_timeout)
  @socket.close # TODO: shutdown?
  @buffer
end

#nexttxnr ⇒ Object



322
323
324
# File 'lib/logstash/util/relp.rb', line 322

# Advances the @lasttxnr counter by one and returns the new value.
def nexttxnr
  @lasttxnr = @lasttxnr + 1
end

#syslog_write(logline) ⇒ Object



309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/logstash/util/relp.rb', line 309

# Sends one log line as a 'syslog' frame and records it in @buffer under the
# txnr returned by frame_write.
#
# @param logline the message payload of the frame
# @return [Hash] the frame that was written
def syslog_write(logline)
  # Poll until the buffer has room for another frame
  while @buffer.length >= @buffer_size
    sleep 0.1
  end

  syslog_frame = { 'command' => 'syslog', 'message' => logline }
  @buffer[self.frame_write(@socket, syslog_frame)] = syslog_frame
end