Class: Fluent::Agent
- Inherits:
-
Object
- Object
- Fluent::Agent
- Includes:
- Configurable
- Defined in:
- lib/fluent/agent.rb
Overview
Agent is a resource unit that manages emittable plugins.
Next step: `fluentd/root_agent.rb`. Next step: `fluentd/label.rb`.
Constant Summary
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#error_collector ⇒ Object
readonly
Returns the value of attribute error_collector.
-
#event_router ⇒ Object
readonly
Returns the value of attribute event_router.
-
#filters ⇒ Object
readonly
Returns the value of attribute filters.
-
#log ⇒ Object
readonly
Returns the value of attribute log.
-
#outputs ⇒ Object
readonly
Returns the value of attribute outputs.
Instance Method Summary collapse
- #add_filter(type, pattern, conf) ⇒ Object
- #add_match(type, pattern, conf) ⇒ Object
- #configure(conf) ⇒ Object
-
#emit_error_event(tag, time, record, error) ⇒ Object
For handling invalid record.
- #handle_emits_error(tag, es, error) ⇒ Object
-
#initialize(log:) ⇒ Agent
constructor
A new instance of Agent.
- #lifecycle(desc: false) ⇒ Object
- #lifecycle_control_list ⇒ Object
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize(log:) ⇒ Agent
Returns a new instance of Agent.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/agent.rb', line 32
# Sets up an agent with no context, empty output/filter lists and a new
# event router whose default match is a NoMatchMatch built from +log+.
#
# @param log [Object] logger kept in @log and passed to NoMatchMatch
def initialize(log:)
  super()

  @log = log
  @context = nil
  @outputs = []
  @filters = []

  # @lifecycle_control_list holds this agent's plugins, ordered from the
  # ones that DO emit to the ones that DON'T:
  #   input -> output w/ router -> filter -> output w/o router
  # * start: walk this order DESC, because plugins appearing later in the
  #   configuration receive events from plugins appearing earlier.
  # * stop/before_shutdown/shutdown/after_shutdown/close/terminate:
  #   walk this order ASC.
  @lifecycle_control_list = nil
  @lifecycle_cache = nil

  @event_router = EventRouter.new(NoMatchMatch.new(log), self)
  @error_collector = nil
end
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
56 57 58 |
# File 'lib/fluent/agent.rb', line 56
# Read-only accessor.
#
# @return [Object] the current value of @context
def context
  @context
end
#error_collector ⇒ Object (readonly)
Returns the value of attribute error_collector.
58 59 60 |
# File 'lib/fluent/agent.rb', line 58
# Read-only accessor.
#
# @return [Object] the current value of @error_collector
def error_collector
  @error_collector
end
#event_router ⇒ Object (readonly)
Returns the value of attribute event_router.
57 58 59 |
# File 'lib/fluent/agent.rb', line 57
# Read-only accessor.
#
# @return [Object] the current value of @event_router
def event_router
  @event_router
end
#filters ⇒ Object (readonly)
Returns the value of attribute filters.
55 56 57 |
# File 'lib/fluent/agent.rb', line 55
# Read-only accessor.
#
# @return [Object] the current value of @filters
def filters
  @filters
end
#log ⇒ Object (readonly)
Returns the value of attribute log.
53 54 55 |
# File 'lib/fluent/agent.rb', line 53
# Read-only accessor.
#
# @return [Object] the current value of @log
def log
  @log
end
#outputs ⇒ Object (readonly)
Returns the value of attribute outputs.
54 55 56 |
# File 'lib/fluent/agent.rb', line 54
# Read-only accessor.
#
# @return [Object] the current value of @outputs
def outputs
  @outputs
end
Instance Method Details
#add_filter(type, pattern, conf) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/fluent/agent.rb', line 148
# Instantiates a filter plugin of the given type, configures it, records it
# in @filters and adds a routing rule for it on this agent's event router.
#
# @param type [Object] plugin type handed to Plugin.new_filter
# @param pattern [Object] pattern the routing rule is registered under
# @param conf [Object] configuration element passed to the filter's #configure
# @return [Object] the newly created filter
def add_filter(type, pattern, conf)
  log_type = conf.for_this_worker? ? :default : :worker0
  # Only mention the context in the log line when one is set.
  where = @context.nil? ? '' : " in #{@context}"
  log.info log_type, "adding filter#{where}", pattern: pattern, type: type

  Plugin.new_filter(type).tap do |filter|
    filter.context_router = @event_router
    filter.configure(conf)
    @filters << filter
    @event_router.add_rule(pattern, filter)
  end
end
#add_match(type, pattern, conf) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/fluent/agent.rb', line 126
# Instantiates an output plugin of the given type, configures it, records it
# (and, for multi-output plugins, its child outputs) in @outputs and adds a
# routing rule for it on this agent's event router.
#
# @param type [Object] plugin type handed to Plugin.new_output
# @param pattern [Object] pattern the routing rule is registered under
# @param conf [Object] configuration element passed to the output's #configure
# @return [Object] the newly created output
def add_match(type, pattern, conf)
  log_type = conf.for_this_worker? ? :default : :worker0
  # Only mention the context in the log line when one is set.
  where = @context.nil? ? '' : " in #{@context}"
  log.info log_type, "adding match#{where}", pattern: pattern, type: type

  output = Plugin.new_output(type)
  output.context_router = @event_router
  output.configure(conf)
  @outputs << output

  # TODO: ruby 2.3 or later: replace
  # `output.respond_to?(:multi_output?) && output.multi_output?`
  # with output&.multi_output?
  if output.respond_to?(:outputs) && output.respond_to?(:multi_output?) && output.multi_output?
    # Prefer the static child list when the plugin exposes one.
    children = output.respond_to?(:static_outputs) ? output.static_outputs : output.outputs
    @outputs.push(*children)
  end

  @event_router.add_rule(pattern, output)
  output
end
#configure(conf) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/fluent/agent.rb', line 60
# Configures the agent, then registers every <filter> and <match> element
# found in +conf+ via #add_filter / #add_match.
#
# @param conf [Object] configuration section; must respond to #elements
# @raise [ConfigError] when an element has no '@type' parameter
def configure(conf)
  super

  # initialize <match> and <filter> elements
  conf.elements('filter', 'match').each do |element|
    # Outside supervisor mode, elements bound to another worker are skipped.
    next if !Fluent::Engine.supervisor_mode && element.for_another_worker?

    # An empty argument means "match everything".
    pattern = element.arg.empty? ? '**' : element.arg
    type = element['@type']
    raise ConfigError, "Missing '@type' parameter on <#{element.name}> directive" unless type

    if element.name == 'filter'
      add_filter(type, pattern, element)
    else
      add_match(type, pattern, element)
    end
  end
end
#emit_error_event(tag, time, record, error) ⇒ Object
For handling invalid record
162 163 |
# File 'lib/fluent/agent.rb', line 162
# For handling invalid record. The body is empty here, so the call does
# nothing and returns nil.
def emit_error_event(tag, time, record, error)
end
#handle_emits_error(tag, es, error) ⇒ Object
165 166 |
# File 'lib/fluent/agent.rb', line 165
# Hook taking a tag, an event stream and an error. The body is empty here,
# so the call does nothing and returns nil.
def handle_emits_error(tag, es, error)
end
#lifecycle(desc: false) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/fluent/agent.rb', line 107
# Yields every output and filter held in #lifecycle_control_list together
# with its kind. Outputs with a router are reported as :output.
#
# @param desc [Boolean] when true, both the kind order and the order of the
#   plugins within each kind are reversed
# @yield [instance, display_kind] one plugin and its kind (:output or :filter)
def lifecycle(desc: false)
  kinds = [:output_with_router, :filter, :output]
  kinds = kinds.reverse if desc

  kinds.each do |kind|
    plugins = lifecycle_control_list[kind]
    plugins = plugins.reverse if desc
    # Callers see a single :output kind regardless of router presence.
    display_kind = kind == :output_with_router ? :output : kind

    plugins.each { |instance| yield instance, display_kind }
  end
end
#lifecycle_control_list ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/fluent/agent.rb', line 79
# Builds (once) and returns a hash that groups this agent's plugins under
# :input, :output_with_router, :filter and :output. The result is memoized
# in @lifecycle_control_list.
#
# @return [Hash{Symbol => Array}] plugins grouped by kind
def lifecycle_control_list
  return @lifecycle_control_list if @lifecycle_control_list

  grouped = {
    input: [],
    output_with_router: [],
    filter: [],
    output: [],
  }

  # Only some agents expose #inputs, so check before reading it.
  inputs.each { |plugin| grouped[:input] << plugin } if self.respond_to?(:inputs)

  outputs.each do |plugin|
    slot = plugin.has_router? ? :output_with_router : :output
    grouped[slot] << plugin
  end

  filters.each { |plugin| grouped[:filter] << plugin }

  @lifecycle_control_list = grouped
end