Class: Fluent::EventRouter
- Inherits:
- Object
- Defined in:
- lib/fluent/event_router.rb
Overview
EventRouter is responsible for routing events to a collector.
It has a list of MatchPattern and Collector pairs:
+----------------+     +-----------------+
|  MatchPattern  |     |    Collector    |
+----------------+     +-----------------+
|   access.**  --------->  type forward  |
|     logs.**  --------->  type copy     |
|  archive.**  --------->  type s3       |
+----------------+     +-----------------+
EventRouter does:
1) receive an event at #emit methods
2) match the event’s tag with the MatchPatterns
3) forward the event to the corresponding Collector
A Collector is an Output, a Filter, or another EventRouter.
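The three steps above can be sketched with a toy router. This is a minimal illustration, not Fluentd's implementation: `ToyRule` and `ToyRouter` are hypothetical names, collectors are plain lambdas, and pattern matching is reduced to exact tags plus a trailing `.**` wildcard. The first-match-wins lookup is also an assumption, since the real lookup method is not shown on this page.

```ruby
# Toy model of the routing table above; not Fluentd's actual implementation.
ToyRule = Struct.new(:pattern, :collector) do
  # Simplified matching: only an exact tag or a trailing ".**" is understood.
  def match?(tag)
    if pattern.end_with?(".**")
      prefix = pattern.delete_suffix(".**")
      tag == prefix || tag.start_with?("#{prefix}.")
    else
      tag == pattern
    end
  end
end

class ToyRouter
  def initialize(default_collector)
    @rules = []
    @default_collector = default_collector
  end

  def add_rule(pattern, collector)
    @rules << ToyRule.new(pattern, collector)
  end

  # Steps 2 and 3: pick the first matching rule (assumed order), else the default.
  def emit(tag, time, record)
    rule = @rules.find { |r| r.match?(tag) }
    collector = rule ? rule.collector : @default_collector
    collector.call(tag, time, record)
  end
end

delivered = []
router = ToyRouter.new(->(tag, _time, _record) { delivered << [:unmatched, tag] })
router.add_rule("access.**", ->(tag, _time, _record) { delivered << [:forward, tag] })
router.add_rule("archive.**", ->(tag, _time, _record) { delivered << [:s3, tag] })

router.emit("access.web", Time.now.to_i, { "path" => "/" })
router.emit("metrics.cpu", Time.now.to_i, { "load" => 0.4 })
p delivered
```

The second event matches no rule, so it lands on the default collector, which mirrors the `find(tag) || @default_collector` fallback in #match.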
Defined Under Namespace
Classes: MatchCache, Pipeline, Rule
Instance Attribute Summary
-
#default_collector ⇒ Object
Returns the value of attribute default_collector.
-
#emit_error_handler ⇒ Object
Returns the value of attribute emit_error_handler.
Instance Method Summary
- #add_metric_callbacks(caller_plugin_id, callback) ⇒ Object
-
#add_rule(pattern, collector) ⇒ Object
Called by Agent to add a new match pattern and collector.
- #caller_plugin_id=(caller_plugin_id) ⇒ Object
- #emit(tag, time, record) ⇒ Object
- #emit_array(tag, array) ⇒ Object
- #emit_error_event(tag, time, record, error) ⇒ Object
- #emit_stream(tag, es) ⇒ Object
- #find_callback ⇒ Object
-
#initialize(default_collector, emit_error_handler) ⇒ EventRouter
constructor
A new instance of EventRouter.
- #match(tag) ⇒ Object
- #match?(tag) ⇒ Boolean
- #suppress_missing_match! ⇒ Object
Constructor Details
#initialize(default_collector, emit_error_handler) ⇒ EventRouter
Returns a new instance of EventRouter.
# File 'lib/fluent/event_router.rb', line 45
def initialize(default_collector, emit_error_handler)
  @match_rules = []
  @match_cache = MatchCache.new
  @default_collector = default_collector
  @emit_error_handler = emit_error_handler
  @metric_callbacks = {}
  @caller_plugin_id = nil
end
Instance Attribute Details
#default_collector ⇒ Object
Returns the value of attribute default_collector.
# File 'lib/fluent/event_router.rb', line 54
def default_collector
  @default_collector
end
#emit_error_handler ⇒ Object
Returns the value of attribute emit_error_handler.
# File 'lib/fluent/event_router.rb', line 55
def emit_error_handler
  @emit_error_handler
end
Instance Method Details
#add_metric_callbacks(caller_plugin_id, callback) ⇒ Object
# File 'lib/fluent/event_router.rb', line 88
def add_metric_callbacks(caller_plugin_id, callback)
  @metric_callbacks[caller_plugin_id] = callback
end
#add_rule(pattern, collector) ⇒ Object
Called by Agent to add a new match pattern and collector.
# File 'lib/fluent/event_router.rb', line 84
def add_rule(pattern, collector)
  @match_rules << Rule.new(pattern, collector)
end
#caller_plugin_id=(caller_plugin_id) ⇒ Object
# File 'lib/fluent/event_router.rb', line 92
def caller_plugin_id=(caller_plugin_id)
  @caller_plugin_id = caller_plugin_id
end
#emit(tag, time, record) ⇒ Object
# File 'lib/fluent/event_router.rb', line 104
def emit(tag, time, record)
  unless record.nil?
    emit_stream(tag, OneEventStream.new(time, record))
  end
end
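The nil guard in #emit means a nil record is dropped without reaching #emit_stream. A minimal sketch of that behavior follows; `RecordingRouter` is a hypothetical stand-in, and a plain array of `[time, record]` pairs takes the place of OneEventStream.

```ruby
# Hypothetical stand-in that records what would be passed to emit_stream.
class RecordingRouter
  attr_reader :streams

  def initialize
    @streams = []
  end

  # Same guard as EventRouter#emit; a plain array stands in for OneEventStream.
  def emit(tag, time, record)
    unless record.nil?
      emit_stream(tag, [[time, record]])
    end
  end

  def emit_stream(tag, es)
    @streams << [tag, es]
  end
end

router = RecordingRouter.new
router.emit("app.log", 1_700_000_000, { "msg" => "hello" })
router.emit("app.log", 1_700_000_001, nil) # dropped silently, no stream is built
puts router.streams.length
```

Only the first call produces a stream, so callers do not need their own nil check before emitting.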
#emit_array(tag, array) ⇒ Object
# File 'lib/fluent/event_router.rb', line 110
def emit_array(tag, array)
  emit_stream(tag, ArrayEventStream.new(array))
end
#emit_error_event(tag, time, record, error) ⇒ Object
# File 'lib/fluent/event_router.rb', line 125
def emit_error_event(tag, time, record, error)
  @emit_error_handler.emit_error_event(tag, time, record, error)
end
#emit_stream(tag, es) ⇒ Object
# File 'lib/fluent/event_router.rb', line 114
def emit_stream(tag, es)
  match(tag).emit_events(tag, es)
  if callback = find_callback
    callback.call(es)
  end
rescue Pipeline::OutputError => e
  @emit_error_handler.handle_emits_error(tag, e.processed_es, e.internal_error)
rescue => e
  @emit_error_handler.handle_emits_error(tag, es, e)
end
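The two rescue clauses in #emit_stream route every failure to the error handler instead of the caller. The sketch below imitates that shape under stated assumptions: `ToyOutputError`, `ToyErrorHandler`, and `ToyStreamRouter` are hypothetical, and the wrapped error is assumed to carry only the `processed_es` and `internal_error` readers that the source reads from Pipeline::OutputError.

```ruby
# Hypothetical wrapper exposing the two readers used by the first rescue clause.
class ToyOutputError < StandardError
  attr_reader :internal_error, :processed_es

  def initialize(internal_error, processed_es)
    super(internal_error.message)
    @internal_error = internal_error
    @processed_es = processed_es
  end
end

# Hypothetical error handler exposing the one method the sketch needs.
class ToyErrorHandler
  attr_reader :failures

  def initialize
    @failures = []
  end

  def handle_emits_error(tag, es, error)
    @failures << [tag, es, error.message]
  end
end

class ToyStreamRouter
  def initialize(collector, error_handler)
    @collector = collector
    @emit_error_handler = error_handler
  end

  # Method-level rescue, as in emit_stream: the caller never sees the exception.
  def emit_stream(tag, es)
    @collector.call(tag, es)
  rescue ToyOutputError => e
    # Report the stream as it looked when the output failed, plus the root cause.
    @emit_error_handler.handle_emits_error(tag, e.processed_es, e.internal_error)
  rescue => e
    @emit_error_handler.handle_emits_error(tag, es, e)
  end
end

handler = ToyErrorHandler.new
failing = ->(_tag, _es) { raise "downstream unavailable" }
ToyStreamRouter.new(failing, handler).emit_stream("app.log", [[0, { "k" => "v" }]])
p handler.failures
```

The more specific clause comes first because Ruby tries rescue clauses in order; placing the bare `rescue` first would swallow the wrapped error before its `processed_es` could be used.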
#find_callback ⇒ Object
# File 'lib/fluent/event_router.rb', line 96
def find_callback
  if @caller_plugin_id
    @metric_callbacks[@caller_plugin_id]
  else
    nil
  end
end
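Taken together, #add_metric_callbacks, #caller_plugin_id=, and #find_callback form a small registry: a callback runs only once a caller id has been set and a callback is registered under that id. A rough sketch of that interaction, using a hypothetical `ToyMetricRouter` and a made-up plugin id `"in_tail_1"`:

```ruby
class ToyMetricRouter
  attr_writer :caller_plugin_id

  def initialize
    @metric_callbacks = {}
    @caller_plugin_id = nil
  end

  def add_metric_callbacks(caller_plugin_id, callback)
    @metric_callbacks[caller_plugin_id] = callback
  end

  # Returns nil when no caller id is set or nothing is registered under it.
  def find_callback
    @caller_plugin_id ? @metric_callbacks[@caller_plugin_id] : nil
  end

  # Only the callback part of emit_stream is modelled here.
  def emit_stream(_tag, es)
    if (callback = find_callback)
      callback.call(es)
    end
  end
end

emitted = 0
router = ToyMetricRouter.new
router.add_metric_callbacks("in_tail_1", ->(es) { emitted += es.size })

router.emit_stream("app.log", [[0, {}]])          # no caller id yet: callback skipped
router.caller_plugin_id = "in_tail_1"
router.emit_stream("app.log", [[0, {}], [1, {}]]) # callback sees two events
puts emitted
```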
#match(tag) ⇒ Object
# File 'lib/fluent/event_router.rb', line 133
def match(tag)
  collector = @match_cache.get(tag) {
    find(tag) || @default_collector
  }
  collector
end
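The block passed to `@match_cache.get` suggests a compute-on-miss cache: the rule lookup runs only when the tag has not been seen before. The sketch below shows that pattern with a hypothetical `ToyMatchCache` backed by a Hash; the real MatchCache is not shown on this page and may bound its size or evict entries.

```ruby
# Hypothetical cache: the real MatchCache is not shown here and may evict entries.
class ToyMatchCache
  def initialize
    @map = {}
  end

  # Runs the block only on a miss, then remembers its result.
  def get(key)
    return @map[key] if @map.key?(key)

    @map[key] = yield
  end
end

lookups = 0
cache = ToyMatchCache.new
find = lambda do |tag|
  lookups += 1 # counts how often the (expensive) rule scan would run
  tag.start_with?("access.") ? :forward : nil
end

3.times { cache.get("access.web") { find.call("access.web") || :default } }
puts lookups
```

Because the fallback `|| :default` sits inside the block, the default collector is cached for unmatched tags as well, so repeated emits with an unknown tag also skip the rule scan.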
#match?(tag) ⇒ Boolean
# File 'lib/fluent/event_router.rb', line 129
def match?(tag)
  !!find(tag)
end
#suppress_missing_match! ⇒ Object
# File 'lib/fluent/event_router.rb', line 77
def suppress_missing_match!
  if @default_collector.respond_to?(:suppress_missing_match!)
    @default_collector.suppress_missing_match!
  end
end
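The `respond_to?` check makes the call optional: a default collector that does not define `suppress_missing_match!` is simply left alone. A small sketch of that duck-typed delegation, with a hypothetical `QuietableCollector` and a free-standing helper `suppress_if_supported` in place of the instance method:

```ruby
# Hypothetical collector that supports suppression.
class QuietableCollector
  attr_reader :suppressed

  def initialize
    @suppressed = false
  end

  def suppress_missing_match!
    @suppressed = true
  end
end

# Mirrors the guard in EventRouter#suppress_missing_match!.
def suppress_if_supported(collector)
  if collector.respond_to?(:suppress_missing_match!)
    collector.suppress_missing_match!
  end
end

quiet = QuietableCollector.new
suppress_if_supported(quiet)
suppress_if_supported(Object.new) # no such method: nothing happens, no NoMethodError
puts quiet.suppressed
```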