Class: Fluent::EventRouter

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/event_router.rb

Overview

EventRouter is responsible for routing events to a collector.

It has a list of MatchPattern and Collector pairs:

+----------------+     +-----------------+
|  MatchPattern  |     |    Collector    |
+----------------+     +-----------------+
|   access.**  ---------> type forward   |
|     logs.**  ---------> type copy      |
|  archive.**  ---------> type s3        |
+----------------+     +-----------------+

EventRouter does:

1) receive an event via the #emit methods
2) match the event’s tag against the MatchPatterns
3) forward the event to the corresponding Collector

A Collector is one of: an Output, a Filter, or another EventRouter.

Defined Under Namespace

Classes: MatchCache, Pipeline, Rule

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(default_collector, emit_error_handler) ⇒ EventRouter

Returns a new instance of EventRouter.



45
46
47
48
49
50
51
52
# File 'lib/fluent/event_router.rb', line 45

# Sets up a router with no match rules, a fresh match cache, and no metric
# callbacks registered.
#
# @param default_collector [Object] stored as the fallback that #match
#   returns when find(tag) yields nothing
# @param emit_error_handler [Object] stored for error reporting; this router
#   sends it #handle_emits_error and #emit_error_event
def initialize(default_collector, emit_error_handler)
  # Collaborators supplied by the caller.
  @default_collector = default_collector
  @emit_error_handler = emit_error_handler

  # Routing state: rules are appended by #add_rule, lookups go through the
  # cache in #match.
  @match_rules = []
  @match_cache = MatchCache.new

  # Metric callbacks keyed by plugin id; no caller plugin is selected yet.
  @metric_callbacks = {}
  @caller_plugin_id = nil
end

Instance Attribute Details

#default_collector ⇒ Object

Returns the value of attribute default_collector.



54
55
56
# File 'lib/fluent/event_router.rb', line 54

# Reader for @default_collector, which is initially the value passed to
# #initialize.
#
# @return [Object] the collector that #match falls back to when find(tag)
#   returns nothing
def default_collector
  @default_collector
end

#emit_error_handler ⇒ Object

Returns the value of attribute emit_error_handler.



55
56
57
# File 'lib/fluent/event_router.rb', line 55

# Reader for @emit_error_handler, which is initially the value passed to
# #initialize.
#
# @return [Object] the handler that #emit_stream and #emit_error_event
#   report errors to
def emit_error_handler
  @emit_error_handler
end

Instance Method Details

#add_metric_callbacks(caller_plugin_id, callback) ⇒ Object



88
89
90
# File 'lib/fluent/event_router.rb', line 88

# Registers (or replaces) the metric callback associated with a plugin id.
#
# @param caller_plugin_id [Object] key under which the callback is stored
# @param callback [#call] object later invoked by #emit_stream with the
#   emitted event stream
# @return [Object] the callback that was stored
def add_metric_callbacks(caller_plugin_id, callback)
  @metric_callbacks.store(caller_plugin_id, callback)
end

#add_rule(pattern, collector) ⇒ Object

Called by Agent to add a new match pattern and collector pair.



84
85
86
# File 'lib/fluent/event_router.rb', line 84

# Appends a new pattern/collector pair to the list of match rules.
#
# @param pattern [Object] match pattern handed to Rule.new
# @param collector [Object] collector handed to Rule.new
# @return [Array] the rule list (@match_rules) after the append
def add_rule(pattern, collector)
  rule = Rule.new(pattern, collector)
  @match_rules.push(rule)
end

#caller_plugin_id=(caller_plugin_id) ⇒ Object



92
93
94
# File 'lib/fluent/event_router.rb', line 92

# Selects which plugin's metric callback #find_callback will look up.
#
# @param caller_plugin_id [Object, nil] key later used to index
#   @metric_callbacks; a nil (or false) value makes #find_callback return nil
def caller_plugin_id=(caller_plugin_id)
  @caller_plugin_id = caller_plugin_id
end

#emit(tag, time, record) ⇒ Object



104
105
106
107
108
# File 'lib/fluent/event_router.rb', line 104

# Emits a single event by wrapping it in a OneEventStream and handing it to
# #emit_stream. A nil record is silently ignored.
#
# @param tag [Object] tag passed through to #emit_stream
# @param time [Object] event time given to OneEventStream.new
# @param record [Object, nil] event record; nothing is emitted when nil
# @return [Object, nil] whatever #emit_stream returns, or nil when the record
#   is nil
def emit(tag, time, record)
  # Only nil is skipped; any other value (including false) is emitted.
  return if record.nil?

  emit_stream(tag, OneEventStream.new(time, record))
end

#emit_array(tag, array) ⇒ Object



110
111
112
# File 'lib/fluent/event_router.rb', line 110

# Emits a batch of events by wrapping the array in an ArrayEventStream and
# handing it to #emit_stream.
#
# @param tag [Object] tag passed through to #emit_stream
# @param array [Array] entries given to ArrayEventStream.new
# @return [Object] whatever #emit_stream returns
def emit_array(tag, array)
  stream = ArrayEventStream.new(array)
  emit_stream(tag, stream)
end

#emit_error_event(tag, time, record, error) ⇒ Object



125
126
127
# File 'lib/fluent/event_router.rb', line 125

# Forwards an error event to @emit_error_handler.
#
# All four arguments are passed through unchanged to the handler's
# #emit_error_event, and the handler's return value is returned.
#
# @param tag [Object] tag of the event
# @param time [Object] time of the event
# @param record [Object] record of the event
# @param error [Object] error associated with the event
def emit_error_event(tag, time, record, error)
  @emit_error_handler.emit_error_event(tag, time, record, error)
end

#emit_stream(tag, es) ⇒ Object



114
115
116
117
118
119
120
121
122
123
# File 'lib/fluent/event_router.rb', line 114

# Routes an event stream to the collector selected by #match, then runs the
# metric callback returned by #find_callback (if there is one) with the same
# stream.
#
# Errors are not propagated; they are reported to @emit_error_handler:
# * Pipeline::OutputError is reported with the error's processed_es and
#   internal_error.
# * Any other StandardError (including one raised by the callback) is
#   reported with the original stream and the error itself.
#
# @param tag [Object] tag used for matching and passed to the collector
# @param es [Object] event stream to emit
# @return [Object, nil] the callback's result (nil when no callback), or the
#   handler's return value when an error was rescued
def emit_stream(tag, es)
  collector = match(tag)
  collector.emit_events(tag, es)

  metric_callback = find_callback
  metric_callback.call(es) if metric_callback
rescue Pipeline::OutputError => output_error
  @emit_error_handler.handle_emits_error(tag, output_error.processed_es, output_error.internal_error)
rescue => error
  @emit_error_handler.handle_emits_error(tag, es, error)
end

#find_callback ⇒ Object



96
97
98
99
100
101
102
# File 'lib/fluent/event_router.rb', line 96

# Looks up the metric callback registered for the currently selected caller
# plugin.
#
# @return [Object, nil] the entry of @metric_callbacks keyed by
#   @caller_plugin_id, or nil when no caller plugin id is set or nothing is
#   registered under it
def find_callback
  return nil unless @caller_plugin_id

  @metric_callbacks[@caller_plugin_id]
end

#match(tag) ⇒ Object



133
134
135
136
137
138
# File 'lib/fluent/event_router.rb', line 133

# Resolves the collector for a tag through the match cache.
#
# The block given to @match_cache.get computes the collector: the result of
# find(tag), or @default_collector when find returns nothing.
# NOTE(review): presumably the cache only runs the block on a miss —
# MatchCache is not visible here, confirm against its implementation.
#
# @param tag [Object] tag to resolve
# @return [Object] whatever @match_cache.get returns for the tag
def match(tag)
  @match_cache.get(tag) { find(tag) || @default_collector }
end

#match?(tag) ⇒ Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/fluent/event_router.rb', line 129

# Reports whether find(tag) locates something for the tag. The default
# collector and the match cache are not consulted.
#
# @param tag [Object] tag to test
# @return [Boolean] true when find(tag) is truthy, false otherwise
def match?(tag)
  find(tag) ? true : false
end

#suppress_missing_match! ⇒ Object



77
78
79
80
81
# File 'lib/fluent/event_router.rb', line 77

# Asks the default collector to suppress its missing-match handling, when the
# collector supports that request.
#
# @return [Object, nil] the collector's return value, or nil when
#   @default_collector does not respond to #suppress_missing_match!
def suppress_missing_match!
  return unless @default_collector.respond_to?(:suppress_missing_match!)

  @default_collector.suppress_missing_match!
end