Class: LogCourier::ClientZmq
Instance Attribute Summary
Attributes inherited from EventQueue
#max
Instance Method Summary
collapse
Methods inherited from EventQueue
#clear, #empty?, #length, #num_waiting, #pop, #push
Constructor Details
#initialize(factory, source, source_str, &try_drop) ⇒ ClientZmq
Returns a new instance of ClientZmq.
331
332
333
334
335
336
337
338
339
340
341
|
# File 'lib/log-courier/server_zmq.rb', line 331
# Creates a client for a single source, wired to the factory's logger and
# send queue.
#
# @param factory [Object] must respond to +options+ (indexed here with
#   +:logger+ and +:peer_recv_queue+) and to +send_queue+
# @param source [Object] kept as-is for later use by the other methods
# @param source_str [String] label attached as +:source+ to log entries
# @param try_drop [Proc] block stored for later invocation
def initialize(factory, source, source_str, &try_drop)
  @factory    = factory
  @source     = source
  @source_str = source_str
  @try_drop   = try_drop

  @logger     = factory.options[:logger]
  @send_queue = factory.send_queue

  # The parent constructor runs last and receives the :peer_recv_queue option
  # (presumably the queue's maximum size -- confirm against EventQueue).
  super(factory.options[:peer_recv_queue])
end
|
Instance Method Details
#add_fields(event) ⇒ Object
372
373
|
# File 'lib/log-courier/server_zmq.rb', line 372
# No-op hook: takes an event and does nothing with it.
#
# @param event [Object] ignored
# @return [nil]
def add_fields(event)
  nil
end
|
#run(&block) ⇒ Object
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
|
# File 'lib/log-courier/server_zmq.rb', line 343
# Pump loop: repeatedly pops queued data and hands each item to +recv+
# together with the caller's block.
#
# When a TimeoutError surfaces (from +pop(30)+ or from +recv+), the stored
# +try_drop+ callback is asked about this source: a truthy answer ends the
# loop, anything else goes straight back to waiting. A ShutdownSignal ends
# the method quietly after an info log entry. Any other StandardError or
# NativeException is logged at warn level and re-raised.
#
# @yield forwarded unchanged to +recv+
# @return [nil]
# @raise [StandardError, NativeException] whatever aborted the loop
def run(&block)
  loop do
    begin
      recv(pop(30), &block)
    rescue TimeoutError
      # Nothing arrived in time: stop only if the drop callback agrees,
      # otherwise fall through and wait on the queue again.
      break if @try_drop.call(@source)
    end
  end
  nil
rescue ShutdownSignal
  unless @logger.nil?
    @logger.info 'Source shutting down', :source => @source_str
  end
  nil
rescue StandardError, NativeException => err
  unless @logger.nil?
    @logger.warn(err, :hint => 'Unknown error, connection aborted', :source => @source_str)
  end
  raise err
end
|
#send(signature, message) ⇒ Object
366
367
368
369
370
|
# File 'lib/log-courier/server_zmq.rb', line 366
# Frames a message and pushes it onto the send queue for this client's source.
#
# The frame is the signature, then the payload size in bytes packed as a
# 32-bit big-endian unsigned integer ('N'), then the payload itself.
#
# NOTE: this method overrides Ruby's built-in Object#send on instances of
# this class; use +__send__+ for dynamic dispatch on them.
#
# @param signature [String] prefix placed ahead of the length header
# @param message [String] payload; it is not modified
# @return [nil]
def send(signature, message)
  # Frame raw bytes, not characters: String#length counts characters, so a
  # multibyte payload would get a wrong header, and appending a non-ASCII
  # text string to the binary header can raise Encoding::CompatibilityError.
  payload = message.dup.force_encoding(Encoding::BINARY)
  data = signature + [payload.bytesize].pack('N') + payload

  # The queued item is the source frames, an empty frame, then the data
  # (presumably a routing envelope plus delimiter -- confirm against the
  # send-queue consumer).
  @send_queue.push @source + ['', data]
  nil
end
|