Class: Fluent::Plugin::MapSupport

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/map_support.rb

Instance Method Summary collapse

Constructor Details

#initialize(map, plugin) ⇒ MapSupport

Returns a new instance of MapSupport.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fluent/plugin/map_support.rb', line 22

# Builds the map helper for +plugin+ and wires the instance up according to
# the plugin's type.
#
# * Stores +map+ and +plugin+, then creates and starts a
#   Fluent::Plugin::Parser::TimeoutChecker using the plugin's +timeout+.
# * For a Fluent::Plugin::Filter: defines a singleton +map_func(time, record)+
#   whose body is the +map+ string, and aliases +generate_tuples+ / +do_map+
#   to their +_filter+ variants.
# * For a Fluent::Plugin::Output: defines a singleton
#   +map_func(tag, time, record)+ the same way and aliases to the +_output+
#   variants.
# * For any other plugin type nothing is defined or aliased, so the
#   placeholder +do_map+ / +generate_tuples+ stay in effect.
#
# NOTE(review): +map+ is interpolated into Ruby source and evaluated with
# module_eval, so it executes with full privileges. It presumably comes from
# the plugin configuration and must be trusted — confirm against the callers.
#
# @param map [String] Ruby source used as the body of +map_func+
# @param plugin [Object] the owning plugin; must respond to +timeout+
def initialize(map, plugin)
  @map = map
  @plugin = plugin
  # Checker is started here; #stop is the matching shutdown call.
  @checker = Fluent::Plugin::Parser::TimeoutChecker.new(@plugin.timeout)
  @checker.start

  if plugin.is_a?(Fluent::Plugin::Filter)
    # Filter flavour: the user expression sees `time` and `record` only.
    singleton_class.module_eval(<<-CODE)
    def map_func(time, record)
      #{@map}
    end
  CODE
    # Replace the placeholder methods on this instance only.
    class << self
      alias_method :generate_tuples, :generate_tuples_filter
      alias_method :do_map, :do_map_filter
    end
  elsif plugin.is_a?(Fluent::Plugin::Output)
    # Output flavour: the user expression additionally sees `tag`.
    singleton_class.module_eval(<<-CODE)
    def map_func(tag, time, record)
      #{@map}
    end
  CODE
    # Replace the placeholder methods on this instance only.
    class << self
      alias_method :generate_tuples, :generate_tuples_output
      alias_method :do_map, :do_map_output
    end
  end
end

Instance Method Details

#do_map(tag, es) ⇒ Object



51
52
53
# File 'lib/fluent/plugin/map_support.rb', line 51

# Placeholder for the map entry point.
#
# #initialize aliases this name to #do_map_filter or #do_map_output on the
# instance's singleton class, depending on the plugin type. The body here is
# empty, so it returns nil when neither alias has been installed.
def do_map(tag, es)
  # This method will be overwritten in #initialize.
end

#do_map_filter(tag, es) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/map_support.rb', line 55

# Maps every event in +es+ and collects the results under the incoming +tag+.
#
# Each tuple produced by #generate_tuples is expected to be +[time, record]+.
#
# @param tag [Object] tag under which all mapped events are stored
# @param es [Object] event source handed to #generate_tuples
# @return [Hash] tag => Fluent::MultiEventStream holding the mapped events
# @raise [SyntaxError] when a tuple has a nil time or a nil record
def do_map_filter(tag, es)
  # Streams are created lazily, the first time a tag is looked up.
  streams = Hash.new { |hash, key| hash[key] = Fluent::MultiEventStream.new }

  generate_tuples(tag, es).each do |time, record|
    raise SyntaxError.new if time.nil? || record.nil?

    streams[tag].add(time, record)
    @plugin.log.trace { [tag, time, record].inspect }
  end

  streams
end

#do_map_output(tag, es) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fluent/plugin/map_support.rb', line 69

# Maps every event in +es+ and groups the results by the tag each tuple
# carries.
#
# Each tuple produced by #generate_tuples is expected to be
# +[tag, time, record]+; the tuple's own tag (not the incoming +tag+) selects
# the destination stream.
#
# @param tag [Object] incoming tag, forwarded to #generate_tuples
# @param es [Object] event source handed to #generate_tuples
# @return [Hash] tag => Fluent::MultiEventStream holding the mapped events
# @raise [SyntaxError] when a tuple has a nil time or a nil record
def do_map_output(tag, es)
  # Streams are created lazily, the first time a tag is looked up.
  streams = Hash.new { |hash, key| hash[key] = Fluent::MultiEventStream.new }

  generate_tuples(tag, es).each do |out_tag, time, record|
    raise SyntaxError.new if time.nil? || record.nil?

    streams[out_tag].add(time, record)
    @plugin.log.trace { [out_tag, time, record].inspect }
  end

  streams
end

#generate_tuples ⇒ Object



83
84
85
# File 'lib/fluent/plugin/map_support.rb', line 83

# Placeholder for the tuple generator.
#
# #initialize aliases this name to #generate_tuples_filter or
# #generate_tuples_output on the instance's singleton class. Callers always
# invoke it as +generate_tuples(tag, es)+, so the placeholder accepts the same
# arguments (optionally, to stay compatible with zero-argument calls) instead
# of raising ArgumentError when no alias has been installed.
#
# @param tag [Object, nil] ignored by the placeholder
# @param es [Object, nil] ignored by the placeholder
# @return [nil]
def generate_tuples(tag = nil, es = nil)
  # This method will be overwritten in #initialize.
end

#generate_tuples_filter(tag, es) ⇒ Object



87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/map_support.rb', line 87

# Runs +map_func(time, record)+ for every event in +es+ and concatenates the
# arrays it returns into one flat list of tuples.
#
# Each call goes through #timeout_block; when that block is cut short, the
# event contributes nothing to the result.
#
# @param tag [Object] unused here; kept so the signature matches the output variant
# @param es [#each] yields +time, record+ pairs
# @return [Array] all tuples returned by +map_func+, in event order
def generate_tuples_filter(tag, es)
  collected = []
  es.each do |time, record|
    timeout_block { collected.concat(map_func(time, record)) }
  end
  collected
end

#generate_tuples_output(tag, es) ⇒ Object



98
99
100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/map_support.rb', line 98

# Runs +map_func(tag, time, record)+ for every event in +es+ and concatenates
# the arrays it returns into one flat list of tuples.
#
# Each call goes through #timeout_block; when that block is cut short, the
# event contributes nothing to the result.
#
# @param tag [Object] incoming tag, passed to +map_func+ for every event
# @param es [#each] yields +time, record+ pairs
# @return [Array] all tuples returned by +map_func+, in event order
def generate_tuples_output(tag, es)
  collected = []
  es.each do |time, record|
    timeout_block { collected.concat(map_func(tag, time, record)) }
  end
  collected
end

#stop ⇒ Object



119
120
121
# File 'lib/fluent/plugin/map_support.rb', line 119

# Stops the timeout checker that #initialize created and started.
def stop
  @checker.stop
end

#timeout_block ⇒ Object



109
110
111
112
113
114
115
116
117
# File 'lib/fluent/plugin/map_support.rb', line 109

# Runs the given block through the timeout checker and logs, rather than
# propagates, a Timeout::Error.
#
# The previous handler interpolated +time+, +tag+ and +record+, none of which
# exist in this method's scope, so a timeout surfaced as a NameError instead
# of a log line. The event context is now an optional argument: callers that
# have it can pass it to get the detailed message, and callers that do not
# get a generic one.
#
# NOTE(review): only Timeout::Error is rescued, as before — confirm this is
# the exception the checker actually raises on expiry.
#
# @param tag [Object, nil] tag of the event being mapped, for the log message
# @param time [Object, nil] time of the event being mapped (passed to Time.at)
# @param record [Object, nil] record being mapped, for the log message
# @yield the work to run under the checker
# @return [Object] the checker's result, or the logger's return value after a timeout
def timeout_block(tag = nil, time = nil, record = nil)
  @checker.execute { yield }
rescue Timeout::Error
  @plugin.log.error do
    if time.nil?
      # No event context supplied; Time.at(nil) would raise, so stay generic.
      'Timeout: map function did not finish in time'
    else
      "Timeout: #{Time.at(time)} #{tag} #{record.inspect}"
    end
  end
end