Class: LogStash::Inputs::RabbitMQ
- Inherits:
-
Threadable
- Object
- Threadable
- LogStash::Inputs::RabbitMQ
- Includes:
- PluginMixins::RabbitMQConnection
- Defined in:
- lib/logstash/inputs/rabbitmq.rb
Overview
Pull events from a www.rabbitmq.com/[RabbitMQ] queue.
The default settings will create an entirely transient queue and listen for all messages by default. If you need durability or any other advanced settings, please set the appropriate options
This plugin uses the rubymarchhare.info/[March Hare] library for interacting with the RabbitMQ server. Most configuration options map directly to standard RabbitMQ and AMQP concepts. The www.rabbitmq.com/amqp-0-9-1-reference.html[AMQP 0-9-1 reference guide] and other parts of the RabbitMQ documentation are useful for deeper understanding.
The properties of messages received will be stored in the ‘[@metadata]` field if the `@metadata_enabled` setting is checked. Note that storing metadata may degrade performance. The following properties may be available (in most cases dependent on whether they were set by the sender):
-
app-id
-
cluster-id
-
consumer-tag
-
content-encoding
-
content-type
-
correlation-id
-
delivery-mode
-
exchange
-
expiration
-
message-id
-
priority
-
redeliver
-
reply-to
-
routing-key
-
timestamp
-
type
-
user-id
For example, to get the RabbitMQ message’s timestamp property into the Logstash event’s ‘@timestamp` field, use the date filter to parse the `[@metadata][timestamp]` field:
- source,ruby
-
filter {
if [@metadata][rabbitmq_properties][timestamp] { date { match => ["[@metadata][rabbitmq_properties][timestamp]", "UNIX"] } }
}
Additionally, any message headers will be saved in the ‘[@metadata]` field.
Constant Summary collapse
- MESSAGE_PROPERTIES =
The properties to extract from each message and store in a Technically the exchange, redeliver, and routing-key properties belong to the envelope and not the message but we ignore that distinction here. However, we extract the headers separately via get_headers even though the header table technically is a message property.
Freezing all strings so that code modifying the event’s If updating this list, remember to update the documentation above too.
[ "app-id", "cluster-id", "consumer-tag", "content-encoding", "content-type", "correlation-id", "delivery-mode", "exchange", "expiration", "message-id", "priority", "redeliver", "reply-to", "routing-key", "timestamp", "type", "user-id", ].map { |s| s.freeze }.freeze
- INTERNAL_QUEUE_POISON =
[]
Instance Method Summary collapse
- #bind_exchange! ⇒ Object
- #consume! ⇒ Object
- #declare_queue ⇒ Object
- #declare_queue! ⇒ Object
- #internal_queue_consume! ⇒ Object
- #on_cancellation ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #setup! ⇒ Object
- #shutdown_consumer ⇒ Object
- #stop ⇒ Object
Instance Method Details
#bind_exchange! ⇒ Object
194 195 196 197 198 199 200 201 202 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 194 def bind_exchange! if @exchange if @exchange_type # Only declare the exchange if @exchange_type is set! @logger.info? && @logger.info("Declaring exchange '#{@exchange}' with type #{@exchange_type}") @hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable) end @hare_info.queue.bind(@exchange, :routing_key => @key) end end |
#consume! ⇒ Object
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 217 def consume! @consumer = @hare_info.queue.build_consumer(:on_cancellation => Proc.new { on_cancellation }) do |, data| @internal_queue.put [, data] end begin @hare_info.queue.subscribe_with(@consumer, :manual_ack => @ack) rescue MarchHare::Exception => e @logger.warn("Could not subscribe to queue! Will retry in #{@subscription_retry_interval_seconds} seconds", :queue => @queue) sleep @subscription_retry_interval_seconds retry end internal_queue_consume! end |
#declare_queue ⇒ Object
208 209 210 211 212 213 214 215 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 208 def declare_queue @hare_info.channel.queue(@queue, :durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :passive => @passive, :arguments => @arguments) end |
#declare_queue! ⇒ Object
204 205 206 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 204 def declare_queue! @hare_info.queue = declare_queue() end |
#internal_queue_consume! ⇒ Object
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 234 def internal_queue_consume! i=0 last_delivery_tag=nil while true payload = @internal_queue.poll(10, TimeUnit::MILLISECONDS) if !payload # Nothing in the queue if last_delivery_tag # And we have unacked stuff @hare_info.channel.ack(last_delivery_tag, true) if @ack i=0 last_delivery_tag = nil end next end break if payload == INTERNAL_QUEUE_POISON , data = payload @codec.decode(data) do |event| decorate(event) if @metadata_enabled event.set("[@metadata][rabbitmq_headers]", get_headers()) event.set("[@metadata][rabbitmq_properties]", get_properties()) end @output_queue << event if event end i += 1 if i >= @prefetch_count @hare_info.channel.ack(.delivery_tag, true) if @ack i = 0 last_delivery_tag = nil else last_delivery_tag = .delivery_tag end end end |
#on_cancellation ⇒ Object
287 288 289 290 291 292 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 287 def on_cancellation if !stop? # If this isn't already part of a regular stop @logger.info("Received basic.cancel from #{rabbitmq_settings[:host]}, shutting down.") stop end end |
#register ⇒ Object
170 171 172 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 170 def register @internal_queue = java.util.concurrent.ArrayBlockingQueue.new(@prefetch_count*2) end |
#run(output_queue) ⇒ Object
174 175 176 177 178 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 174 def run(output_queue) setup! @output_queue = output_queue consume! end |
#setup! ⇒ Object
180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 180 def setup! connect! declare_queue! bind_exchange! @hare_info.channel.prefetch = @prefetch_count rescue => e @logger.warn("Error while setting up connection for rabbitmq input! Will retry.", :message => e., :class => e.class.name, :location => e.backtrace.first) sleep_for_retry retry end |
#shutdown_consumer ⇒ Object
278 279 280 281 282 283 284 285 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 278 def shutdown_consumer return unless @consumer @hare_info.channel.basic_cancel(@consumer.consumer_tag) until @consumer.terminated? @logger.info("Waiting for rabbitmq consumer to terminate before stopping!", :params => self.params) sleep 1 end end |
#stop ⇒ Object
272 273 274 275 276 |
# File 'lib/logstash/inputs/rabbitmq.rb', line 272 def stop @internal_queue.put(INTERNAL_QUEUE_POISON) shutdown_consumer close_connection end |