Class: Fluent::Kafka2Output

Inherits:
Output
  • Object
show all
Includes:
Fluent::KafkaPluginUtil::SSLSettings
Defined in:
lib/fluent/plugin/out_kafka2.rb

Instance Method Summary collapse

Methods included from Fluent::KafkaPluginUtil::SSLSettings

included, #read_ssl_file

Constructor Details

#initialize ⇒ Kafka2Output

Returns a new instance of Kafka2Output.



57
58
59
60
61
# File 'lib/fluent/plugin/out_kafka2.rb', line 57

# Builds the output plugin instance.
#
# Runs the parent initializer first, then marks the Kafka client as
# "not created yet" by setting @kafka to nil.
def initialize
  super
  # No client exists until one is assigned to @kafka later on.
  @kafka = nil
end

Instance Method Details

#close ⇒ Object



110
111
112
113
# File 'lib/fluent/plugin/out_kafka2.rb', line 110

# Closes the Kafka client, if one has been created.
#
# The parent implementation runs first; when @kafka is unset (nil/false)
# nothing else happens and nil is returned.
def close
  super
  return unless @kafka

  @kafka.close
end

#configure(conf) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_kafka2.rb', line 78

# Validates the plugin configuration and prepares the formatter and the
# producer options used when writing chunks.
#
# @param conf [Object] configuration element; must respond to
#   +elements('format')+, and the first returned section must respond to
#   +["@type"]+.
# @raise [Fluent::ConfigError] when no broker is configured, when the
#   <format> section is missing, or when that section has no @type.
def configure(conf)
  super

  # Use Fluent::ConfigError, the same error class as the <format> checks
  # below, so a missing broker list is reported as a configuration error.
  raise Fluent::ConfigError, 'No brokers specified. Need one broker at least.' if @brokers.empty?
  log.info "brokers has been set: #{@brokers}"

  formatter_conf = conf.elements('format').first
  raise Fluent::ConfigError, "<format> section is required." unless formatter_conf
  raise Fluent::ConfigError, "format/@type is required." unless formatter_conf["@type"]
  @formatter_proc = setup_formatter(formatter_conf)

  # Optional settings are only added when they are set, so unset values are
  # not passed on as explicit nils.
  @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks}
  @producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout
  @producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/fluent/plugin/out_kafka2.rb', line 101

# Reports whether this plugin may be used with multiple workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#refresh_client(raise_error = true) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/out_kafka2.rb', line 63

# (Re)creates the Kafka client and stores it in @kafka.
#
# @param raise_error [Boolean] when true, any failure is re-raised to the
#   caller; when false, the failure is only logged at error level.
def refresh_client(raise_error = true)
  # The plugin logger is handed to the client only when
  # @get_kafka_client_log is set.
  client_logger = @get_kafka_client_log ? log : nil
  ssl_options = {
    ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
    ssl_client_cert: read_ssl_file(@ssl_client_cert),
    ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
  }

  @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: client_logger, **ssl_options)
  log.info "initialized kafka producer: #{@client_id}"
rescue Exception => e
  # During startup, error should be reported to engine and stop its phase for safety.
  raise e if raise_error

  log.error e
end

#setup_formatter(conf) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/fluent/plugin/out_kafka2.rb', line 120

# Builds the callable used to serialize one event.
#
# The returned object is invoked as +call(tag, time, record)+.
# - 'json': dumps the record with Oj, falling back to Yajl when Oj cannot
#   be loaded.
# - 'ltsv': dumps the record with LTSV.
# - anything else: creates a formatter through formatter_create, keeps it
#   in @formatter, and returns its bound #format method.
#
# @param conf [#[]] format section; its '@type' entry selects the formatter
# @return [Proc, Method] the serializer callable
def setup_formatter(conf)
  case conf['@type']
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      proc { |_tag, _time, record| Oj.dump(record) }
    rescue LoadError
      # Oj is unavailable; use Yajl instead.
      require 'yajl'
      proc { |_tag, _time, record| Yajl::Encoder.encode(record) }
    end
  when 'ltsv'
    require 'ltsv'
    proc { |_tag, _time, record| LTSV.dump(record) }
  else
    @formatter = formatter_create(usage: 'kafka-plugin', conf: conf)
    @formatter.method(:format)
  end
end

#start ⇒ Object



105
106
107
108
# File 'lib/fluent/plugin/out_kafka2.rb', line 105

# Starts the plugin: runs the parent start, then creates the Kafka client.
#
# The client is created with raise_error enabled, so a failure to build it
# propagates to the caller.
def start
  super
  refresh_client(true)
end

#terminate ⇒ Object



115
116
117
118
# File 'lib/fluent/plugin/out_kafka2.rb', line 115

# Final teardown step: runs the parent terminate, then drops the reference
# to the Kafka client by resetting @kafka to nil.
def terminate
  super
  # Forget the client; this method does not close it.
  @kafka = nil
end

#write(chunk) ⇒ Object

TODO: optimize write performance



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/fluent/plugin/out_kafka2.rb', line 142

# Sends every event of a buffer chunk to Kafka.
#
# The target topic is taken from the chunk metadata variables (:topic),
# then @default_topic, then the chunk tag. Each record is formatted with
# @formatter_proc and queued on a topic producer; queued messages are
# delivered once the whole chunk has been read. Records that fail to format
# are logged and skipped.
#
# @param chunk [Object] buffer chunk; must respond to +metadata+ (with +tag+
#   and +variables+) and +msgpack_each+ yielding +(time, record)+.
# @raise [Exception] any delivery failure is re-raised after the client has
#   been refreshed.
def write(chunk)
  metadata = chunk.metadata
  tag = metadata.tag
  # variables may be nil, so guard before indexing.
  variables = metadata.variables
  topic = (variables && variables[:topic]) || @default_topic || tag
  producer = @kafka.topic_producer(topic, @producer_opts)

  messages = 0

  chunk.msgpack_each { |time, record|
    begin
      record = inject_values_to_record(tag, time, record)
      record.delete('topic'.freeze) if @exclude_topic_key
      # Routing fields come from the record (optionally removed from it) and
      # fall back to the configured defaults.
      partition_key = (@exclude_partition_key ? record.delete('partition_key'.freeze) : record['partition_key'.freeze]) || @default_partition_key
      partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition
      message_key = (@exclude_message_key ? record.delete('message_key'.freeze) : record['message_key'.freeze]) || @default_message_key

      record_buf = @formatter_proc.call(tag, time, record)
    rescue StandardError => e
      log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record
      next
    end

    log.on_trace { "message will send to #{topic} with partition_key: #{partition_key}, partition: #{partition}, message_key: #{message_key} and value: #{record_buf}." }
    messages += 1

    producer.produce(record_buf, message_key, partition, partition_key)
  }

  if messages > 0
    log.trace { "#{messages} messages send." }
    producer.deliver_messages
  end
rescue Exception => e
  log.warn "Send exception occurred: #{e}"
  log.warn "Exception Backtrace : #{e.backtrace.join("\n")}"
  # For safety, refresh client and its producers
  refresh_client(false)
  # Raise exception to retry sending messages
  raise e
ensure
  # producer is nil when creating it failed, so guard the shutdown.
  producer.shutdown if producer
end