Class: Fluent::Plugin::RabbitMQInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_rabbitmq.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ RabbitMQInput

Returns a new instance of RabbitMQInput.



78
79
80
81
# File 'lib/fluent/plugin/in_rabbitmq.rb', line 78

# Builds a new plugin instance.
#
# Runs the superclass initializer first and then loads the "bunny" library,
# so the library is only required once an instance is actually created.
def initialize
  super
  require "bunny"
end

Instance Method Details

#configure(conf) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/in_rabbitmq.rb', line 83

# Applies the configuration and prepares (but does not start) the Bunny
# session used by this input.
#
# Steps, in order: converts legacy parser parameters, lets the superclass
# process +conf+, collects the connection/TLS settings into an option hash,
# creates the payload parser, defaults the routing key to the tag, and
# instantiates the Bunny session with the collected options.
#
# @param conf [Object] configuration element, forwarded to the superclass
# @return [Object] the session object returned by Bunny.new
def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  # Optional settings are only forwarded when set, so that Bunny's own
  # defaults stay in effect for everything the user did not configure.
  bunny_options = {}
  bunny_options[:host] = @host if @host
  bunny_options[:hosts] = @hosts if @hosts
  bunny_options[:port] = @port if @port
  bunny_options[:user] = @user if @user
  bunny_options[:pass] = @pass if @pass
  bunny_options[:vhost] = @vhost if @vhost
  bunny_options[:connection_timeout] = @connection_timeout if @connection_timeout
  bunny_options[:continuation_timeout] = @continuation_timeout if @continuation_timeout
  # Boolean setting: compare against nil instead of testing truthiness,
  # otherwise an explicit `false` would be dropped and never reach Bunny.
  bunny_options[:automatically_recover] = @automatically_recover unless @automatically_recover.nil?
  bunny_options[:network_recovery_interval] = @network_recovery_interval if @network_recovery_interval
  bunny_options[:recovery_attempts] = @recovery_attempts
  bunny_options[:auth_mechanism] = @auth_mechanism if @auth_mechanism
  bunny_options[:heartbeat] = @heartbeat if @heartbeat

  # TLS-related settings; :tls and :verify_peer are always forwarded as-is.
  bunny_options[:tls] = @tls
  bunny_options[:tls_cert] = @tls_cert if @tls_cert
  bunny_options[:tls_key] = @tls_key if @tls_key
  bunny_options[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  bunny_options[:verify_peer] = @verify_peer

  @parser = parser_create

  # Without an explicit routing key the tag is used when binding the queue.
  @routing_key ||= @tag
  @bunny = Bunny.new(bunny_options)
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


168
169
170
# File 'lib/fluent/plugin/in_rabbitmq.rb', line 168

# Always answers true.
#
# NOTE(review): presumably consulted by Fluentd to decide whether this plugin
# may run with multiple workers — confirm against the plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



172
173
174
175
# File 'lib/fluent/plugin/in_rabbitmq.rb', line 172

# Closes the Bunny session and then runs the superclass shutdown.
#
# The superclass shutdown is executed even when closing the session raises,
# so the rest of the shutdown sequence is not skipped because of a broker or
# network error. The error from closing still propagates to the caller.
#
# @return [Object] whatever the superclass shutdown returns
def shutdown
  begin
    @bunny.close
  ensure
    # Keep the superclass result so the return value matches a plain
    # "close, then super" sequence on the success path.
    result = super
  end
  result
end

#start ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/fluent/plugin/in_rabbitmq.rb', line 115

# Starts the Bunny session, declares the exchange/queue topology and
# subscribes to the queue, emitting each parsed message as an event.
#
# Steps, in order:
# 1. start the session and open a channel (optionally setting prefetch);
# 2. when @create_exchange is set, declare the exchange and optionally bind
#    it to @exchange_to_bind;
# 3. declare the queue with the optional x-* arguments and, when @exchange
#    is set, bind the queue to it using @routing_key;
# 4. subscribe: every payload is run through the parser and each resulting
#    record is emitted under @tag.
#
# With @manual_ack enabled the delivery is acknowledged in an `ensure`, i.e.
# also when parsing or emitting failed.
#
# @return [Object] the value returned by queue.subscribe
def start
  super
  @bunny.start
  channel = @bunny.create_channel(nil, @consumer_pool_size)
  channel.prefetch(@prefetch_count) if @prefetch_count

  if @create_exchange
    exchange_options = {
      durable: @exchange_durable,
      auto_delete: @auto_delete
    }
    @bunny_exchange = Bunny::Exchange.new(channel, @exchange_type, @exchange, exchange_options)
    @bunny_exchange.bind(@exchange_to_bind, routing_key: @exchange_routing_key) if @exchange_to_bind
  end

  queue_arguments = {}
  queue_arguments["x-message-ttl"] = @ttl if @ttl
  queue_arguments["x-queue-mode"] = @queue_mode if @queue_mode
  queue_arguments["x-queue-type"] = @queue_type if @queue_type
  queue = channel.queue(
    @queue,
    durable: @durable,
    exclusive: @exclusive,
    auto_delete: @auto_delete,
    arguments: queue_arguments
  )
  queue.bind(@exchange, routing_key: @routing_key) if @exchange

  queue.subscribe(manual_ack: @manual_ack) do |delivery_info, properties, payload|
    begin
      @parser.parse(payload) do |time, record|
        # A parser may yield a nil record for input it cannot parse. Skip it
        # instead of failing on `record[...] =` or emitting a nil record.
        unless record
          log.warn "Failed to parse payload", payload: payload, tag: @tag
          next
        end

        # A timestamp carried in the message properties takes precedence
        # over the time produced by the parser.
        timestamp = properties[:timestamp]
        time = Fluent::EventTime.from_time(timestamp) if timestamp

        record[@headers_key] = properties.headers if @include_headers
        record[@delivery_info_key] = delivery_info if @include_delivery_info
        router.emit(@tag, time, record)
      end
    rescue Fluent::Plugin::Parser::ParserError => e
      log.error "Parser error: #{e.message}", error: e, payload: payload, tag: @tag
    ensure
      # Acknowledge regardless of the outcome so the message is not redelivered.
      channel.ack(delivery_info.delivery_tag) if @manual_ack
    end
  end
end