24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/logstash/inputs/nsq.rb', line 24
# Consume messages from NSQ and hand each message body to Logstash.
#
# Builds an Nsq::Consumer from the plugin settings, then loops forever:
# pop a message, pass its body to +queue_event+, and mark the message as
# finished. The loop ends when LogStash::ShutdownSignal is raised. Any other
# StandardError is logged and the whole consumer is rebuilt and restarted.
#
# @param logstash_queue [Object] the queue passed through to +queue_event+
#   together with each message body
# @return whatever +finished+ returns
def run(logstash_queue)
  @logger.info('Running nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)

  # Options shared by the TLS and non-TLS cases.
  options = {
    :nsqlookupd => @nsqlookupd,
    :topic => @topic,
    :channel => @channel,
    :max_in_flight => @max_in_flight,
    :tls_v1 => @tls_v1
  }
  # A client certificate is only offered when both halves are configured.
  if @tls_key && @tls_cert
    options[:tls_context] = { :key => @tls_key, :certificate => @tls_cert }
  end

  begin
    # Reset on every (re)try so the ensure below never terminates a
    # consumer left over from a previous attempt a second time.
    consumer = nil
    begin
      consumer = Nsq::Consumer.new(options)
      while true
        event = consumer.pop
        queue_event(event.body, logstash_queue)
        # Only acknowledge the message once it has been queued.
        event.finish
      end
    rescue LogStash::ShutdownSignal
      @logger.info('nsq got shutdown signal')
    ensure
      # Release the consumer on shutdown and before a restart; otherwise each
      # retry would leave the previous consumer alive alongside the new one.
      consumer.terminate if consumer
    end
    @logger.info('Done running nsq input')
  rescue => e
    @logger.warn('client threw exception, restarting',
                 :exception => e)
    # Brief pause so a persistent failure does not become a busy loop.
    sleep(1)
    retry
  end
  finished
end
|