Class: Fluent::EventTailInput

Inherits:
TailInput
  • Object
show all
Defined in:
lib/fluent/plugin/in_event_tail.rb

Instance Method Summary collapse

Instance Method Details

#configure_parser(conf) ⇒ Object



41
42
43
# File 'lib/fluent/plugin/in_event_tail.rb', line 41

# Parser-configuration hook, deliberately overridden with a no-op so that the
# default parser is not set up for this input.
#
# @param conf [Object] plugin configuration (ignored here)
# @return [nil]
def configure_parser(conf)
  # Nothing to configure: the default parser stays disabled.
  nil
end

#emit_array(tag, array) ⇒ Object



67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/in_event_tail.rb', line 67

# Hands a batch of events to Engine.emit_array on a best-effort basis.
#
# Nothing is emitted when +tag+ is nil or +array+ is empty. Any StandardError
# raised by the engine call is swallowed and nil is returned instead.
#
# @param tag [String, nil] tag shared by every event in the batch
# @param array [Array] the batch of events to emit
# @return [Object, nil] the result of Engine.emit_array, or nil when the
#   batch was skipped or the engine call raised
def emit_array(tag, array)
  return if tag.nil? || array.empty?

  begin
    Engine.emit_array(tag, array)
  rescue StandardError
    # ignore errors. Engine shows logs and backtraces.
    nil
  end
end

#parse_line(line, &block) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/in_event_tail.rb', line 77

# Decodes one JSON line and passes every event it carries to the block.
#
# Two payload layouts are accepted:
#
#   [tag, [[time, record], [time, record], ...]]   several events
#   [tag, time, record]                            a single event
#
# The decoded value must be an Array, and in the multi-event layout every
# entry must be an Array as well. Without these checks, indexing a Hash,
# String or Integer with [] would silently produce bogus events (for example
# an empty tag with a nil record) instead of an error.
#
# @param line [String] one line of JSON text
# @yield [tag, time, record] once per decoded event
# @yieldparam tag [String] first payload element converted with to_s
# @yieldparam time [Object] the value returned by #parse_time
# @yieldparam record [Object] the event record exactly as decoded
# @raise [ArgumentError] if the payload, or an entry of a multi-event
#   payload, is not an Array
def parse_line(line, &block)
  msg = Yajl.load(line)
  unless msg.is_a?(Array)
    raise ArgumentError, "event must be a JSON array, got #{msg.class}"
  end

  tag = msg[0].to_s
  entries = msg[1]

  if entries.is_a?(Array)
    # [tag, [[time,record], [time,record], ...]]
    entries.each do |entry|
      unless entry.is_a?(Array)
        raise ArgumentError, "event entry must be a [time, record] array, got #{entry.class}"
      end

      block.call(tag, parse_time(entry[0]), entry[1])
    end
  else
    # [tag, time, record]
    block.call(tag, parse_time(msg[1]), msg[2])
  end
end

#parse_time(time) ⇒ Object



98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/in_event_tail.rb', line 98

# Converts a raw time value from an event into a numeric timestamp.
#
# When @time_format is set and +time+ is a String, the string is parsed with
# Time.strptime and returned as a Float. Otherwise +time+ is converted with
# to_i, and a result of 0 is replaced by Engine.now.
#
# @param time [Object] raw time value taken from the decoded event
# @return [Object] Float from strptime, the to_i value, or Engine.now
def parse_time(time)
  use_format = !@time_format.nil? && time.is_a?(String)
  return Time.strptime(time, @time_format).to_f if use_format

  seconds = time.to_i
  # A zero timestamp counts as "missing", so fall back to the engine clock.
  seconds == 0 ? Engine.now : seconds
end

#receive_lines(lines) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/in_event_tail.rb', line 45

# Parses a batch of lines and emits the decoded events grouped by tag.
#
# Events are buffered while consecutive events share the same tag. The buffer
# is passed to #emit_array whenever the tag changes and once more after the
# last line. A line that raises while being processed is logged through $log
# as a warning and skipped; events already buffered are kept.
#
# @param lines [#each] raw lines; each one is chomped in place
# @return [Object] whatever the final #emit_array call returns
def receive_lines(lines)
  batch = []
  current_tag = nil

  lines.each do |raw|
    begin
      raw.chomp!
      parse_line(raw) do |tag, time, record|
        unless current_tag == tag
          # Tag changed: flush what was collected under the previous tag.
          emit_array(current_tag, batch)
          batch = []
          current_tag = tag
        end
        batch << [time, record]
      end
    rescue => error
      $log.warn raw.dump, error: error.to_s
      $log.debug_backtrace
    end
  end

  emit_array(current_tag, batch)
end