Class: Fluent::Plugin::MapSupport
- Inherits:
-
Object
- Object
- Fluent::Plugin::MapSupport
- Defined in:
- lib/fluent/plugin/map_support.rb
Instance Method Summary collapse
- #do_map(tag, es) ⇒ Object
- #do_map_filter(tag, es) ⇒ Object
- #do_map_output(tag, es) ⇒ Object
- #generate_tuples ⇒ Object
- #generate_tuples_filter(tag, es) ⇒ Object
- #generate_tuples_output(tag, es) ⇒ Object
-
#initialize(map, plugin) ⇒ MapSupport
constructor
A new instance of MapSupport.
- #stop ⇒ Object
- #timeout_block ⇒ Object
Constructor Details
#initialize(map, plugin) ⇒ MapSupport
Returns a new instance of MapSupport.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fluent/plugin/map_support.rb', line 22 def initialize(map, plugin) @map = map @plugin = plugin @checker = Fluent::Plugin::Parser::TimeoutChecker.new(@plugin.timeout) @checker.start if plugin.is_a?(Fluent::Plugin::Filter) singleton_class.module_eval(<<-CODE) def map_func(time, record) #{@map} end CODE class << self alias_method :generate_tuples, :generate_tuples_filter alias_method :do_map, :do_map_filter end elsif plugin.is_a?(Fluent::Plugin::Output) singleton_class.module_eval(<<-CODE) def map_func(tag, time, record) #{@map} end CODE class << self alias_method :generate_tuples, :generate_tuples_output alias_method :do_map, :do_map_output end end end |
Instance Method Details
#do_map(tag, es) ⇒ Object
51 52 53 |
# File 'lib/fluent/plugin/map_support.rb', line 51 def do_map(tag, es) # This method will be overwritten in #initailize. end |
#do_map_filter(tag, es) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/fluent/plugin/map_support.rb', line 55 def do_map_filter(tag, es) tuples = generate_tuples(tag, es) tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new} tuples.each do |time, record| if time == nil || record == nil raise SyntaxError.new end tag_output_es[tag].add(time, record) @plugin.log.trace { [tag, time, record].inspect } end tag_output_es end |
#do_map_output(tag, es) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fluent/plugin/map_support.rb', line 69 def do_map_output(tag, es) tuples = generate_tuples(tag, es) tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new} tuples.each do |tag, time, record| if time == nil || record == nil raise SyntaxError.new end tag_output_es[tag].add(time, record) @plugin.log.trace { [tag, time, record].inspect } end tag_output_es end |
#generate_tuples ⇒ Object
83 84 85 |
# File 'lib/fluent/plugin/map_support.rb', line 83 def generate_tuples # This method will be overwritten in #initailize. end |
#generate_tuples_filter(tag, es) ⇒ Object
87 88 89 90 91 92 93 94 95 96 |
# File 'lib/fluent/plugin/map_support.rb', line 87 def generate_tuples_filter(tag, es) tuples = [] es.each {|time, record| timeout_block do new_tuple = map_func(time, record) tuples.concat new_tuple end } tuples end |
#generate_tuples_output(tag, es) ⇒ Object
98 99 100 101 102 103 104 105 106 107 |
# File 'lib/fluent/plugin/map_support.rb', line 98 def generate_tuples_output(tag, es) tuples = [] es.each {|time, record| timeout_block do new_tuple = map_func(tag, time, record) tuples.concat new_tuple end } tuples end |
#stop ⇒ Object
119 120 121 |
# File 'lib/fluent/plugin/map_support.rb', line 119 def stop @checker.stop end |
#timeout_block ⇒ Object
109 110 111 112 113 114 115 116 117 |
# File 'lib/fluent/plugin/map_support.rb', line 109 def timeout_block begin @checker.execute { yield } rescue Timeout::Error @plugin.log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"} end end |