Class: Backburner::Workers::ThreadsOnFork

Inherits:
Backburner::Worker show all
Defined in:
lib/backburner/workers/threads_on_fork.rb

Class Attribute Summary collapse

Attributes inherited from Backburner::Worker

#connection, #tube_names

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Backburner::Worker

enqueue, #handle_failure_for_job, #shutdown, start, #work_one_job

Methods included from Logger

included, #job_started_at, #log_error, #log_info, #log_job_begin, #log_job_end, #logger

Methods included from Helpers

#classify, #constantize, #dasherize, #exception_message, #expand_tube_name, included, #queue_config, #resolve_max_job_retries, #resolve_priority, #resolve_respond_timeout, #resolve_retry_delay, #resolve_retry_delay_proc

Constructor Details

#initialize(*args) ⇒ ThreadsOnFork

Custom initializer just to set @tubes_data



70
71
72
73
74
# File 'lib/backburner/workers/threads_on_fork.rb', line 70

# Builds the worker and prepares the per-tube settings table.
#
# @param args [Array] forwarded untouched to the inherited initializer
def initialize(*args)
  # Must exist before +super+ runs: presumably the inherited initializer calls
  # process_tube_names, which writes into @tubes_data (verify in Backburner::Worker).
  @tubes_data = {}
  super
  # Let settings declared on the queue classes override the parsed tube names.
  process_tube_options
end

Class Attribute Details

.garbage_afterObject

Returns the value of attribute garbage_after.



7
8
9
# File 'lib/backburner/workers/threads_on_fork.rb', line 7

# Class-level reader for @garbage_after (nil when it was never assigned).
# fork_inner uses this value as the fallback job limit when a tube has no
# :garbage setting of its own.
def garbage_after
  @garbage_after
end

.is_childObject

Returns the value of attribute is_child.



8
9
10
# File 'lib/backburner/workers/threads_on_fork.rb', line 8

# Class-level reader for @is_child (nil when it was never assigned).
# fork_it sets the flag to true inside a forked process; child_pids and
# finish_forks check it so that a child never manages forks itself.
def is_child
  @is_child
end

.shutdownObject

Returns the value of attribute shutdown.



5
6
7
# File 'lib/backburner/workers/threads_on_fork.rb', line 5

# Class-level reader for @shutdown (nil when it was never assigned).
# The watcher loop in fork_and_watch keeps re-forking a tube until this
# value becomes truthy.
def shutdown
  @shutdown
end

.threads_numberObject

Returns the value of attribute threads_number.



6
7
8
# File 'lib/backburner/workers/threads_on_fork.rb', line 6

# Class-level reader for @threads_number (nil when it was never assigned).
# fork_inner uses this value as the fallback thread count when a tube has no
# :threads setting of its own; if both are nil it falls back to 1.
def threads_number
  @threads_number
end

Class Method Details

.child_pidsObject

Returns the pids of all children/forks that are still alive.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/backburner/workers/threads_on_fork.rb', line 11

# Lists the pids of the forked children that still exist.
#
# Inside a forked child (is_child set) this always answers with a fresh empty
# array. In the parent every remembered pid is probed with signal 0; pids that
# no longer exist are forgotten and the surviving list is returned.
#
# @return [Array] the array stored in @child_pids (callers append new pids to it)
def child_pids
  return [] if is_child

  @child_pids ||= []
  alive = @child_pids.select do |pid|
    # Never report the current process as one of its own children.
    next false if pid.to_i == Process.pid

    begin
      Process.kill(0, pid) # signal 0 only checks that the process exists
      true
    rescue Errno::ESRCH
      false # no such process any more
    end
  end

  # Keep the very same array object when nothing had to be dropped.
  @child_pids = alive unless @child_pids == alive
  @child_pids
end

.finish_forksObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/backburner/workers/threads_on_fork.rb', line 51

# Shuts down every forked child from the parent process.
#
# Does nothing inside a child (is_child set) or when no children are alive.
# Otherwise it asks the children to stop (stop_forks), waits one second, and
# kills and reaps whatever is still running afterwards.
#
# @return [nil, Array] nil unless children had to be killed, in which case the
#   result of Process.waitall is returned
def finish_forks
  return if is_child

  running = child_pids
  return if running.empty?

  puts "[ThreadsOnFork workers] Stopping forks: #{running.join(", ")}"
  stop_forks
  Kernel.sleep 1 # grace period before resorting to SIGKILL

  stubborn = child_pids
  return if stubborn.empty?

  puts "[ThreadsOnFork workers] Killing remaining forks: #{stubborn.join(", ")}"
  kill_forks
  Process.waitall
end

.kill_forksObject

Send a SIGKILL signal to all children. This is the same as an assassination: we are KILLING those forks that don’t obey us.



42
43
44
45
46
47
48
49
# File 'lib/backburner/workers/threads_on_fork.rb', line 42

# Forcefully terminates every child reported by child_pids with SIGKILL.
#
# @return [Array] the pid list that was iterated
def kill_forks
  child_pids.each do |pid|
    begin
      Process.kill("SIGKILL", pid)
    rescue Errno::ESRCH
      # The child is already gone; nothing left to kill.
    end
  end
end

.stop_forksObject

Send a SIGTERM signal to all children. This is the same as a normal exit: we are simply asking the children to exit.



30
31
32
33
34
35
36
37
# File 'lib/backburner/workers/threads_on_fork.rb', line 30

# Politely asks every child reported by child_pids to exit by sending SIGTERM.
#
# @return [Array] the pid list that was iterated
def stop_forks
  child_pids.each do |pid|
    begin
      Process.kill("SIGTERM", pid)
    rescue Errno::ESRCH
      # The child has exited on its own in the meantime.
    end
  end
end

Instance Method Details

#coolest_exitObject

Exit with Kernel.exit! to avoid running at_exit callbacks that belong to the parent process. We use exit code 99, which means the fork reached the garbage number.



230
231
232
# File 'lib/backburner/workers/threads_on_fork.rb', line 230

# Terminates the current process immediately with exit status 99.
#
# Kernel.exit! is used instead of Kernel.exit so that at_exit handlers are
# skipped. fork_and_watch treats status 99 as the only non-error exit code.
def coolest_exit
  Kernel.exit!(99)
end

#create_thread(*args, &block) ⇒ Object

Create a thread. Easy to test



235
236
237
# File 'lib/backburner/workers/threads_on_fork.rb', line 235

# Starts a new thread running +block+; +args+ are handed to the block.
# Kept as a separate method so that thread creation is easy to replace in tests.
#
# @param args [Array] values passed through to Thread.new
# @return [Thread] the thread that was started
def create_thread(*args, &block)
  Thread.new(*args, &block)
end

#fork_and_watch(name) ⇒ Object

Make the fork and create a thread to watch the child process. The exit code ‘99’ means that the fork exited because of the garbage limit. Any other code is an error.



146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/backburner/workers/threads_on_fork.rb', line 146

# Spawns a watcher thread that keeps a forked worker alive for one tube.
#
# Until the class-level shutdown flag is set, the thread forks a child for the
# tube, blocks until that child exits and forks again. Exit status 99 is the
# expected "garbage limit reached" code; every other status is logged as an
# error before the next fork.
#
# @param name [String] tube to work on
# @return [Object] whatever create_thread returns (a Thread by default)
def fork_and_watch(name)
  create_thread(name) do |tube_name|
    until self.class.shutdown
      child_pid = fork_tube(tube_name)
      _, status = wait_for_process(child_pid)

      exit_code = status.exitstatus
      next if exit_code == 99 # 99 = garbaged, simply fork again

      log_error("Catastrophic failure: tube #{tube_name} exited with code #{exit_code}.")
    end
  end
end

#fork_inner(name) ⇒ Object

Here we are already in the forked child. We watch just the selected tube and change the configuration of queue_config.max_job_retries if needed.

If we limit the number of threads to 1 it will just run in a loop without creating any extra thread.



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/backburner/workers/threads_on_fork.rb', line 173

# Body of a forked child: works a single tube until the job limit is reached.
#
# Reads the settings stored for +name+ in @tubes_data (creating an empty entry
# when there is none), applies a custom retry limit to queue_config, and
# resolves the garbage limit and thread count, falling back to the class-level
# values. With exactly one thread the work loop runs inline on the default
# connection; otherwise every thread gets its own connection, which is closed
# when the thread finishes. The method ends by calling coolest_exit.
#
# @param name [String] tube to work on
def fork_inner(name)
  tube_settings = (@tubes_data[name] ||= {})
  custom_retries = tube_settings[:retries]
  queue_config.max_job_retries = custom_retries if custom_retries

  @garbage_after  = tube_settings[:garbage] || self.class.garbage_after
  @threads_number = (tube_settings[:threads] || self.class.threads_number || 1).to_i
  @runs = 0

  puts "Threads number = #{@threads_number}"

  if @threads_number == 1
    watch_tube(name)
    run_while_can
  else
    # Remember how many threads existed before spawning the workers.
    baseline = Thread.list.count
    @threads_number.times do
      create_thread do
        begin
          conn = new_connection
          watch_tube(name, conn)
          run_while_can(conn)
        ensure
          conn.close if conn
        end
      end
    end
    # Poll until all worker threads have gone away again.
    sleep 0.1 until Thread.list.count <= baseline
  end

  coolest_exit
end

#fork_it(&blk) ⇒ Object

Forks the specified block and adds the process to the child process pool. FIXME: If blk.call breaks then the pid isn’t added to child_pids and is never shut down.



249
250
251
252
253
254
255
256
257
# File 'lib/backburner/workers/threads_on_fork.rb', line 249

# Runs +blk+ in a forked child process and registers the child's pid.
#
# Child side: flags the class as running in a child, renames the process so
# the parent pid is visible in the process title, then calls the block.
# Parent side: appends the new pid to the class-level child_pids list.
#
# @return [Integer] pid of the forked child
def fork_it(&blk)
  child_body = proc do
    self.class.is_child = true
    $0 = "[ThreadsOnFork worker] parent: #{Process.ppid}"
    blk.call
  end

  Kernel.fork(&child_body).tap { |child_pid| self.class.child_pids << child_pid }
end

#fork_tube(name) ⇒ Object

Forks a child that works the given tube. Kept as a separate method so that it is easy to test.



161
162
163
164
165
# File 'lib/backburner/workers/threads_on_fork.rb', line 161

# Forks a child process whose whole job is to run fork_inner for +name+.
# Exists as its own method so the forking step can be replaced in tests.
#
# @param name [String] tube to work on
# @return [Object] whatever fork_it returns (the child's pid)
def fork_tube(name)
  fork_it { fork_inner(name) }
end

#on_reconnect(conn) ⇒ Object



223
224
225
# File 'lib/backburner/workers/threads_on_fork.rb', line 223

# Re-watches the previously watched tube on a fresh connection.
# Does nothing when no tube has been watched yet.
#
# @param conn [Object] the new connection
def on_reconnect(conn)
  return unless @watched_tube_name

  watch_tube(@watched_tube_name, conn)
end

#prepareObject



122
123
124
125
126
127
128
# File 'lib/backburner/workers/threads_on_fork.rb', line 122

# Resolves the list of tubes this worker will process and logs it.
#
# When no tube names were given, Backburner.default_queues is used, or
# all_existing_queues when no defaults are configured. Names are expanded with
# expand_tube_name and de-duplicated. Each tube is logged together with the
# values of its entry in @tubes_data.
def prepare
  self.tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues
  # Array() hands back the very same object when given an Array, so build a new
  # list instead of expanding in place; otherwise the shared default-queue
  # array (or the caller's array) would be rewritten.
  self.tube_names = Array(self.tube_names).map { |name| expand_tube_name(name) }.uniq
  tube_display_names = tube_names.map do |name|
    # Tubes that did not come through process_tube_names have no entry in
    # @tubes_data; treat them as having no custom settings.
    "#{name}:#{(@tubes_data[name] || {}).values}"
  end
  log_info "Working #{tube_names.size} queues: [ #{tube_display_names.join(', ')} ]"
end

#process_tube_names(tube_names) ⇒ Object

Process the special tube_names of the ThreadsOnFork worker. The format is tube_name:custom_threads_limit:custom_garbage_limit:custom_retries. Any custom value can be omitted, so if you want to set just custom_retries you need to write ‘tube_name:::10’.

Examples:

process_tube_names(['foo:10:5:1', 'bar:2::3', 'lol'])
=> ['foo', 'bar', 'lol']


84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/backburner/workers/threads_on_fork.rb', line 84

# Splits the extended tube names understood by this worker.
#
# Each name has the form "tube:threads:garbage:retries". The three limits are
# optional; a missing or empty field is stored as nil, any other text is
# converted with to_i. The parsed limits are stored in @tubes_data under the
# expanded tube name.
#
# @param tube_names [Object] raw names, normalised by compact_tube_names
# @return [Array<String>, nil] the bare tube names, or nil when
#   compact_tube_names returns nil
def process_tube_names(tube_names)
  names = compact_tube_names(tube_names)
  return nil if names.nil?

  names.map do |name|
    tube_name, *limits = name.split(":")
    threads_number, garbage_number, retries_number = (0..2).map do |position|
      field = limits[position]
      field.to_i unless field.nil? || field.empty?
    end

    @tubes_data[expand_tube_name(tube_name)] = {
      :threads => threads_number,
      :garbage => garbage_number,
      :retries => retries_number
    }
    tube_name
  end
end

#process_tube_optionsObject

Process the tube settings. This overrides the @tubes_data set by the process_tube_names method. So if a tube has the name ‘super_job:5:20:10’ and the tube class has the setting queue_jobs_limit 10, the resulting limit will be 10. If the tube is known by an existing allq queue, but not by a class, skip it.



110
111
112
113
114
115
116
117
118
119
120
# File 'lib/backburner/workers/threads_on_fork.rb', line 110

# Lets settings declared on queue classes override the parsed tube settings.
#
# For every known queue class that has an entry in @tubes_data, the class's
# queue_jobs_limit, queue_garbage_limit and queue_retry_limit replace the
# :threads, :garbage and :retries values. A nil class setting leaves the
# existing value untouched. Classes without an entry are skipped.
def process_tube_options
  Backburner::Worker.known_queue_classes.each do |queue|
    tube_settings = @tubes_data[expand_tube_name(queue)]
    next if tube_settings.nil?

    class_settings = {
      :threads => queue.queue_jobs_limit,
      :garbage => queue.queue_garbage_limit,
      :retries => queue.queue_retry_limit
    }
    # The block only runs for keys present on both sides.
    tube_settings.merge!(class_settings) do |_key, from_name, from_class|
      from_class.nil? ? from_name : from_class
    end
  end
end

#run_while_can(conn = connection) ⇒ Object

Run work_one_job while we can



209
210
211
212
213
214
215
# File 'lib/backburner/workers/threads_on_fork.rb', line 209

# Processes jobs one at a time until the garbage limit is reached.
#
# The loop runs forever when @garbage_after is nil; otherwise it stops once
# @runs has caught up with @garbage_after.
#
# @param conn [Object] connection handed to work_one_job
#   (defaults to the worker's own connection)
def run_while_can(conn = connection)
  log_info "Run while can"
  while @garbage_after.nil? || @garbage_after > @runs
    # FIXME: Likely race condition (counter is shared between worker threads)
    @runs += 1
    work_one_job(conn, @watched_tube_name)
  end
end

#start(lock = true) ⇒ Object

For each tube we call fork_and_watch to create the fork. The lock argument defines whether this method should block or not.



132
133
134
135
136
137
138
139
140
141
# File 'lib/backburner/workers/threads_on_fork.rb', line 132

# Starts one watcher (see fork_and_watch) per tube.
#
# @param lock [Boolean] when truthy the calling thread is parked forever after
#   the watchers are started; when falsy the method returns right away
def start(lock=true)
  prepare
  tube_names.each { |name| fork_and_watch(name) }
  return unless lock

  # Park the caller in an endless sleep loop.
  sleep(0.1) while true
end

#wait_for_process(pid) ⇒ Object

Wait for a specific process. Easy to test



240
241
242
243
244
# File 'lib/backburner/workers/threads_on_fork.rb', line 240

# Blocks until the child +pid+ exits, then removes it from the class-level
# child_pids list. Kept as a separate method so waiting is easy to replace in tests.
#
# @param pid [Integer] pid of the child to wait for
# @return [Array] the [pid, Process::Status] pair from Process.wait2
def wait_for_process(pid)
  Process.wait2(pid).tap { self.class.child_pids.delete(pid) }
end

#watch_tube(name, conn = connection) ⇒ Object

Shortcut for watching a tube on our beanstalk connection



218
219
220
221
# File 'lib/backburner/workers/threads_on_fork.rb', line 218

# Records +name+ as the tube this worker is watching; run_while_can and
# on_reconnect read it back from @watched_tube_name.
#
# @param name [String] tube name to remember
# @param conn [Object] accepted for interface compatibility but not used in
#   the body (defaults to the worker's own connection)
# @return [Object] the +name+ that was stored
def watch_tube(name, conn = connection)
  @watched_tube_name = name
  # No op for allq: nothing is sent over conn here.
end