Class: Fluent::Plugin::NSQOutput
- Inherits: Output
- Object
- Output
- Fluent::Plugin::NSQOutput
- Defined in:
- lib/fluent/plugin/out_nsq.rb
Instance Method Summary
Instance Method Details
#configure(conf) ⇒ Object
23 24 25 26 27 28 29 30 31 |
# File 'lib/fluent/plugin/out_nsq.rb', line 23

# Validates the plugin settings once the parent class has handled +conf+.
#
# @param conf the configuration object; forwarded unchanged to +super+
# @raise [Fluent::ConfigError] if neither @nsqlookupd nor @nsqd is set,
#   if @topic is not set, or if @enable_tls is set while any of @key,
#   @certificate or @ca_certificate is missing
def configure(conf)
  super

  raise Fluent::ConfigError, 'Missing nsqlookupd or nsqd' if !@nsqlookupd && !@nsqd
  raise Fluent::ConfigError, 'Missing topic' if !@topic

  # The TLS parameters are only mandatory when TLS is switched on.
  return unless @enable_tls

  tls_params_present = @key && @certificate && @ca_certificate
  raise Fluent::ConfigError, 'Missing tls config params' unless tls_params_present
end
#shutdown ⇒ Object
56 57 58 59 |
# File 'lib/fluent/plugin/out_nsq.rb', line 56

# Terminates the NSQ producer, when one exists, and then runs the parent
# class's shutdown.
#
# @return whatever the parent +shutdown+ returns
def shutdown
  # @producer is assigned in #start. Guarding against nil keeps a shutdown
  # that happens without a completed start from raising NoMethodError and
  # skipping the parent's shutdown.
  @producer.terminate if @producer
  super
end
#start ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/fluent/plugin/out_nsq.rb', line 33

# Runs the parent's start and then builds the Nsq::Producer stored in
# @producer from the plugin's settings.
def start
  super

  settings = { topic: @topic }

  # @nsqlookupd wins when both endpoints are set; @nsqd is the fallback.
  if @nsqlookupd
    settings[:nsqlookupd] = @nsqlookupd
  elsif @nsqd
    settings[:nsqd] = @nsqd
  end

  if @enable_tls
    settings.merge!(
      tls_v1: true,
      tls_options: {
        key: @key,
        certificate: @certificate,
        ca_certificate: @ca_certificate,
        verify_mode: OpenSSL::SSL::VERIFY_PEER
      }
    )
  end

  @producer = Nsq::Producer.new(settings)
end
#write(chunk) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/fluent/plugin/out_nsq.rb', line 61

# Publishes every event of +chunk+ through @producer as a JSON document.
#
# Each record is merged with three extra keys before serialization:
# +:_key+ (the chunk's tag), +:_ts+ (the event time as a float) and
# +:'@timestamp'+ (the event time as ISO 8601 with millisecond precision).
#
# @param chunk the buffer chunk; must respond to +empty?+, +metadata+
#   (whose result responds to +tag+) and +each+ yielding (time, record)
# @return [nil] when the chunk is empty; otherwise the result of +chunk.each+
def write(chunk)
  return if chunk.empty?

  # The tag is read from the chunk's metadata.
  tag = chunk.metadata.tag

  chunk.each do |time, record|
    tagged_record = record.merge(
      :_key => tag,
      :_ts => time.to_f,
      :'@timestamp' => Time.at(time).iso8601(3) # kibana/elasticsearch friendly
    )

    begin
      @producer.write(Yajl.dump(tagged_record))
    rescue => e
      # Best effort: a failed publish is logged and the remaining events are
      # still attempted; the error is not re-raised to the caller.
      log.warn("nsq: #{e}")
    end
  end
end