Module: Fluent::PluginHelper::Thread

Included in:
ChildProcess, EventLoop, HttpServer
Defined in:
lib/fluent/plugin_helper/thread.rb

Constant Summary collapse

THREAD_DEFAULT_WAIT_SECONDS =
1
THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS =

second

100

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#_threadsObject (readonly)

stop : mark callback thread as stopped shutdown : [-] close : correct stopped threads terminate: kill all threads



30
31
32
# File 'lib/fluent/plugin_helper/thread.rb', line 30

def _threads
  @_threads
end

Instance Method Details

#after_shutdownObject



140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/fluent/plugin_helper/thread.rb', line 140

def after_shutdown
  super
  wakeup_threads = []
  @_threads_mutex.synchronize do
    @_threads.each_value do |thread|
      wakeup_threads << thread if thread.alive? && thread.status == "sleep"
    end
  end
  wakeup_threads.each do |thread|
    thread.wakeup if thread.alive?
  end
end

#closeObject



153
154
155
156
157
158
159
160
161
162
# File 'lib/fluent/plugin_helper/thread.rb', line 153

def close
  @_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
    thread = @_threads[obj_id]
    if !thread || thread.join(@_thread_wait_seconds)
      @_threads_mutex.synchronize{ @_threads.delete(obj_id) }
    end
  end

  super
end

#initializeObject



117
118
119
120
121
122
# File 'lib/fluent/plugin_helper/thread.rb', line 117

def initialize
  super
  @_threads_mutex = Mutex.new
  @_threads = {}
  @_thread_wait_seconds = THREAD_DEFAULT_WAIT_SECONDS
end

#stopObject



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/fluent/plugin_helper/thread.rb', line 124

def stop
  super
  wakeup_threads = []
  @_threads_mutex.synchronize do
    @_threads.each_value do |thread|
      thread[:_fluentd_plugin_helper_thread_running] = false
      wakeup_threads << thread if thread.alive? && thread.status == "sleep"
    end
  end
  wakeup_threads.each do |thread|
    if thread.alive?
      thread.wakeup
    end
  end
end

#terminateObject



164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/fluent/plugin_helper/thread.rb', line 164

def terminate
  super
  @_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
    thread = @_threads[obj_id]
    log.warn "killing existing thread", thread: thread
    thread.kill if thread
  end
  @_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
    thread = @_threads[obj_id]
    thread.join
    @_threads_mutex.synchronize{ @_threads.delete(obj_id) }
  end
  @_thread_wait_seconds = nil
end

#thread_create(title) ⇒ Object

Ruby 2.2.3 or earlier (and all 2.1.x) cause bug about Threading (“Stack consistency error”)

by passing splatted argument to `yield`

bugs.ruby-lang.org/issues/11027 We can enable to pass arguments after expire of Ruby 2.1 (& older 2.2.x) def thread_create(title, *args)

Thread.new(*args) do |*t_args|
  yield *t_args

Raises:

  • (ArgumentError)


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin_helper/thread.rb', line 65

def thread_create(title)
  raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol
  raise ArgumentError, "BUG: callback not specified" unless block_given?
  m = Mutex.new
  m.lock
  thread = ::Thread.new do
    m.lock # run thread after that thread is successfully set into @_threads
    m.unlock
    thread_exit = false
    ::Thread.current[:_fluentd_plugin_helper_thread_title] = title
    ::Thread.current[:_fluentd_plugin_helper_thread_started] = true
    ::Thread.current[:_fluentd_plugin_helper_thread_running] = true
    begin
      yield
      thread_exit = true
    rescue Exception => e
      log.warn "thread exited by unexpected error", plugin: self.class, title: title, error: e
      thread_exit = true
      raise
    ensure
      @_threads_mutex.synchronize do
        @_threads.delete(::Thread.current.object_id)
      end
      ::Thread.current[:_fluentd_plugin_helper_thread_running] = false
      if ::Thread.current.alive? && !thread_exit
        log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
      end
    end
  end
  thread.abort_on_exception = true
  thread.name = title.to_s if thread.respond_to?(:name)
  @_threads_mutex.synchronize do
    @_threads[thread.object_id] = thread
  end
  m.unlock
  thread
end

#thread_current_running?Boolean

Returns:

  • (Boolean)


32
33
34
35
# File 'lib/fluent/plugin_helper/thread.rb', line 32

def thread_current_running?
  # checker for code in callback of thread_create
  ::Thread.current[:_fluentd_plugin_helper_thread_running] || false
end

#thread_exist?(title) ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/fluent/plugin_helper/thread.rb', line 103

def thread_exist?(title)
  @_threads.values.count{|thread| title == thread[:_fluentd_plugin_helper_thread_title] } > 0
end

#thread_running?(title) ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
115
# File 'lib/fluent/plugin_helper/thread.rb', line 112

def thread_running?(title)
  t = @_threads.values.find{|thread| title == thread[:_fluentd_plugin_helper_thread_title] }
  t && t[:_fluentd_plugin_helper_thread_running]
end

#thread_started?(title) ⇒ Boolean

Returns:

  • (Boolean)


107
108
109
110
# File 'lib/fluent/plugin_helper/thread.rb', line 107

def thread_started?(title)
  t = @_threads.values.find{|thread| title == thread[:_fluentd_plugin_helper_thread_title] }
  t && t[:_fluentd_plugin_helper_thread_started]
end

#thread_wait_until_startObject



37
38
39
40
41
# File 'lib/fluent/plugin_helper/thread.rb', line 37

def thread_wait_until_start
  until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && t[:_fluentd_plugin_helper_thread_started] } }
    sleep 0.1
  end
end

#thread_wait_until_stopObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin_helper/thread.rb', line 43

def thread_wait_until_stop
  timeout_at = Fluent::Clock.now + THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS
  until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && !t[:_fluentd_plugin_helper_thread_running] } }
    break if Fluent::Clock.now > timeout_at
    sleep 0.1
  end
  @_threads_mutex.synchronize{ @_threads.values }.each do |t|
    if t.alive?
      puts "going to kill the thread still running: #{t[:_fluentd_plugin_helper_thread_title]}"
      t.kill rescue nil
      t[:_fluentd_plugin_helper_thread_running] = false
    end
  end
end