Class: Fluent::Plugin::NatsStreamingOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_nats-streaming.rb

Constant Summary collapse

DEFAULT_FORMAT_TYPE =
'json'

Instance Method Summary collapse

Constructor Details

#initializeNatsStreamingOutput

Returns a new instance of NatsStreamingOutput.



53
54
55
56
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 53

def initialize
  super
  @sc = nil
end

Instance Method Details

#closeObject



141
142
143
144
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 141

def close
  super
  @sc.close if @sc
end

#configure(conf) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 58

def configure(conf)
  super

  @sc_config = {
    servers: ["nats://#{server}"],
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts,
    connect_timeout: @connect_timeout
  }

  formatter_conf = conf.elements('format').first
  unless formatter_conf
    raise Fluent::ConfigError, "<format> section is required."
  end
  unless formatter_conf["@type"]
    raise Fluent::ConfigError, "format/@type is required."
  end
  @formatter_proc = setup_formatter(formatter_conf)
end

#formatted_to_msgpack_binary?Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 49

def formatted_to_msgpack_binary?
  true
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 45

def multi_workers_ready?
  true
end

#process(tag, es) ⇒ Object



118
119
120
121
122
123
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 118

def process(tag, es)
  es = inject_values_to_event_stream(tag, es)
  es.each do |time,record|
    @sc.publish(tag, format(tag, time, record))
  end
end

#runObject



83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 83

def run
  @sc = STAN::Client.new

  log.info "connect nats server nats://#{server} #{cluster_id} #{client_id}"
  @sc.connect(@cluster_id, @client_id.gsub(/\./, '_'), nats: @sc_config)
  log.info "connected"

  while thread_current_running?
    log.trace "test connection"
    @sc.nats.flush(@reconnect_time_wait)
    sleep(5)
  end
end

#setup_formatter(conf) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 97

def setup_formatter(conf)
  type = conf['@type']
  case type
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |tag, time, record| Oj.dump(record) }
    rescue LoadError
      require 'yajl'
      Proc.new { |tag, time, record| Yajl::Encoder.encode(record) }
    end
  when 'ltsv'
    require 'ltsv'
    Proc.new { |tag, time, record| LTSV.dump(record) }
  else
    @formatter = formatter_create(usage: 'kafka-plugin', conf: conf)
    @formatter.method(:format)
  end
end

#startObject



78
79
80
81
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 78

def start
  super
  thread_create(:nats_streaming_output_main, &method(:run))
end

#terminateObject



146
147
148
149
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 146

def terminate
  super
  @sc = nil
end

#write(chunk) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 125

def write(chunk)
  return if chunk.empty?
  tag = chunk..tag

  messages = 0
  chunk.each { |time, record|
    record_buf = @formatter_proc.call(tag, time, record)
    log.trace "Send record: #{record_buf}"
    @sc.publish(tag, record_buf, {timeout: @timeout} )
    messages += 1
  }
  if messages > 0
      log.debug { "#{messages} messages send." }
  end
end