Class: Fluent::Plugin::AMQPOutput
- Inherits: Output
- Inheritance chain: Object < Output < Fluent::Plugin::AMQPOutput
- Defined in:
- lib/fluent/plugin/out_amqp.rb
Overview
AMQPOutput to be used as a Fluent MATCHER, sending messages to a RabbitMQ messaging broker
Defined Under Namespace
Classes: HeaderElement
Constant Summary collapse
- DEFAULT_BUFFER_TYPE = "memory"
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#exch ⇒ Object
readonly
Attribute readers to support testing.
Instance Method Summary collapse
- #check_tls_configuration ⇒ Object
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
- #get_connection_options ⇒ Object
- #headers(tag, time, data) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #routing_key(tag) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
20 21 22 |
# File 'lib/fluent/plugin/out_amqp.rb', line 20

# Reader for the channel attribute.
#
# @return [Object] the current value of @channel (assigned in #start)
def channel
  @channel
end
#connection ⇒ Object
Returns the value of attribute connection.
16 17 18 |
# File 'lib/fluent/plugin/out_amqp.rb', line 16

# Reader for the connection attribute.
#
# @return [Object] the current value of @connection; #start only creates a
#   new connection when this is not already set
def connection
  @connection
end
#exch ⇒ Object (readonly)
Attribute readers to support testing
19 20 21 |
# File 'lib/fluent/plugin/out_amqp.rb', line 19

# Attribute reader to support testing.
#
# @return [Object] the current value of @exch (assigned in #start, or in
#   #write when the exchange name contains a placeholder)
def exch
  @exch
end
Instance Method Details
#check_tls_configuration ⇒ Object
239 240 241 242 243 244 245 |
# File 'lib/fluent/plugin/out_amqp.rb', line 239

# Validates the TLS-related settings held in instance variables.
#
# Nothing is checked when @tls is falsy. When @tls is set, both @tls_key and
# @tls_cert must be truthy.
#
# @return [nil]
# @raise [Fluent::ConfigError] if TLS is enabled but the key or the
#   certificate is missing
def check_tls_configuration
  return unless @tls
  return if @tls_key && @tls_cert

  raise Fluent::ConfigError, "'tls_key' and 'tls_cert' must be all specified if tls is enabled."
end
#configure(conf) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/fluent/plugin/out_amqp.rb', line 90

# Applies and validates the plugin configuration.
#
# Converts buffer compatibility parameters, delegates to the parent
# implementation, stores the raw configuration in @conf, builds one
# HeaderElement per <header> section into @headers, and then checks that the
# mandatory settings are present.
#
# @param conf [Object] configuration element; must respond to #elements
# @return [nil] the result of #check_tls_configuration
# @raise [Fluent::ConfigError] if a header section has neither 'source' nor
#   'default', if neither 'host' nor 'hosts' is set, if neither 'key' nor
#   'tag_key' is set, or if the TLS settings are incomplete
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  @conf = conf

  # Extract the header configuration into a collection
  @headers = conf.elements.select { |e| e.name == 'header' }.map { |e|
    he = HeaderElement.new
    he.configure(e)
    unless he.source || he.default
      raise Fluent::ConfigError, "At least 'default' or 'source' must be defined in a header configuration section."
    end
    he
  }

  unless @host || @hosts
    raise Fluent::ConfigError, "'host' or 'hosts' must be specified."
  end

  unless @key || @tag_key
    raise Fluent::ConfigError, "Either 'key' or 'tag_key' must be set."
  end

  check_tls_configuration
end
#format(tag, time, record) ⇒ Object
152 153 154 |
# File 'lib/fluent/plugin/out_amqp.rb', line 152

# Serialises a single event for buffering.
#
# @param tag [Object] the event tag
# @param time [Object] the event time
# @param record [Object] the event record
# @return [Object] the result of calling #to_msgpack on the
#   [tag, time, record] triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#formatted_to_msgpack_binary ⇒ Object
148 149 150 |
# File 'lib/fluent/plugin/out_amqp.rb', line 148

# Always answers true.
#
# NOTE(review): presumably this flags that #format emits msgpack so that
# #write may iterate the chunk with msgpack_each - confirm against the
# Fluentd output plugin API.
#
# @return [Boolean] always true
def formatted_to_msgpack_binary
  true
end
#get_connection_options ⇒ Object
247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'lib/fluent/plugin/out_amqp.rb', line 247

# Builds the connection options hash from the plugin's instance variables.
#
# When @hosts is not set it is initialised (and kept) as a one-element array
# holding @host. The :tls_ca_certificates key is only present when
# @tls_ca_certificates is set.
#
# @return [Hash{Symbol => Object}] the connection options
def get_connection_options
  @hosts ||= [@host]

  opts = {
    hosts: @hosts,
    port: @port,
    vhost: @vhost,
    pass: @pass,
    user: @user,
    ssl: @ssl,
    verify_ssl: @verify_ssl,
    heartbeat: @heartbeat,
    tls: @tls || nil, # a false @tls is reported as nil
    tls_cert: @tls_cert,
    tls_key: @tls_key,
    verify_peer: @tls_verify_peer
  }
  opts[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  opts
end
#headers(tag, time, data) ⇒ Object
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/fluent/plugin/out_amqp.rb', line 220

# Builds the message headers for one event.
#
# Every configured header element is asked for its value; elements yielding
# nil are skipped, so an earlier header with the same name survives when a
# later one finds nothing (not found and no default). A later non-nil value
# replaces an earlier one. The tag and time headers are added last when
# @tag_header / @time_header are configured.
#
# @param tag [Object] the event tag
# @param time [Object] the event time, passed to Time.at
# @param data [Object] the event record handed to each header element
# @return [Hash] header name => value
def headers(tag, time, data)
  log.debug "Processing Headers: #{@headers}"

  resolved = @headers.each_with_object({}) do |element, acc|
    header_name = element.name
    header_value = element.getValue(data)
    acc[header_name] = header_value unless header_value.nil?
  end

  resolved[@tag_header] = tag if @tag_header
  resolved[@time_header] = Time.at(time).utc.to_s if @time_header
  resolved
end
#multi_workers_ready? ⇒ Boolean
144 145 146 |
# File 'lib/fluent/plugin/out_amqp.rb', line 144

# Always answers true.
#
# NOTE(review): presumably declares that the plugin may run under multiple
# Fluentd workers - confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#routing_key(tag) ⇒ Object
212 213 214 215 216 217 218 |
# File 'lib/fluent/plugin/out_amqp.rb', line 212

# Chooses the routing key for a message.
#
# @param tag [Object] the event tag
# @return [Object] the tag itself when @tag_key is set, otherwise @key
def routing_key(tag)
  @tag_key ? tag : @key
end
#shutdown ⇒ Object
139 140 141 142 |
# File 'lib/fluent/plugin/out_amqp.rb', line 139

# Stops the broker connection, then delegates to the parent implementation.
#
# @return [Object] whatever the parent #shutdown returns
def shutdown
  @connection.stop

  super
end
#start ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/fluent/plugin/out_amqp.rb', line 116

# Starts the plugin: connects to RabbitMQ, opens a channel and, unless the
# exchange name contains a chunk-key placeholder, declares the exchange.
#
# An existing @connection is reused; otherwise a new Bunny connection is
# built from #get_connection_options so that the configured hosts,
# credentials and TLS settings are actually applied.
#
# TCP connection and authentication failures are logged rather than raised;
# execution then continues to channel creation as before.
#
# @return [Object, nil] nil when the exchange name contains a placeholder
#   (the exchange is then declared per chunk in #write), otherwise the
#   declared exchange
def start
  super

  begin
    log.info "Connecting to RabbitMQ..."
    @connection ||= Bunny.new(get_connection_options)
    @connection.start
  rescue Bunny::TCPConnectionFailed
    log.error "Connection to #{@host} failed"
  rescue Bunny::PossibleAuthenticationFailureError
    log.error "Could not authenticate as #{@user}"
  end

  log.info 'Creating new channel'
  @channel = @connection.create_channel

  # A placeholder can only be resolved once a chunk is available.
  return if @exchange.to_s =~ CHUNK_KEY_PLACEHOLDER_PATTERN

  log.info 'Creating new exchange (in start)', exchange: @exchange
  @exch = @channel.exchange(@exchange,
                            type: @exchange_type.intern,
                            passive: @passive,
                            durable: @durable,
                            auto_delete: @auto_delete)
end
#write(chunk) ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/fluent/plugin/out_amqp.rb', line 156

# Publishes every event of a buffer chunk to the exchange.
#
# When @exchange contains a chunk-key placeholder, the exchange name is
# resolved from the chunk and the exchange is declared on @channel before
# publishing. Each record is serialised with JSON.dump unless it is already a
# String; a record that cannot be serialised is sent as the raw object.
#
# Errors are logged, never raised: a failure on one message does not stop
# the remaining messages, and a failure while reading the chunk ends the
# method after logging.
#
# @param chunk [Object] buffer chunk; must respond to #unique_id and
#   #msgpack_each
# @return [Object] not meaningful to callers
def write(chunk)
  log.debug 'in write', chunk_id: dump_unique_id_hex(chunk.unique_id)
  log.debug 'raw exchange value is', exchange: @exchange.to_s

  if @exchange.to_s =~ CHUNK_KEY_PLACEHOLDER_PATTERN
    exchange_name = extract_placeholders(@exchange, chunk)
    log.info 'resolved exchange value is', exchange_name: exchange_name
    @exch = @channel.exchange(exchange_name,
                              type: @exchange_type.intern,
                              passive: @passive,
                              durable: @durable,
                              auto_delete: @auto_delete)
  end

  log.debug 'writing data to exchange', chunk_id: dump_unique_id_hex(chunk.unique_id)

  chunk.msgpack_each do |(tag, time, data)|
    begin
      # Headers are derived from the record before it is turned into a string.
      msg_headers = headers(tag, time, data)

      begin
        data = JSON.dump(data) unless data.is_a?(String)
      rescue JSON::GeneratorError => e
        log.warn "Failure converting data object to json string: #{e.message} - sending as raw object"
        # Debug only - otherwise we may pollute the fluent logs with unparseable events and loop
        log.debug "JSON.dump failure converting [#{data}]"
      end

      # Log the headers that are actually sent instead of recomputing them
      # from the already-serialised record.
      log.info "Sending message #{data}, :key => #{routing_key(tag)} :headers => #{msg_headers}"
      @exch.publish(data,
                    key: routing_key(tag),
                    persistent: @persistent,
                    headers: msg_headers,
                    content_type: @content_type,
                    content_encoding: @content_encoding)
    # :nocov:
    # Hard to throw StandardError through test code
    rescue StandardError => e
      # This protects against invalid byteranges and other errors at a per-message level
      log.error "Unexpected error during message publishing: #{e.message}"
      log.debug "Failure in publishing message [#{data}]"
    end
  end
rescue MessagePack::MalformedFormatError => e
  # This has been observed when a server has filled the partition containing
  # the buffer files, and during replay the chunks were malformed
  log.error "Malformed msgpack in chunk - Did your server run out of space during buffering? #{e.message}"
rescue StandardError => e
  # Just in case theres any other errors during chunk loading.
  log.error "Unexpected error during message publishing: #{e.message}"
  # :nocov:
end