Class: Fluent::Plugin::AMQPOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_amqp.rb

Overview

AMQPOutput to be used as a Fluent MATCHER, sending messages to a RabbitMQ messaging broker

Defined Under Namespace

Classes: HeaderElement

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



20
21
22
# File 'lib/fluent/plugin/out_amqp.rb', line 20

def channel
  @channel
end

#connectionObject

Returns the value of attribute connection.



16
17
18
# File 'lib/fluent/plugin/out_amqp.rb', line 16

def connection
  @connection
end

#exchObject (readonly)

Attribute readers to support testing



19
20
21
# File 'lib/fluent/plugin/out_amqp.rb', line 19

def exch
  @exch
end

Instance Method Details

#check_tls_configurationObject



239
240
241
242
243
244
245
# File 'lib/fluent/plugin/out_amqp.rb', line 239

def check_tls_configuration()
  if @tls
    unless @tls_key && @tls_cert
      raise Fluent::ConfigError, "'tls_key' and 'tls_cert' must be all specified if tls is enabled."
    end
  end
end

#configure(conf) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin/out_amqp.rb', line 90

def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  @conf = conf

  # Extract the header configuration into a collection
  @headers = conf.elements.select {|e|
    e.name == 'header'
  }.map {|e|
    he = HeaderElement.new
    he.configure(e)
    unless he.source || he.default
      raise Fluent::ConfigError, "At least 'default' or 'source' must must be defined in a header configuration section."
    end
    he
  }

  unless @host || @hosts
    raise Fluent::ConfigError, "'host' or 'hosts' must be specified."
  end
  unless @key || @tag_key
    raise Fluent::ConfigError, "Either 'key' or 'tag_key' must be set."
  end
  check_tls_configuration
end

#format(tag, time, record) ⇒ Object



152
153
154
# File 'lib/fluent/plugin/out_amqp.rb', line 152

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#formatted_to_msgpack_binaryObject



148
149
150
# File 'lib/fluent/plugin/out_amqp.rb', line 148

def formatted_to_msgpack_binary
  true
end

#get_connection_optionsObject



247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/fluent/plugin/out_amqp.rb', line 247

def get_connection_options()
  hosts = @hosts ||= Array.new(1, @host)
  opts = {
    hosts: hosts, port: @port, vhost: @vhost,
    pass: @pass, user: @user, ssl: @ssl,
    verify_ssl: @verify_ssl, heartbeat: @heartbeat,
    tls: @tls || nil,
    tls_cert: @tls_cert,
    tls_key: @tls_key,
    verify_peer: @tls_verify_peer
  }
  opts[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  return opts
end

#headers(tag, time, data) ⇒ Object



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/fluent/plugin/out_amqp.rb', line 220

def headers( tag, time, data )
  h = {}

  log.debug "Processing Headers: #{@headers}"
  # A little messy this...
  # Trying to allow for header overrides where a header defined
  # earlier will be used if a later header is returning nil (ie not found and no default)
  h = Hash[ @headers
              .collect{|v| [v.name, v.getValue(data) ]}
              .delete_if{|x| x.last.nil?}
      ]

  h[@tag_header] = tag if @tag_header
  h[@time_header] = Time.at(time).utc.to_s if @time_header

  h
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


144
145
146
# File 'lib/fluent/plugin/out_amqp.rb', line 144

def multi_workers_ready?
  true
end

#routing_key(tag) ⇒ Object



212
213
214
215
216
217
218
# File 'lib/fluent/plugin/out_amqp.rb', line 212

def routing_key( tag )
  if @tag_key
    tag
  else
    @key
  end
end

#shutdownObject



139
140
141
142
# File 'lib/fluent/plugin/out_amqp.rb', line 139

def shutdown
  @connection.stop
  super
end

#startObject



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/fluent/plugin/out_amqp.rb', line 116

def start
  super
  begin
    log.info "Connecting to RabbitMQ..."
    @connection = Bunny.new(get_connection_options) unless @connection
    @connection.start
  rescue Bunny::TCPConnectionFailed => e
    log.error "Connection to #{@host} failed"
  rescue Bunny::PossibleAuthenticationFailureError => e
    log.error "Could not authenticate as #{@user}"
  end

  log.info 'Creating new channel'
  @channel = @connection.create_channel

  return if @exchange.to_s =~ CHUNK_KEY_PLACEHOLDER_PATTERN

  log.info 'Creating new exchange (in start)', exchange: @exchange
  @exch = @channel.exchange(@exchange, type: @exchange_type.intern,
                          passive: @passive, durable: @durable,
                          auto_delete: @auto_delete)
end

#write(chunk) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/fluent/plugin/out_amqp.rb', line 156

def write(chunk)
  begin
    log.debug 'in write', chunk_id: dump_unique_id_hex(chunk.unique_id)
    log.debug 'raw exchange value is', exchange: @exchange.to_s

    if @exchange.to_s =~ CHUNK_KEY_PLACEHOLDER_PATTERN
      exchange_name = extract_placeholders(@exchange, chunk)
      log.info 'resolved exchange value is', exchange_name: exchange_name
      @exch = @channel.exchange(exchange_name, type: @exchange_type.intern,
                                passive: @passive, durable: @durable,
                                auto_delete: @auto_delete)
    end

    log.debug 'writing data to exchange', chunk_id: dump_unique_id_hex(chunk.unique_id)

    chunk.msgpack_each do |(tag, time, data)|
      begin
        msg_headers = headers(tag,time,data)

        begin
          data = JSON.dump( data ) unless data.is_a?( String )
        rescue JSON::GeneratorError => e
          log.warn "Failure converting data object to json string: #{e.message} - sending as raw object"
          # Debug only - otherwise we may pollute the fluent logs with unparseable events and loop
          log.debug "JSON.dump failure converting [#{data}]"
        end

        log.info "Sending message #{data}, :key => #{routing_key( tag)} :headers => #{headers(tag,time,data)}"
        @exch.publish(
          data,
          key: routing_key( tag ),
          persistent: @persistent,
          headers: msg_headers,
          content_type: @content_type,
          content_encoding: @content_encoding)

  # :nocov:
  #  Hard to throw StandardError through test code
      rescue StandardError => e
        # This protects against invalid byteranges and other errors at a per-message level
        log.error "Unexpected error during message publishing: #{e.message}"
        log.debug "Failure in publishing message [#{data}]"
      end
    end
  rescue MessagePack::MalformedFormatError => e
    # This has been observed when a server has filled the partition containing
    # the buffer files, and during replay the chunks were malformed
    log.error "Malformed msgpack in chunk - Did your server run out of space during buffering? #{e.message}"
  rescue StandardError => e
    # Just in case theres any other errors during chunk loading.
    log.error "Unexpected error during message publishing: #{e.message}"
  end
  # :nocov:
end