Class: Fluent::Plugin::NatsStreamingOutput
- Inherits:
- Output
  - Object
  - Output
  - Fluent::Plugin::NatsStreamingOutput
- Defined in:
- lib/fluent/plugin/out_nats-streaming.rb
Constant Summary
- DEFAULT_FORMAT_TYPE = 'json'
Instance Method Summary
- #close ⇒ Object
- #configure(conf) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
- #initialize ⇒ NatsStreamingOutput (constructor): a new instance of NatsStreamingOutput.
- #multi_workers_ready? ⇒ Boolean
- #process(tag, es) ⇒ Object
- #run ⇒ Object
- #setup_formatter(conf) ⇒ Object
- #start ⇒ Object
- #terminate ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ NatsStreamingOutput
Returns a new instance of NatsStreamingOutput.
53 54 55 56 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 53
# Builds a new output instance: runs the parent initializer, then starts out
# with no streaming client (@sc stays nil until #run assigns one).
def initialize
  super
  @sc = nil
end
Instance Method Details
#close ⇒ Object
141 142 143 144 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 141
# Runs the parent close hook first, then closes the streaming client when one
# has been created. Does nothing extra when @sc is still nil.
def close
  super
  if @sc
    @sc.close
  end
end
#configure(conf) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 58
# Applies the plugin configuration.
#
# After the parent's configure, this stores the NATS connection options in
# @sc_config and builds the record formatter from the first <format> section.
#
# @param conf [Object] configuration element; must respond to #elements
# @raise [Fluent::ConfigError] when no <format> section exists, or when the
#   section has no "@type" entry
def configure(conf)
  super

  # Connection options later handed to the client in #run.
  @sc_config = {
    servers: ["nats://#{server}"],
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts,
    connect_timeout: @connect_timeout
  }

  format_section = conf.elements('format').first
  raise Fluent::ConfigError, "<format> section is required." unless format_section
  raise Fluent::ConfigError, "format/@type is required." unless format_section["@type"]

  @formatter_proc = setup_formatter(format_section)
end
#formatted_to_msgpack_binary? ⇒ Boolean
49 50 51 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 49
# Always answers true.
#
# @return [Boolean] true
def formatted_to_msgpack_binary?
  true
end
#multi_workers_ready? ⇒ Boolean
45 46 47 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 45
# Always answers true.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
#process(tag, es) ⇒ Object
118 119 120 121 122 123 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 118
# Publishes every event of an event stream directly (non-buffered path).
#
# Each record is serialized with the formatter built in #configure
# (@formatter_proc) and published under the event tag, using the same
# publish options as #write. The previous version called
# `format(tag, time, record)`, but this class defines no #format method, so
# it never went through the configured formatter.
#
# @param tag [String] event tag, used as the publish subject
# @param es [#each] event stream yielding (time, record) pairs
# @return [Object] whatever `es.each` returns
def process(tag, es)
  es = inject_values_to_event_stream(tag, es)
  es.each do |time, record|
    payload = @formatter_proc.call(tag, time, record)
    @sc.publish(tag, payload, {timeout: @timeout})
  end
end
#run ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 83
# Body of the plugin's background thread (started from #start).
#
# Creates the STAN client, connects it with the options prepared in
# #configure, then keeps flushing the underlying NATS connection every five
# seconds for as long as the current thread is marked as running.
def run
  @sc = STAN::Client.new

  log.info "connect nats server nats://#{server} #{cluster_id} #{client_id}"
  # Dots in the configured client id are replaced by underscores before connecting.
  stan_client_id = @client_id.tr('.', '_')
  @sc.connect(@cluster_id, stan_client_id, nats: @sc_config)
  log.info "connected"

  while thread_current_running?
    log.trace "test connection"
    @sc.nats.flush(@reconnect_time_wait)
    sleep(5)
  end
end
#setup_formatter(conf) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 97
# Builds the callable used to serialize one record.
#
# * 'json' - uses Oj (with Fluent::DEFAULT_OJ_OPTIONS as its default
#   options) when the gem can be loaded, otherwise falls back to Yajl.
# * 'ltsv' - uses LTSV.dump.
# * anything else - creates a formatter plugin via formatter_create, keeps
#   it in @formatter and returns its bound #format method.
#
# Fix: the listing read `Oj. = Fluent::DEFAULT_OJ_OPTIONS`, which is not valid
# Ruby; the assignment target is restored to `Oj.default_options`.
#
# @param conf [#[]] the <format> section; conf['@type'] selects the formatter
# @return [#call] callable invoked as call(tag, time, record)
def setup_formatter(conf)
  case conf['@type']
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |_tag, _time, record| Oj.dump(record) }
    rescue LoadError
      # Oj is optional; fall back to Yajl when it is not installed.
      require 'yajl'
      Proc.new { |_tag, _time, record| Yajl::Encoder.encode(record) }
    end
  when 'ltsv'
    require 'ltsv'
    Proc.new { |_tag, _time, record| LTSV.dump(record) }
  else
    # NOTE(review): the usage label 'kafka-plugin' looks copied from another
    # plugin; left unchanged here -- confirm before renaming.
    @formatter = formatter_create(usage: 'kafka-plugin', conf: conf)
    @formatter.method(:format)
  end
end
#start ⇒ Object
78 79 80 81 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 78
# Runs the parent start hook, then launches #run on a thread registered under
# the name :nats_streaming_output_main.
def start
  super
  runner = method(:run)
  thread_create(:nats_streaming_output_main, &runner)
end
#terminate ⇒ Object
146 147 148 149 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 146
# Runs the parent terminate hook, then drops the reference to the streaming
# client by resetting @sc to nil.
def terminate
  super
  @sc = nil
end
#write(chunk) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/fluent/plugin/out_nats-streaming.rb', line 125
# Publishes every record of a buffered chunk.
#
# Each record is serialized with @formatter_proc and published under the
# chunk's tag with the configured timeout; the number of published messages
# is logged at debug level afterwards.
#
# Fix: the listing had lost several identifiers (`chunk..tag`, ` = 0`,
# `+= 1`, `if > 0`, `#{}`) and was not valid Ruby. They are restored as
# `chunk.metadata.tag` and a local `messages` counter.
#
# @param chunk [Object] buffer chunk; must respond to #empty?, #metadata and
#   #each (yielding time, record)
# @return [Object, nil] nil for an empty chunk, otherwise the result of the
#   debug log call
def write(chunk)
  return if chunk.empty?

  tag = chunk.metadata.tag
  messages = 0

  chunk.each do |time, record|
    record_buf = @formatter_proc.call(tag, time, record)
    log.trace "Send record: #{record_buf}"
    @sc.publish(tag, record_buf, {timeout: @timeout})
    messages += 1
  end

  log.debug { "#{messages} messages send." } if messages > 0
end