Class: Fluent::MeasureTime

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/measure_timable.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(plugin, log, router) ⇒ MeasureTime

Returns a new instance of MeasureTime.



27
28
29
30
31
32
33
34
# File 'lib/fluent/plugin/measure_timable.rb', line 27

# Builds a measurer bound to one plugin instance.
#
# @param plugin [Object] the plugin whose method will be timed; its class is
#   remembered separately for log and event messages
# @param log [Object] logger used for debug/warn output
# @param router [Object] event router used to emit measurements
def initialize(plugin, log, router)
  @plugin, @log, @router = plugin, log, router
  @klass = plugin.class
  # Elapsed times collected between flushes; @mutex guards access to them.
  @times = []
  @mutex = Mutex.new
end

Instance Attribute Details

#hook ⇒ Object (readonly)

Returns the value of attribute hook.



26
27
28
# File 'lib/fluent/plugin/measure_timable.rb', line 26

# Name of the plugin method being timed, as read from conf['hook'] by
# #configure (nil before #configure has run).
def hook; @hook; end

#interval ⇒ Object (readonly)

Returns the value of attribute interval.



26
27
28
# File 'lib/fluent/plugin/measure_timable.rb', line 26

# Flush interval taken from conf['interval'] (converted with to_i) by
# #configure; nil when no interval was configured.
def interval; @interval; end

#log ⇒ Object (readonly)

Returns the value of attribute log.



26
27
28
# File 'lib/fluent/plugin/measure_timable.rb', line 26

# Logger handed to the constructor; used for debug and warn messages.
def log; @log; end

#mutex ⇒ Object (readonly)

Returns the value of attribute mutex.



26
27
28
# File 'lib/fluent/plugin/measure_timable.rb', line 26

# Mutex created in the constructor; #configure and #flush synchronize on it
# when touching the collected times.
def mutex; @mutex; end

#plugin ⇒ Object (readonly)

Returns the value of attribute plugin.



26
27
28
# File 'lib/fluent/plugin/measure_timable.rb', line 26

# The plugin instance handed to the constructor, i.e. the object whose hook
# method gets wrapped.
def plugin; @plugin; end

#router ⇒ Object (readonly)

Returns the value of attribute router.



26
27
28
# File 'lib/fluent/plugin/measure_timable.rb', line 26

# Event router handed to the constructor; measurements are emitted through it.
def router; @router; end

#tag ⇒ Object (readonly)

Returns the value of attribute tag.



26
27
28
# File 'lib/fluent/plugin/measure_timable.rb', line 26

# Tag used for emitted measurements; #configure sets it from conf['tag'],
# falling back to 'measure_time'.
def tag; @tag; end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



26
27
28
# File 'lib/fluent/plugin/measure_timable.rb', line 26

# Background thread created by #start when an interval is configured;
# nil until then.
def thread; @thread; end

#times ⇒ Object (readonly)

Returns the value of attribute times.



26
27
28
# File 'lib/fluent/plugin/measure_timable.rb', line 26

# Elapsed times accumulated since the last #flush (interval mode only).
def times; @times; end

Instance Method Details

#apply_hook ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/measure_timable.rb', line 59

# Installs the timing wrappers on the plugin instance.
#
# Evaluates generated source in the context of @plugin, which defines three
# singleton methods on it:
# * one named after @hook that runs the original implementation (via +super+)
#   inside +measure_time.measure_time+;
# * +start+ and +stop+ overrides that call +super+ and then
#   +measure_time.start+ / +measure_time.stop+.
#
# The plugin must therefore respond to +measure_time+ (presumably returning
# this measurer -- confirm against the mixin that creates it).
#
# NOTE: @hook is interpolated into evaluated source, so it must be a plain
# method name coming from trusted configuration.
#
# @return [Object] the result of the instance_eval call
def apply_hook
  # Pass file and line so backtraces and source_location point at this file
  # instead of an anonymous "(eval)".
  @plugin.instance_eval <<-RUBY, __FILE__, __LINE__ + 1
    def #{@hook}(*args)
      measure_time.measure_time do
        super
      end
    end
    def start
      super
      measure_time.start
    end
    def stop
      super
      measure_time.stop
    end
  RUBY
end

#configure(conf) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fluent/plugin/measure_timable.rb', line 36

# Reads the measure_time settings, picks the recording strategy and installs
# the timing hook on the plugin.
#
# Recognised keys of +conf+: 'tag' (defaults to 'measure_time'), 'hook'
# (required) and 'interval' (optional, converted with to_i).
#
# @param conf [#[]] configuration element or hash
# @raise [Fluent::ConfigError] when 'hook' is not given
# @return [Object] whatever #apply_hook returns
def configure(conf)
  @tag = conf['tag'] || 'measure_time'
  @hook = conf['hook']
  raise Fluent::ConfigError, '`hook` option must be specified in <measure_time></measure_time> directive' unless @hook

  # Fields attached to every emitted record to identify the measured hook.
  @hook_msg = {:class => @klass.to_s, :hook => @hook.to_s, :object_id => @plugin.object_id.to_s}
  @interval = conf['interval'].to_i if conf['interval']

  if @interval
    # Interval mode: only collect; #flush emits statistics later.
    @add_or_emit_proc = Proc.new do |elapsed|
      @mutex.synchronize { @times << elapsed }
    end
  else
    # Immediate mode: emit one record per measurement.
    @add_or_emit_proc = Proc.new do |elapsed|
      record = {:time => elapsed}.merge(@hook_msg)
      router.emit(@tag, ::Fluent::Engine.now, record)
    end
  end

  apply_hook
end

#flush(now) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fluent/plugin/measure_timable.rb', line 112

# Drains the collected elapsed times and emits their statistics.
#
# The collected values are copied and cleared under the mutex so that
# concurrent recording is not lost; the statistics are computed outside it.
#
# @param now [Object] event time to stamp the emitted record with
# @return [Array, nil] the emitted +[tag, time, record]+ triple, where the
#   record holds :max, :avg and :num plus the hook identification fields, or
#   nil when nothing was collected (in which case nothing is emitted)
def flush(now)
  drained = @mutex.synchronize do
    snapshot = @times.dup
    @times.clear
    snapshot
  end
  return nil if drained.empty?

  num = drained.size
  avg = drained.map(&:to_f).inject(:+) / num.to_f
  triple = [@tag, now, {:max => drained.max, :avg => avg, :num => num}.merge(@hook_msg)]
  router.emit(*triple)
  triple
end

#measure_time ⇒ Object



77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/measure_timable.rb', line 77

# Runs the given block, measures how long it took and records the result.
#
# The duration is taken from the monotonic clock so that wall-clock
# adjustments during the block cannot produce a negative or inflated value.
# The elapsed seconds are logged at debug level and passed to the recording
# proc prepared by #configure. If the block raises, nothing is recorded and
# the exception propagates.
#
# @yield the work to be timed
# @return [Object] the block's return value
def measure_time
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  output = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  log.debug "elapsed time at #{@klass}##{@hook} is #{elapsed} sec"
  @add_or_emit_proc.call(elapsed)
  output
end

#run ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/measure_timable.rb', line 97

# Body of the background thread: wakes up every half second and flushes the
# collected statistics once at least @interval has passed since the last
# flush (both measured with ::Fluent::Engine.now).
#
# Errors raised during a cycle are logged as warnings and the loop goes on;
# in that case the last-checked time is not advanced.
def run
  @last_checked ||= ::Fluent::Engine.now
  while sleep(0.5)
    begin
      current = ::Fluent::Engine.now
      # Not due yet: go back to sleep.
      next unless current - @last_checked >= @interval

      flush(current)
      @last_checked = current
    rescue => e
      log.warn "in_measure_time: hook #{@klass}##{@hook} #{e.class} #{e.message} #{e.backtrace.first}"
    end
  end
end

#start ⇒ Object



86
87
88
89
# File 'lib/fluent/plugin/measure_timable.rb', line 86

# Spawns the background thread that executes #run. Does nothing unless an
# interval is configured.
#
# @return [Thread, nil] the new thread, or nil when no interval is set
def start
  @thread = Thread.new { run } if @interval
end

#stop ⇒ Object



91
92
93
94
95
# File 'lib/fluent/plugin/measure_timable.rb', line 91

# Terminates the background thread and waits for it to finish.
#
# Does nothing when no interval is configured, or when the thread was never
# created (for example when #stop is called without a preceding #start).
#
# @return [Thread, nil] the joined thread, or nil when there was nothing to stop
def stop
  return unless @interval && @thread
  @thread.terminate
  @thread.join
end