Module: Fluent::PluginHelper::EventLoop

Includes:
Thread
Included in:
Server, Timer
Defined in:
lib/fluent/plugin_helper/event_loop.rb

Defined Under Namespace

Classes: DefaultWatcher

Constant Summary collapse

EVENT_LOOP_RUN_DEFAULT_TIMEOUT =

Lifecycle stages — stop: [-]; shutdown: detach all event watchers on event loop; close: stop event loop; terminate: initialize internal state.

0.5
EVENT_LOOP_SHUTDOWN_TIMEOUT =
5

Constants included from Thread

Thread::THREAD_DEFAULT_WAIT_SECONDS, Thread::THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS

Instance Attribute Summary collapse

Attributes included from Thread

#_threads

Instance Method Summary collapse

Methods included from Thread

#stop, #thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop

Instance Attribute Details

#_event_loop ⇒ Object (readonly)

for tests



36
37
38
# File 'lib/fluent/plugin_helper/event_loop.rb', line 36

# Reader for the object stored in @_event_loop (exposed for tests).
#
# @return [Object] the current value of @_event_loop
def _event_loop
  @_event_loop
end

Instance Method Details

#after_shutdown ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/fluent/plugin_helper/event_loop.rb', line 116

# Detaches every watcher still registered on the event loop, then waits for
# the loop to report that it has stopped before delegating to +super+.
#
# Watchers are taken from +@_event_loop.watchers+ and detached in reverse
# order while holding +@_event_loop_mutex+. A failure to detach one watcher
# is logged as a warning and does not prevent the others from being detached.
#
# The wait polls +@_event_loop_running+ until it becomes falsy or until
# EVENT_LOOP_SHUTDOWN_TIMEOUT has elapsed on Fluent::Clock.
#
# @raise [RuntimeError] when the deadline passes and +@under_plugin_development+ is truthy
# @return [Object] whatever +super+ returns
def after_shutdown
  deadline = Fluent::Clock.now + EVENT_LOOP_SHUTDOWN_TIMEOUT

  @_event_loop_mutex.synchronize do
    # +reverse+ yields a copy, so detaching does not disturb the iteration.
    leftovers = @_event_loop.watchers.reverse
    leftovers.each do |watcher|
      begin
        watcher.detach
      rescue => err
        log.warn "unexpected error while detaching event loop watcher", error: err
      end
    end
  end

  while @_event_loop_running
    if Fluent::Clock.now < deadline
      # Still within the grace period: poll again shortly.
      sleep 0.1
      next
    end

    log.warn "event loop does NOT exit until hard timeout."
    raise "event loop does NOT exit until hard timeout." if @under_plugin_development
    break
  end

  super
end

#close ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
# File 'lib/fluent/plugin_helper/event_loop.rb', line 139

# Stops the event loop if it is flagged as running, then delegates to +super+.
#
# The loop offers no way to ask whether it is running, so a RuntimeError
# whose message is exactly 'loop not running' is treated as "already stopped"
# and ignored. Any other RuntimeError is re-raised.
#
# @raise [RuntimeError] when stopping fails for any other reason
# @return [Object] whatever +super+ returns
def close
  begin
    @_event_loop.stop if @_event_loop_running
  rescue RuntimeError => err
    raise if err.message != 'loop not running'
  end

  super
end

#event_loop_attach(watcher) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/fluent/plugin_helper/event_loop.rb', line 38

# Attaches +watcher+ to the event loop and remembers it in the list of
# watchers attached through this helper. Both steps happen while holding
# +@_event_loop_mutex+.
#
# @param watcher [Object] the watcher to attach to +@_event_loop+
# @return [Object] the same +watcher+
def event_loop_attach(watcher)
  @_event_loop_mutex.synchronize do
    @_event_loop.attach(watcher)
    @_event_loop_attached_watchers.push(watcher)
  end
  watcher
end

#event_loop_detach(watcher) ⇒ Object



46
47
48
49
50
51
52
53
# File 'lib/fluent/plugin_helper/event_loop.rb', line 46

# Detaches +watcher+ if it reports being attached, then removes it from the
# list of watchers tracked by this helper (under +@_event_loop_mutex+).
#
# @param watcher [Object] the watcher to detach; must respond to +attached?+ and +detach+
# @return [Object, nil] the result of removing +watcher+ from the tracked list
def event_loop_detach(watcher)
  watcher.detach if watcher.attached?
  @_event_loop_mutex.synchronize { @_event_loop_attached_watchers.delete(watcher) }
end

#event_loop_running? ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/fluent/plugin_helper/event_loop.rb', line 70

# Reports the running flag kept for the event loop.
#
# @return [Boolean] the current value of @_event_loop_running
def event_loop_running?
  @_event_loop_running
end

#event_loop_wait_until_start ⇒ Object



55
56
57
# File 'lib/fluent/plugin_helper/event_loop.rb', line 55

# Blocks the caller, polling every 0.1 (the argument passed to +sleep+),
# until +event_loop_running?+ becomes truthy. There is no timeout.
#
# @return [nil]
def event_loop_wait_until_start
  until event_loop_running?
    sleep(0.1)
  end
end

#event_loop_wait_until_stop ⇒ Object



59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin_helper/event_loop.rb', line 59

# Waits for the event loop to stop, forcing it to stop once the deadline
# (EVENT_LOOP_SHUTDOWN_TIMEOUT on Fluent::Clock) has passed.
#
# While +event_loop_running?+ is truthy and the deadline has not been
# reached, the caller is put to sleep in 0.1 steps. If the running flag is
# still set afterwards, a notice and the caller's backtrace are printed to
# standard output, the loop is asked to stop, and the running flag is
# cleared so that the helper no longer reports a running loop.
#
# @return [false, nil] +false+ after a forced stop, +nil+ when the loop had already stopped
def event_loop_wait_until_stop
  timeout_at = Fluent::Clock.now + EVENT_LOOP_SHUTDOWN_TIMEOUT
  sleep(0.1) while event_loop_running? && Fluent::Clock.now < timeout_at
  return unless @_event_loop_running

  puts "terminating event_loop forcedly"
  caller.each{|bt| puts "\t#{bt}" }
  begin
    @_event_loop.stop
  rescue StandardError
    # Best effort: the loop may already have stopped on its own.
    nil
  end
  # The loop was just told to stop, so the flag must be cleared; assigning
  # true here would be a no-op and leave the helper reporting "running".
  @_event_loop_running = false
end

#initialize ⇒ Object



74
75
76
77
78
79
80
81
82
# File 'lib/fluent/plugin_helper/event_loop.rb', line 74

# Sets up the helper's per-instance state after delegating to +super+:
# a fresh Coolio::Loop, a cleared running flag, the mutex guarding watcher
# bookkeeping, the loop run timeout, and an empty list of attached watchers.
def initialize
  super
  @_event_loop = Coolio::Loop.new
  @_event_loop_mutex = Mutex.new
  @_event_loop_attached_watchers = []
  @_event_loop_running = false
  # Default only: a plugin MAY set its own loop run timeout in #configure.
  @_event_loop_run_timeout = EVENT_LOOP_RUN_DEFAULT_TIMEOUT
end

#shutdown ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin_helper/event_loop.rb', line 100

# Detaches the watchers that were attached through this helper, newest
# first, while holding +@_event_loop_mutex+, then delegates to +super+.
#
# Watchers that no longer report being attached are skipped. An error raised
# by +detach+ is logged as a warning and the remaining watchers are still
# processed.
#
# @return [Object] whatever +super+ returns
def shutdown
  @_event_loop_mutex.synchronize do
    @_event_loop_attached_watchers.reverse.each do |watcher|
      next unless watcher.attached?

      begin
        watcher.detach
      rescue => err
        log.warn "unexpected error while detaching event loop watcher", error: err
      end
    end
  end

  super
end

#start ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin_helper/event_loop.rb', line 84

# Delegates to +super+, then creates the :event_loop thread that runs the
# event loop.
#
# Inside the thread a DefaultWatcher is attached, the running flag is set,
# and +@_event_loop.run+ is called with the configured run timeout. The
# running flag is cleared again when the run call returns or raises.
#
# @return [Object] whatever +thread_create+ returns
def start
  super

  # The loop is not running yet at this point, so no mutex lock is needed here.
  thread_create(:event_loop) do
    begin
      event_loop_attach(DefaultWatcher.new)
      @_event_loop_running = true
      # Blocks until the loop stops.
      @_event_loop.run(@_event_loop_run_timeout)
    ensure
      @_event_loop_running = false
    end
  end
end

#terminate ⇒ Object



151
152
153
154
155
156
157
158
# File 'lib/fluent/plugin_helper/event_loop.rb', line 151

# Resets the helper's internal state and delegates to +super+: the running
# flag becomes false, and the loop, mutex and run timeout references are
# dropped (set to nil).
#
# @return [Object] whatever +super+ returns
def terminate
  @_event_loop_running = false
  @_event_loop = @_event_loop_mutex = @_event_loop_run_timeout = nil

  super
end