Class: Fluent::Plugin::Parser::TimeoutChecker

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/parser.rb

Instance Method Summary collapse

Constructor Details

#initialize(timeout) ⇒ TimeoutChecker

This implementation now uses mutex because parser is typically used in input. If this has a performance issue under high concurreny, use concurrent-ruby’s map instead.



33
34
35
36
37
38
39
# File 'lib/fluent/plugin/parser.rb', line 33

def initialize(timeout)
  @map = {}
  @flag = ServerEngine::BlockingFlag.new
  @mutex = Mutex.new
  @timeout = timeout
  @timeout_checker = nil
end

Instance Method Details

#executeObject



63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/parser.rb', line 63

def execute
  th = Thread.current
  @mutex.synchronize { @map[th] = Time.now }
  yield
ensure
  # Need clean up here because if next event is delayed, incorrect exception will be raised in normal flow.
  @mutex.synchronize { @map.delete(th) }
end

#startObject



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/parser.rb', line 41

def start
  @thread = ::Thread.new {
    until @flag.wait_for_set(0.5)
      now = Time.now
      @mutex.synchronize {
        @map.keys.each { |th|
          time = @map[th]
          if now - time > @timeout
            th.raise UncatchableError, "parsing timed out"
            @map.delete(th)
          end
        }
      }
    end
  }
end

#stopObject



58
59
60
61
# File 'lib/fluent/plugin/parser.rb', line 58

def stop
  @flag.set!
  @thread.join
end