Class: Fluent::Plugin::AMQPInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_amqp.rb

Overview

AMQPInput to be used as a Fluent SOURCE, reading messages from a RabbitMQ message broker

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connectionObject

Bunny connection handle

- Allows mocking for test purposes


17
18
19
# File 'lib/fluent/plugin/in_amqp.rb', line 17

def connection
  @connection
end

Instance Method Details

#configure(conf) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/in_amqp.rb', line 48

def configure(conf)
  conf['format'] ||= conf['payload_format'] # legacy
  compat_parameters_convert(conf, :parser)

  super

  parser_config = conf.elements('parse').first
  if parser_config
    @parser = parser_create(conf: parser_config)
  end

  @conf = conf
  unless (@host || @hosts) && @queue
    raise Fluent::ConfigError, "'host(s)' and 'queue' must be all specified."
  end
  check_tls_configuration
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/fluent/plugin/in_amqp.rb', line 99

def multi_workers_ready?
  true
end

#shutdownObject

AMQPInput#run



93
94
95
96
97
# File 'lib/fluent/plugin/in_amqp.rb', line 93

def shutdown
  log.info "Closing connection"
  @connection.stop
  super
end

#startObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/in_amqp.rb', line 66

def start
  super
  # Create a new connection, unless its already been provided to us
  @connection = Bunny.new get_connection_options unless @connection
  @connection.start
  @channel = @connection.create_channel

  if @exclusive && fluentd_worker_id > 0
    log.info 'Config requested exclusive queue with multiple workers'
    @queue += ".#{fluentd_worker_id}"
    log.info "Renamed queue name to include worker id: #{@queue}"
  end

  q = @channel.queue(@queue, passive: @passive, durable: @durable,
                   exclusive: @exclusive, auto_delete: @auto_delete)
  if @bind_exchange
    log.info "Binding #{@queue} to #{@exchange}, :routing_key => #{@routing_key}"
    q.bind(exchange=@exchange, routing_key: @routing_key)
  end

  q.subscribe do |delivery, meta, msg|
    log.debug "Recieved message #{@msg}"
    payload = parse_payload(msg)
    router.emit(parse_tag(delivery, meta), parse_time(meta), payload)
  end
end