Class: Fluent::Plugin::AMQPInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::AMQPInput
- Defined in:
- lib/fluent/plugin/in_amqp.rb
Overview
AMQPInput to be used as a Fluent SOURCE, reading messages from a RabbitMQ message broker
Instance Attribute Summary collapse
-
#connection ⇒ Object
Bunny connection handle - Allows mocking for test purposes.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
-
#shutdown ⇒ Object
AMQPInput#run.
- #start ⇒ Object
Instance Attribute Details
#connection ⇒ Object
Bunny connection handle
- Allows mocking for test purposes
17 18 19 |
# File 'lib/fluent/plugin/in_amqp.rb', line 17 def connection @connection end |
Instance Method Details
#configure(conf) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/in_amqp.rb', line 48 def configure(conf) conf['format'] ||= conf['payload_format'] # legacy compat_parameters_convert(conf, :parser) super parser_config = conf.elements('parse').first if parser_config @parser = parser_create(conf: parser_config) end @conf = conf unless (@host || @hosts) && @queue raise Fluent::ConfigError, "'host(s)' and 'queue' must be all specified." end check_tls_configuration end |
#multi_workers_ready? ⇒ Boolean
99 100 101 |
# File 'lib/fluent/plugin/in_amqp.rb', line 99 def multi_workers_ready? true end |
#shutdown ⇒ Object
AMQPInput#run
93 94 95 96 97 |
# File 'lib/fluent/plugin/in_amqp.rb', line 93 def shutdown log.info "Closing connection" @connection.stop super end |
#start ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/fluent/plugin/in_amqp.rb', line 66 def start super # Create a new connection, unless its already been provided to us @connection = Bunny.new unless @connection @connection.start @channel = @connection.create_channel if @exclusive && fluentd_worker_id > 0 log.info 'Config requested exclusive queue with multiple workers' @queue += ".#{fluentd_worker_id}" log.info "Renamed queue name to include worker id: #{@queue}" end q = @channel.queue(@queue, passive: @passive, durable: @durable, exclusive: @exclusive, auto_delete: @auto_delete) if @bind_exchange log.info "Binding #{@queue} to #{@exchange}, :routing_key => #{@routing_key}" q.bind(exchange=@exchange, routing_key: @routing_key) end q.subscribe do |delivery, , msg| log.debug "Recieved message #{@msg}" payload = parse_payload(msg) router.emit(parse_tag(delivery, ), parse_time(), payload) end end |