Class: Fluent::Kafka2Output
- Inherits:
-
Output
- Object
- Output
- Fluent::Kafka2Output
- Includes:
- Fluent::KafkaPluginUtil::SSLSettings
- Defined in:
- lib/fluent/plugin/out_kafka2.rb
Instance Method Summary collapse
- #close ⇒ Object
- #configure(conf) ⇒ Object
-
#initialize ⇒ Kafka2Output
constructor
A new instance of Kafka2Output.
- #multi_workers_ready? ⇒ Boolean
- #refresh_client(raise_error = true) ⇒ Object
- #setup_formatter(conf) ⇒ Object
- #start ⇒ Object
- #terminate ⇒ Object
-
#write(chunk) ⇒ Object
TODO: optimize write performance.
Methods included from Fluent::KafkaPluginUtil::SSLSettings
Constructor Details
#initialize ⇒ Kafka2Output
Returns a new instance of Kafka2Output.
57 58 59 60 61 |
# File 'lib/fluent/plugin/out_kafka2.rb', line 57 def initialize super @kafka = nil end |
Instance Method Details
#close ⇒ Object
110 111 112 113 |
# File 'lib/fluent/plugin/out_kafka2.rb', line 110 def close super @kafka.close if @kafka end |
#configure(conf) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fluent/plugin/out_kafka2.rb', line 78 def configure(conf) super if @brokers.size > 0 log.info "brokers has been set: #{@brokers}" else raise Fluent::Config, 'No brokers specified. Need one broker at least.' end formatter_conf = conf.elements('format').first unless formatter_conf raise Fluent::ConfigError, "<format> section is required." end unless formatter_conf["@type"] raise Fluent::ConfigError, "format/@type is required." end @formatter_proc = setup_formatter(formatter_conf) @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks} @producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout @producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec end |
#multi_workers_ready? ⇒ Boolean
101 102 103 |
# File 'lib/fluent/plugin/out_kafka2.rb', line 101 def multi_workers_ready? true end |
#refresh_client(raise_error = true) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/fluent/plugin/out_kafka2.rb', line 63 def refresh_client(raise_error = true) begin logger = @get_kafka_client_log ? log : nil @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key)) log.info "initialized kafka producer: #{@client_id}" rescue Exception => e if raise_error # During startup, error should be reported to engine and stop its phase for safety. raise e else log.error e end end end |
#setup_formatter(conf) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/fluent/plugin/out_kafka2.rb', line 120 def setup_formatter(conf) type = conf['@type'] case type when 'json' begin require 'oj' Oj. = Fluent::DEFAULT_OJ_OPTIONS Proc.new { |tag, time, record| Oj.dump(record) } rescue LoadError require 'yajl' Proc.new { |tag, time, record| Yajl::Encoder.encode(record) } end when 'ltsv' require 'ltsv' Proc.new { |tag, time, record| LTSV.dump(record) } else @formatter = formatter_create(usage: 'kafka-plugin', conf: conf) @formatter.method(:format) end end |
#start ⇒ Object
105 106 107 108 |
# File 'lib/fluent/plugin/out_kafka2.rb', line 105 def start super refresh_client end |
#terminate ⇒ Object
115 116 117 118 |
# File 'lib/fluent/plugin/out_kafka2.rb', line 115 def terminate super @kafka = nil end |
#write(chunk) ⇒ Object
TODO: optimize write performance
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/fluent/plugin/out_kafka2.rb', line 142 def write(chunk) tag = chunk..tag topic = chunk..variables[:topic] || @default_topic || tag producer = @kafka.topic_producer(topic, @producer_opts) = 0 record_buf = nil begin chunk.msgpack_each { |time, record| begin record = inject_values_to_record(tag, time, record) record.delete('topic'.freeze) if @exclude_topic_key partition_key = (@exclude_partition_key ? record.delete('partition_key'.freeze) : record['partition_key'.freeze]) || @default_partition_key partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition = (@exclude_message_key ? record.delete('message_key'.freeze) : record['message_key'.freeze]) || @default_message_key record_buf = @formatter_proc.call(tag, time, record) rescue StandardError => e log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record next end log.on_trace { "message will send to #{topic} with partition_key: #{partition_key}, partition: #{partition}, message_key: #{} and value: #{record_buf}." } += 1 producer.produce(record_buf, , partition, partition_key) } if > 0 log.trace { "#{} messages send." } producer. end end rescue Exception => e log.warn "Send exception occurred: #{e}" log.warn "Exception Backtrace : #{e.backtrace.join("\n")}" # For safety, refresh client and its producers refresh_client(false) # Raise exception to retry sendind messages raise e ensure producer.shutdown if producer end |