Class: Fluent::Kafka2Output

Inherits:
Output
Includes:
Fluent::KafkaPluginUtil::SSLSettings, Fluent::KafkaPluginUtil::SaslSettings
Defined in:
lib/fluent/plugin/out_kafka2.rb

Instance Method Summary

Methods included from Fluent::KafkaPluginUtil::SaslSettings

included

Methods included from Fluent::KafkaPluginUtil::SSLSettings

included, #read_ssl_file

Constructor Details

#initialize ⇒ Kafka2Output

Returns a new instance of Kafka2Output.



# File 'lib/fluent/plugin/out_kafka2.rb', line 64

def initialize
  super

  @kafka = nil
end

Instance Method Details

#close ⇒ Object



# File 'lib/fluent/plugin/out_kafka2.rb', line 147

def close
  super
  @kafka.close if @kafka
end

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_kafka2.rb', line 96

def configure(conf)
  super

  if @brokers.size > 0
    log.info "brokers has been set: #{@brokers}"
  else
    raise Fluent::ConfigError, 'No brokers specified. Need one broker at least.'
  end

  formatter_conf = conf.elements('format').first
  unless formatter_conf
    raise Fluent::ConfigError, "<format> section is required."
  end
  unless formatter_conf["@type"]
    raise Fluent::ConfigError, "format/@type is required."
  end
  @formatter_proc = setup_formatter(formatter_conf)

  if @default_topic.nil?
    if @chunk_keys.include?('topic') && !@chunk_keys.include?('tag')
      log.warn "Use 'topic' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer topic,tag>"
    end
  else
    if @chunk_keys.include?('tag')
      log.warn "default_topic is set. Fluentd's event tag is not used for topic"
    end
  end

  @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks}
  @producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout
  @producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec
  if @active_support_notification_regex
    require 'active_support/notifications'
    require 'active_support/core_ext/hash/keys'
    ActiveSupport::Notifications.subscribe(Regexp.new(@active_support_notification_regex)) do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      message = event.payload.respond_to?(:stringify_keys) ? event.payload.stringify_keys : event.payload
      @router.emit("fluent_kafka_stats.#{event.name}", Time.now.to_i, message)
    end
  end
end
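
For reference, a minimal configuration that passes the checks in configure might look like the sketch below. Parameter names are inferred from the instance variables used above (brokers, default_topic, the required <format> section, and a 'topic' buffer chunk key); broker addresses and topic names are placeholders.

<match app.**>
  @type kafka2
  brokers broker1:9092,broker2:9092   # at least one broker is required
  default_topic my_topic              # fallback when the chunk carries no topic

  <format>
    @type json                        # <format> with @type is mandatory
  </format>

  <buffer topic>
    flush_interval 10s
  </buffer>
</match>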

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/out_kafka2.rb', line 138

def multi_workers_ready?
  true
end
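
Because multi_workers_ready? returns true, the plugin can be used under Fluentd's multi-process workers. A minimal sketch (the worker count is arbitrary):

<system>
  workers 2
</system>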

#refresh_client(raise_error = true) ⇒ Object



# File 'lib/fluent/plugin/out_kafka2.rb', line 70

def refresh_client(raise_error = true)
  begin
    logger = @get_kafka_client_log ? log : nil
    if @scram_mechanism != nil && @username != nil && @password != nil
      @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
                         ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                         sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism)
    elsif @username != nil && @password != nil
      @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
                         ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                         sasl_plain_username: @username, sasl_plain_password: @password)
    else
      @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
                         ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                         sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
    end
    log.info "initialized kafka producer: #{@client_id}"
  rescue Exception => e
    if raise_error # During startup, error should be reported to engine and stop its phase for safety.
      raise e
    else
      log.error e
    end
  end
end
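
Which branch runs depends on the configured credentials: SCRAM when username, password and scram_mechanism are all set, plain SASL when only username and password are set, and GSSAPI otherwise. A hedged sketch of a SCRAM-over-TLS setup, assuming the parameter names provided by the included SSLSettings and SaslSettings modules (paths and credentials are placeholders; the <format> and <buffer> sections are omitted for brevity):

<match app.**>
  @type kafka2
  brokers broker1:9093

  # TLS material, loaded via read_ssl_file and passed to Kafka.new
  ssl_ca_cert /path/to/ca.crt
  ssl_client_cert /path/to/client.crt
  ssl_client_cert_key /path/to/client.key

  # SASL SCRAM credentials, selecting the first branch above
  username kafka_user
  password kafka_password
  scram_mechanism sha256
</match>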

#setup_formatter(conf) ⇒ Object



# File 'lib/fluent/plugin/out_kafka2.rb', line 157

def setup_formatter(conf)
  type = conf['@type']
  case type
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |tag, time, record| Oj.dump(record) }
    rescue LoadError
      require 'yajl'
      Proc.new { |tag, time, record| Yajl::Encoder.encode(record) }
    end
  when 'ltsv'
    require 'ltsv'
    Proc.new { |tag, time, record| LTSV.dump(record) }
  else
    @formatter = formatter_create(usage: 'kafka-plugin', conf: conf)
    @formatter.method(:format)
  end
end
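
In practice the formatter is chosen through the <format> section: 'json' and 'ltsv' take the fast paths above, and any other @type falls through to formatter_create. Two illustrative snippets (assumed to sit inside this plugin's <match> block):

<format>
  @type json      # serialized with Oj when available, Yajl otherwise
</format>

<format>
  @type msgpack   # resolved via formatter_create like any other formatter plugin
</format>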

#start ⇒ Object



# File 'lib/fluent/plugin/out_kafka2.rb', line 142

def start
  super
  refresh_client
end

#terminate ⇒ Object



# File 'lib/fluent/plugin/out_kafka2.rb', line 152

def terminate
  super
  @kafka = nil
end

#write(chunk) ⇒ Object

TODO: optimize write performance



# File 'lib/fluent/plugin/out_kafka2.rb', line 179

def write(chunk)
  tag = chunk.metadata.tag
  topic = chunk.metadata.variables[:topic] || @default_topic || tag
  producer = @kafka.topic_producer(topic, @producer_opts)

  messages = 0
  record_buf = nil

  begin
    chunk.msgpack_each { |time, record|
      begin
        record = inject_values_to_record(tag, time, record)
        record.delete('topic'.freeze) if @exclude_topic_key
        partition_key = (@exclude_partition_key ? record.delete('partition_key'.freeze) : record['partition_key'.freeze]) || @default_partition_key
        partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition
        message_key = (@exclude_message_key ? record.delete('message_key'.freeze) : record['message_key'.freeze]) || @default_message_key

        record_buf = @formatter_proc.call(tag, time, record)
      rescue StandardError => e
        log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record
        next
      end

      log.trace { "message will send to #{topic} with partition_key: #{partition_key}, partition: #{partition}, message_key: #{message_key} and value: #{record_buf}." }
      messages += 1

      producer.produce(record_buf, message_key, partition, partition_key)
    }

    if messages > 0
      log.debug { "#{messages} messages send." }
      producer.deliver_messages
    end
  end
rescue Exception => e
  log.warn "Send exception occurred: #{e}"
  log.warn "Exception Backtrace : #{e.backtrace.join("\n")}"
  # For safety, refresh client and its producers
  refresh_client(false)
  # Raise exception to retry sending messages
  raise e
ensure
  producer.shutdown if producer
end
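
To illustrate how write maps record fields onto Kafka produce arguments, the hypothetical setup below buffers on the 'topic' field and strips the routing fields from the payload via the exclude_* options referenced above (all names are placeholders):

<match app.**>
  @type kafka2
  brokers broker1:9092
  default_topic fallback_topic
  exclude_topic_key true       # remove the "topic" field from the serialized payload
  exclude_message_key true     # remove the "message_key" field from the serialized payload

  <format>
    @type json
  </format>

  <buffer topic>
  </buffer>
</match>

With this configuration, a record such as {"topic": "orders", "message_key": "user-42", "amount": 100} is produced to the "orders" topic with message key "user-42", and falls back to fallback_topic when the record carries no topic field.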