Class: LogStash::Inputs::RabbitMQ

Inherits:
Threadable
  • Object
show all
Includes:
PluginMixins::RabbitMQConnection
Defined in:
lib/logstash/inputs/rabbitmq.rb

Overview

Pull events from a www.rabbitmq.com/[RabbitMQ] queue.

The default settings will create an entirely transient queue and listen for all messages by default. If you need durability or any other advanced settings, please set the appropriate options

This plugin uses the rubymarchhare.info/[March Hare] library for interacting with the RabbitMQ server. Most configuration options map directly to standard RabbitMQ and AMQP concepts. The www.rabbitmq.com/amqp-0-9-1-reference.html[AMQP 0-9-1 reference guide] and other parts of the RabbitMQ documentation are useful for deeper understanding.

The properties of messages received will be stored in the ‘[@metadata]` field if the `@metadata_enabled` setting is checked. Note that storing metadata may degrade performance. The following properties may be available (in most cases dependent on whether they were set by the sender):

  • app-id

  • cluster-id

  • consumer-tag

  • content-encoding

  • content-type

  • correlation-id

  • delivery-mode

  • exchange

  • expiration

  • message-id

  • priority

  • redeliver

  • reply-to

  • routing-key

  • timestamp

  • type

  • user-id

For example, to get the RabbitMQ message’s timestamp property into the Logstash event’s ‘@timestamp` field, use the date filter to parse the `[@metadata][timestamp]` field:

source,ruby

filter {

if [@metadata][rabbitmq_properties][timestamp] {
  date {
    match => ["[@metadata][rabbitmq_properties][timestamp]", "UNIX"]
  }
}

}

Additionally, any message headers will be saved in the ‘[@metadata]` field.

Constant Summary collapse

MESSAGE_PROPERTIES =

The properties to extract from each message and store in a Technically the exchange, redeliver, and routing-key properties belong to the envelope and not the message but we ignore that distinction here. However, we extract the headers separately via get_headers even though the header table technically is a message property.

Freezing all strings so that code modifying the event’s If updating this list, remember to update the documentation above too.

[
  "app-id",
  "cluster-id",
  "consumer-tag",
  "content-encoding",
  "content-type",
  "correlation-id",
  "delivery-mode",
  "exchange",
  "expiration",
  "message-id",
  "priority",
  "redeliver",
  "reply-to",
  "routing-key",
  "timestamp",
  "type",
  "user-id",
].map { |s| s.freeze }.freeze
INTERNAL_QUEUE_POISON =
[]

Instance Method Summary collapse

Instance Method Details

#bind_exchange!Object



194
195
196
197
198
199
200
201
202
# File 'lib/logstash/inputs/rabbitmq.rb', line 194

def bind_exchange!
  if @exchange
    if @exchange_type # Only declare the exchange if @exchange_type is set!
      @logger.info? && @logger.info("Declaring exchange '#{@exchange}' with type #{@exchange_type}")
      @hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable)
    end
    @hare_info.queue.bind(@exchange, :routing_key => @key)
  end
end

#consume!Object



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/logstash/inputs/rabbitmq.rb', line 217

def consume!
  @consumer = @hare_info.queue.build_consumer(:on_cancellation => Proc.new { on_cancellation }) do |, data|
    @internal_queue.put [, data]
  end

  begin
    @hare_info.queue.subscribe_with(@consumer, :manual_ack => @ack)
  rescue MarchHare::Exception => e
    @logger.warn("Could not subscribe to queue! Will retry in #{@subscription_retry_interval_seconds} seconds", :queue => @queue)

    sleep @subscription_retry_interval_seconds
    retry
  end

  internal_queue_consume!
end

#declare_queueObject



208
209
210
211
212
213
214
215
# File 'lib/logstash/inputs/rabbitmq.rb', line 208

def declare_queue
  @hare_info.channel.queue(@queue,
                           :durable     => @durable,
                           :auto_delete => @auto_delete,
                           :exclusive   => @exclusive,
                           :passive     => @passive,
                           :arguments   => @arguments)
end

#declare_queue!Object



204
205
206
# File 'lib/logstash/inputs/rabbitmq.rb', line 204

def declare_queue!
  @hare_info.queue = declare_queue()
end

#internal_queue_consume!Object



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/logstash/inputs/rabbitmq.rb', line 234

def internal_queue_consume!
  i=0
  last_delivery_tag=nil
  while true
    payload = @internal_queue.poll(10, TimeUnit::MILLISECONDS)
    if !payload  # Nothing in the queue
      if last_delivery_tag # And we have unacked stuff
        @hare_info.channel.ack(last_delivery_tag, true) if @ack
        i=0
        last_delivery_tag = nil
      end
      next
    end

    break if payload == INTERNAL_QUEUE_POISON

    , data = payload
    @codec.decode(data) do |event|
      decorate(event)
      if @metadata_enabled
        event.set("[@metadata][rabbitmq_headers]", get_headers())
        event.set("[@metadata][rabbitmq_properties]", get_properties())
      end
      @output_queue << event if event
    end

    i += 1

    if i >= @prefetch_count
      @hare_info.channel.ack(.delivery_tag, true) if @ack
      i = 0
      last_delivery_tag = nil
    else
      last_delivery_tag = .delivery_tag
    end
  end
end

#on_cancellationObject



287
288
289
290
291
292
# File 'lib/logstash/inputs/rabbitmq.rb', line 287

def on_cancellation
  if !stop? # If this isn't already part of a regular stop
    @logger.info("Received basic.cancel from #{rabbitmq_settings[:host]}, shutting down.")
    stop
  end
end

#registerObject



170
171
172
# File 'lib/logstash/inputs/rabbitmq.rb', line 170

def register
  @internal_queue = java.util.concurrent.ArrayBlockingQueue.new(@prefetch_count*2)
end

#run(output_queue) ⇒ Object



174
175
176
177
178
# File 'lib/logstash/inputs/rabbitmq.rb', line 174

def run(output_queue)
  setup!
  @output_queue = output_queue
  consume!
end

#setup!Object



180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/logstash/inputs/rabbitmq.rb', line 180

def setup!
  connect!
  declare_queue!
  bind_exchange!
  @hare_info.channel.prefetch = @prefetch_count
rescue => e
  @logger.warn("Error while setting up connection for rabbitmq input! Will retry.",
               :message => e.message,
               :class => e.class.name,
               :location => e.backtrace.first)
  sleep_for_retry
  retry
end

#shutdown_consumerObject



278
279
280
281
282
283
284
285
# File 'lib/logstash/inputs/rabbitmq.rb', line 278

def shutdown_consumer
  return unless @consumer
  @hare_info.channel.basic_cancel(@consumer.consumer_tag)
  until @consumer.terminated?
    @logger.info("Waiting for rabbitmq consumer to terminate before stopping!", :params => self.params)
    sleep 1
  end
end

#stopObject



272
273
274
275
276
# File 'lib/logstash/inputs/rabbitmq.rb', line 272

def stop
  @internal_queue.put(INTERNAL_QUEUE_POISON)
  shutdown_consumer
  close_connection
end