Class: Fluent::Plugin::NSQOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_nsq.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



23
24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/out_nsq.rb', line 23

# Validates the plugin settings after the parent class has processed `conf`.
#
# Requires at least one of @nsqlookupd / @nsqd, a @topic, and -- only when
# @enable_tls is set -- all of @key, @certificate and @ca_certificate.
# NOTE(review): these instance variables are presumably populated by the
# parent `configure` from the plugin's declared parameters -- confirm.
#
# @param conf the configuration object, forwarded unchanged to `super`
# @raise [Fluent::ConfigError] when a required setting is missing
# @return [nil]
def configure(conf)
  super

  endpoint_given = @nsqlookupd || @nsqd
  raise Fluent::ConfigError, 'Missing nsqlookupd or nsqd' unless endpoint_given
  raise Fluent::ConfigError, 'Missing topic' unless @topic

  # The TLS material is only mandatory once TLS has been switched on.
  return unless @enable_tls

  tls_complete = [@key, @certificate, @ca_certificate].all?
  raise Fluent::ConfigError, 'Missing tls config params' unless tls_complete
end

#shutdown ⇒ Object



56
57
58
59
# File 'lib/fluent/plugin/out_nsq.rb', line 56

# Terminates the NSQ producer, if one was created, and then runs the parent
# class's shutdown logic.
#
# @return [Object] whatever the parent `shutdown` returns
def shutdown
  # @producer is only assigned on the last line of #start, so it is still nil
  # when shutdown follows a start that raised or never ran. Guarding here
  # avoids a NoMethodError on nil that would also prevent `super` from running.
  @producer.terminate if @producer
  super
end

#start ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/out_nsq.rb', line 33

# Runs the parent start logic, then creates the NSQ producer and stores it in
# @producer.
#
# The producer options always carry the topic. @nsqlookupd is used when set;
# @nsqd is only used when @nsqlookupd is not. With @enable_tls the options
# also request TLS and pass the key/certificate/CA settings with peer
# verification.
#
# @return [Nsq::Producer] the newly created producer
def start
  super

  settings = { topic: @topic }
  settings[:nsqlookupd] = @nsqlookupd if @nsqlookupd
  # nsqd is the fallback endpoint: ignored whenever nsqlookupd is configured.
  settings[:nsqd] = @nsqd if @nsqd && !@nsqlookupd

  if @enable_tls
    tls_options = {
      key: @key,
      certificate: @certificate,
      ca_certificate: @ca_certificate,
      verify_mode: OpenSSL::SSL::VERIFY_PEER
    }
    settings.update(tls_v1: true, tls_options: tls_options)
  end

  @producer = Nsq::Producer.new(settings)
end

#write(chunk) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/plugin/out_nsq.rb', line 61

# Publishes every event of a buffer chunk to NSQ as a serialised document.
#
# Each record is merged with three extra keys before being dumped with Yajl:
# `:_key` (the chunk's tag), `:_ts` (the event time as a float) and
# `:'@timestamp'` (the event time as ISO 8601 with millisecond precision).
# A failure while publishing one record is logged as a warning and does not
# stop the remaining records from being written.
#
# @param chunk [#empty?, #metadata, #each] buffer chunk yielding (time, record) pairs
# @return [nil, Object] nil for an empty chunk, otherwise the result of `chunk.each`
def write(chunk)
  return if chunk.empty?

  # The tag is carried by the chunk's metadata.
  # NOTE(review): presumably this needs `tag` among the buffer chunk keys so
  # that metadata.tag is populated -- confirm against the plugin's buffer config.
  tag = chunk.metadata.tag
  chunk.each do |time, record|
    tagged_record = record.merge(
      :_key => tag,
      :_ts => time.to_f,
      :'@timestamp' => Time.at(time).iso8601(3) # kibana/elasticsearch friendly
    )
    begin
      @producer.write(Yajl.dump(tagged_record))
    rescue => e
      # Best effort: report the failure and carry on with the next record.
      log.warn("nsq: #{e}")
    end
  end
end