Class: Backburner::Workers::ThreadsOnFork
Inherits: Backburner::Worker
- Object
- Backburner::Worker
- Backburner::Workers::ThreadsOnFork
Defined in: lib/backburner/workers/threads_on_fork.rb
Class Attribute Summary
- .garbage_after ⇒ Object
  Returns the value of attribute garbage_after.
- .is_child ⇒ Object
  Returns the value of attribute is_child.
- .shutdown ⇒ Object
  Returns the value of attribute shutdown.
- .threads_number ⇒ Object
  Returns the value of attribute threads_number.
Attributes inherited from Backburner::Worker
Class Method Summary
- .child_pids ⇒ Object
  Returns the PIDs of all children (forks) that are still alive.
- .finish_forks ⇒ Object
- .kill_forks ⇒ Object
  Sends a SIGKILL signal to all children. This is the equivalent of an assassination: we kill the forks that do not obey us.
- .stop_forks ⇒ Object
  Sends a SIGTERM signal to all children. This is the same as a normal exit: we are simply asking the children to exit.
Instance Method Summary
- #coolest_exit ⇒ Object
  Exits with Kernel.exit! to avoid running at_exit callbacks that belong to the parent process. Exit code 99 means the fork reached its garbage limit.
- #create_thread(*args, &block) ⇒ Object
  Creates a thread.
- #fork_and_watch(name) ⇒ Object
  Forks and creates a thread to watch the child process. Exit code 99 means that the fork exited because it reached the garbage limit; any other code is an error.
- #fork_inner(name) ⇒ Object
  At this point we are already in the forked child. We watch just the selected tube and change queue_config.max_job_retries if needed.
- #fork_it(&blk) ⇒ Object
  Forks the specified block and adds the process to the child process pool. FIXME: If blk.call breaks then the pid isn't added to child_pids and is never shut down.
- #fork_tube(name) ⇒ Object
  This makes it easy to test.
- #initialize(*args) ⇒ ThreadsOnFork (constructor)
  Custom initializer just to set @tubes_data.
- #on_reconnect(conn) ⇒ Object
- #prepare ⇒ Object
- #process_tube_names(tube_names) ⇒ Object
  Processes the special tube_names of the ThreadsOnFork worker. The format is tube_name:custom_threads_limit:custom_garbage_limit:custom_retries. Any custom value can be omitted.
- #process_tube_options ⇒ Object
  Processes the tube settings. This overrides @tubes_data set by the process_tube_names method.
- #run_while_can(conn = connection) ⇒ Object
  Runs work_one_job while we can.
- #start(lock = true) ⇒ Object
  For each tube we call fork_and_watch to create the fork. The lock argument defines whether this method should block or not.
- #wait_for_process(pid) ⇒ Object
  Waits for a specific process.
- #watch_tube(name, conn = connection) ⇒ Object
  Shortcut for watching a tube on our beanstalk connection.
Methods inherited from Backburner::Worker
enqueue, #handle_failure_for_job, #shutdown, start, #work_one_job
Methods included from Logger
included, #job_started_at, #log_error, #log_info, #log_job_begin, #log_job_end, #logger
Methods included from Helpers
#classify, #constantize, #dasherize, #exception_message, #expand_tube_name, included, #queue_config, #resolve_max_job_retries, #resolve_priority, #resolve_respond_timeout, #resolve_retry_delay, #resolve_retry_delay_proc
Constructor Details
#initialize(*args) ⇒ ThreadsOnFork
Custom initializer just to set @tubes_data
# File 'lib/backburner/workers/threads_on_fork.rb', line 70
def initialize(*args)
  @tubes_data = {}
  super
  self.process_tube_options
end
Class Attribute Details
.garbage_after ⇒ Object
Returns the value of attribute garbage_after.
# File 'lib/backburner/workers/threads_on_fork.rb', line 7
def garbage_after
  @garbage_after
end
.is_child ⇒ Object
Returns the value of attribute is_child.
# File 'lib/backburner/workers/threads_on_fork.rb', line 8
def is_child
  @is_child
end
.shutdown ⇒ Object
Returns the value of attribute shutdown.
# File 'lib/backburner/workers/threads_on_fork.rb', line 5
def shutdown
  @shutdown
end
.threads_number ⇒ Object
Returns the value of attribute threads_number.
# File 'lib/backburner/workers/threads_on_fork.rb', line 6
def threads_number
  @threads_number
end
Class Method Details
.child_pids ⇒ Object
Returns the PIDs of all children (forks) that are still alive.
# File 'lib/backburner/workers/threads_on_fork.rb', line 11
def child_pids
  return [] if is_child
  @child_pids ||= []
  tmp_ids = []
  for id in @child_pids
    next if id.to_i == Process.pid
    begin
      # Signal 0 only checks that the process exists
      Process.kill(0, id)
      tmp_ids << id
    rescue Errno::ESRCH
      # The process is gone, so it is dropped from the list
    end
  end
  @child_pids = tmp_ids if @child_pids != tmp_ids
  @child_pids
end
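The liveness check in child_pids relies on signal 0, which performs only the existence and permission checks without delivering anything to the target. A minimal sketch of that idiom follows; process_alive? is a hypothetical helper written for this illustration and is not part of Backburner.

```ruby
# Hypothetical helper (not part of Backburner): true if a process with
# the given pid exists and may be signalled by the current process.
def process_alive?(pid)
  Process.kill(0, pid) # signal 0 = error checking only, nothing is delivered
  true
rescue Errno::ESRCH     # no such process
  false
end

pid = Process.fork { sleep } # the child sleeps until it is signalled

alive_before = process_alive?(pid)
Process.kill("KILL", pid)
Process.wait(pid)            # reap the child so the pid really disappears
alive_after = process_alive?(pid)

puts "before: #{alive_before}, after: #{alive_after}"
```

Note that a child that has exited but has not been reaped yet (a zombie) still answers signal 0, so a list built this way only shrinks once the parent has waited on the child.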
.finish_forks ⇒ Object
# File 'lib/backburner/workers/threads_on_fork.rb', line 51
def finish_forks
  return if is_child

  ids = child_pids
  if ids.length > 0
    puts "[ThreadsOnFork workers] Stopping forks: #{ids.join(", ")}"
    stop_forks
    Kernel.sleep 1
    ids = child_pids
    if ids.length > 0
      puts "[ThreadsOnFork workers] Killing remaining forks: #{ids.join(", ")}"
      kill_forks
      Process.waitall
    end
  end
end
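finish_forks follows a common escalation pattern: ask politely with SIGTERM, wait a grace period, then SIGKILL whatever is left. The sketch below reproduces that pattern outside Backburner; signal_quietly, the two demo children and the 0.5 second grace period are assumptions made for the illustration.

```ruby
# Hypothetical helper: send a signal, ignoring pids that already disappeared.
def signal_quietly(sig, pid)
  Process.kill(sig, pid)
rescue Errno::ESRCH
  nil
end

# A polite child exits on SIGTERM; a stubborn one ignores it.
polite = Process.fork { sleep }

reader, writer = IO.pipe
stubborn = Process.fork do
  Signal.trap("TERM", "IGNORE")
  writer.puts "ready" # tell the parent the trap is installed
  loop { sleep 1 }
end
reader.gets           # block until the stubborn child is ready

pids = [polite, stubborn]
pids.each { |pid| signal_quietly("TERM", pid) }
sleep 0.5             # grace period

# With WNOHANG, Process.wait returns nil while the child is still running.
survivors = pids.select { |pid| Process.wait(pid, Process::WNOHANG).nil? }
survivors.each { |pid| signal_quietly("KILL", pid) }
survivors.each { |pid| Process.wait(pid) }

puts "had to kill: #{survivors.inspect}"
```

SIGKILL cannot be trapped or ignored, which is why it works as the last step even for children that disregard SIGTERM.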
.kill_forks ⇒ Object
Sends a SIGKILL signal to all children. This is the equivalent of an assassination: we kill the forks that do not obey us.
# File 'lib/backburner/workers/threads_on_fork.rb', line 42
def kill_forks
  for id in child_pids
    begin
      Process.kill("SIGKILL", id)
    rescue Errno::ESRCH
      # The child has already exited
    end
  end
end
.stop_forks ⇒ Object
Sends a SIGTERM signal to all children. This is the same as a normal exit: we are simply asking the children to exit.
# File 'lib/backburner/workers/threads_on_fork.rb', line 30
def stop_forks
  for id in child_pids
    begin
      Process.kill("SIGTERM", id)
    rescue Errno::ESRCH
      # The child has already exited
    end
  end
end
Instance Method Details
#coolest_exit ⇒ Object
Exits with Kernel.exit! to avoid running at_exit callbacks that belong to the parent process. Exit code 99 means the fork reached its garbage limit.
# File 'lib/backburner/workers/threads_on_fork.rb', line 230
def coolest_exit
  Kernel.exit! 99
end
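The reason for exit! rather than exit is that a fork inherits every at_exit callback registered in the parent. The following sketch, which is independent of Backburner, compares the two in forked children; the pipe is used only so the parent can observe which child ran the callback.

```ruby
parent_pid = Process.pid
reader, writer = IO.pipe

# Registered in the parent, but inherited by every fork.
at_exit { writer.puts("at_exit ran") unless Process.pid == parent_pid }

soft = Process.fork { reader.close; exit(99) }         # runs the inherited callback
hard = Process.fork { reader.close; Kernel.exit!(99) } # skips it

# Both children report the same exit code to the parent.
statuses = [soft, hard].map { |pid| Process.wait2(pid).last.exitstatus }

writer.close
output = reader.read # only the child that used `exit` wrote to the pipe

puts "exit codes: #{statuses.inspect}, callback runs in children: #{output.lines.size}"
```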
#create_thread(*args, &block) ⇒ Object
Creates a thread. Wrapped in a method so that it is easy to stub in tests.
# File 'lib/backburner/workers/threads_on_fork.rb', line 235
def create_thread(*args, &block)
  Thread.new(*args, &block)
end
#fork_and_watch(name) ⇒ Object
Forks and creates a thread to watch the child process. Exit code 99 means that the fork exited because it reached the garbage limit; any other code is an error.
# File 'lib/backburner/workers/threads_on_fork.rb', line 146
def fork_and_watch(name)
  create_thread(name) do |tube_name|
    until self.class.shutdown
      pid = fork_tube(tube_name)
      _, status = wait_for_process(pid)

      # 99 = garbaged
      if status.exitstatus != 99
        log_error("Catastrophic failure: tube #{tube_name} exited with code #{status.exitstatus}.")
      end
    end
  end
end
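The watcher loop above can be pictured as "fork, wait, inspect the exit code, fork again". Here is a small stand-alone sketch of that cycle; the GARBAGE_EXIT constant, the three rounds and the simulated failure in round 1 are assumptions made for the illustration, not Backburner behaviour.

```ruby
GARBAGE_EXIT = 99 # same convention as above: the fork hit its garbage limit

respawns = 0
failures = []

3.times do |round|
  # Round 1 simulates a crash; the other rounds exit the expected way.
  pid = Process.fork { Kernel.exit!(round == 1 ? 1 : GARBAGE_EXIT) }
  _, status = Process.wait2(pid)

  if status.exitstatus == GARBAGE_EXIT
    respawns += 1                 # normal recycling, just fork again
  else
    failures << status.exitstatus # anything else is reported as an error
  end
end

puts "respawns: #{respawns}, failures: #{failures.inspect}"
```

One detail worth keeping in mind: Process::Status#exitstatus is nil when the child was terminated by a signal, so a killed fork also falls into the error branch.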
#fork_inner(name) ⇒ Object
At this point we are already in the forked child. We watch just the selected tube and change queue_config.max_job_retries if needed.
If the number of threads is limited to 1, the jobs simply run in a loop without creating any extra thread.
# File 'lib/backburner/workers/threads_on_fork.rb', line 173
def fork_inner(name)
  if @tubes_data[name]
    queue_config.max_job_retries = @tubes_data[name][:retries] if @tubes_data[name][:retries]
  else
    @tubes_data[name] = {}
  end
  @garbage_after  = @tubes_data[name][:garbage] || self.class.garbage_after
  @threads_number = (@tubes_data[name][:threads] || self.class.threads_number || 1).to_i

  @runs = 0

  puts "Threads number = #{@threads_number}"
  if @threads_number == 1
    watch_tube(name)
    run_while_can
  else
    threads_count = Thread.list.count
    @threads_number.times do
      create_thread do
        begin
          conn = new_connection
          watch_tube(name, conn)
          run_while_can(conn)
        ensure
          conn.close if conn
        end
      end
    end
    # Wait until every worker thread created above has finished
    sleep 0.1 while Thread.list.count > threads_count
  end

  coolest_exit
end
#fork_it(&blk) ⇒ Object
Forks the specified block and adds the process to the child process pool. FIXME: If blk.call breaks then the pid isn't added to child_pids and is never shut down.
# File 'lib/backburner/workers/threads_on_fork.rb', line 249
def fork_it(&blk)
  pid = Kernel.fork do
    self.class.is_child = true
    $0 = "[ThreadsOnFork worker] parent: #{Process.ppid}"
    blk.call
  end
  self.class.child_pids << pid
  pid
end
#fork_tube(name) ⇒ Object
This makes it easy to test.
# File 'lib/backburner/workers/threads_on_fork.rb', line 161
def fork_tube(name)
  fork_it do
    fork_inner(name)
  end
end
#on_reconnect(conn) ⇒ Object
# File 'lib/backburner/workers/threads_on_fork.rb', line 223
def on_reconnect(conn)
  watch_tube(@watched_tube_name, conn) if @watched_tube_name
end
#prepare ⇒ Object
# File 'lib/backburner/workers/threads_on_fork.rb', line 122
def prepare
  self.tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues
  self.tube_names = Array(self.tube_names)
  tube_names.map! { |name| expand_tube_name(name) }.uniq!
  tube_display_names = tube_names.map{|name| "#{name}:#{@tubes_data[name].values}"}
  log_info "Working #{tube_names.size} queues: [ #{tube_display_names.join(', ')} ]"
end
#process_tube_names(tube_names) ⇒ Object
Processes the special tube_names of the ThreadsOnFork worker. The format is tube_name:custom_threads_limit:custom_garbage_limit:custom_retries. Any custom value can be omitted, so to set just custom_retries you need to write 'tube_name:::10'.
# File 'lib/backburner/workers/threads_on_fork.rb', line 84
def process_tube_names(tube_names)
  names = compact_tube_names(tube_names)
  if names.nil?
    nil
  else
    names.map do |name|
      data = name.split(":")
      tube_name = data.first
      # `rescue nil` covers fields that are missing altogether (data[n] is nil)
      threads_number = data[1].empty? ? nil : data[1].to_i rescue nil
      garbage_number = data[2].empty? ? nil : data[2].to_i rescue nil
      retries_number = data[3].empty? ? nil : data[3].to_i rescue nil
      @tubes_data[expand_tube_name(tube_name)] = {
        :threads => threads_number,
        :garbage => garbage_number,
        :retries => retries_number
      }
      tube_name
    end
  end
end
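To make the tube_name:threads:garbage:retries format concrete, here is a stand-alone sketch of the parsing rule. parse_tube_spec is a hypothetical function written for this illustration; it mirrors the format described above but is not the method shown in the listing and does not touch @tubes_data.

```ruby
# Hypothetical stand-alone parser for 'name:threads:garbage:retries'.
# Empty or missing fields become nil, meaning "use the default".
def parse_tube_spec(spec)
  name, threads, garbage, retries = spec.split(":", -1)
  to_int = ->(field) { field.nil? || field.empty? ? nil : field.to_i }
  {
    name:    name,
    threads: to_int.call(threads),
    garbage: to_int.call(garbage),
    retries: to_int.call(retries)
  }
end

full         = parse_tube_spec("super_job:5:20:10") # all three limits set
retries_only = parse_tube_spec("super_job:::10")    # only custom_retries set
name_only    = parse_tube_spec("super_job")         # no custom values

p full, retries_only, name_only
```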
#process_tube_options ⇒ Object
Processes the tube settings. This overrides @tubes_data set by the process_tube_names method. So if a tube has the name 'super_job:5:20:10' and the tube class has the setting queue_jobs_limit 10, the resulting limit will be 10. If the tube is known by an existing allq queue, but not by a class, it is skipped.
# File 'lib/backburner/workers/threads_on_fork.rb', line 110
def process_tube_options
  Backburner::Worker.known_queue_classes.each do |queue|
    next if @tubes_data[expand_tube_name(queue)].nil?
    queue_settings = {
      :threads => queue.queue_jobs_limit,
      :garbage => queue.queue_garbage_limit,
      :retries => queue.queue_retry_limit
    }
    # Class-level settings win unless they are nil
    @tubes_data[expand_tube_name(queue)].merge!(queue_settings){|k, v1, v2| v2.nil? ? v1 : v2 }
  end
end
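The override rule comes from the block passed to merge!: for each key present in both hashes, the class-level value wins unless it is nil. A minimal sketch with the 'super_job:5:20:10' example follows; the from_class values are made up for the illustration, and the non-destructive merge is used so that both inputs stay visible.

```ruby
# Values parsed from the tube name 'super_job:5:20:10'
from_name = { threads: 5, garbage: 20, retries: 10 }

# Hypothetical class-level settings: only the jobs limit is configured
from_class = { threads: 10, garbage: nil, retries: nil }

# The block runs only for keys present in both hashes:
# keep the old value when the new one is nil, otherwise take the new one.
merged = from_name.merge(from_class) { |_key, old, new| new.nil? ? old : new }

p merged
```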
#run_while_can(conn = connection) ⇒ Object
Runs work_one_job while we can, that is, until the number of runs reaches the garbage limit (if one is set).
# File 'lib/backburner/workers/threads_on_fork.rb', line 209
def run_while_can(conn = connection)
  log_info "Run while can"
  while @garbage_after.nil? or @garbage_after > @runs
    @runs += 1 # FIXME: Likely race condition
    work_one_job(conn, @watched_tube_name)
  end
end
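The FIXME refers to the fact that the limit check and the increment of @runs are separate steps shared by all worker threads. One possible way to make them atomic, offered only as a sketch and not as what Backburner does, is to guard both behind a Mutex. RunBudget and its method names are hypothetical.

```ruby
# Hypothetical counter: checks the limit and increments in one critical section.
class RunBudget
  attr_reader :runs

  def initialize(limit)
    @limit = limit # nil means "no limit"
    @runs  = 0
    @lock  = Mutex.new
  end

  # Returns true if the caller may run one more job, false otherwise.
  def take
    @lock.synchronize do
      return false if @limit && @runs >= @limit
      @runs += 1
      true
    end
  end
end

budget    = RunBudget.new(100)
jobs_done = Queue.new

# Four threads compete for the same budget of 100 runs.
workers = 4.times.map do
  Thread.new { jobs_done << :job while budget.take }
end
workers.each(&:join)

puts "jobs run: #{jobs_done.size}"
```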
#start(lock = true) ⇒ Object
For each tube we call fork_and_watch to create the fork. The lock argument defines whether this method should block or not.
# File 'lib/backburner/workers/threads_on_fork.rb', line 132
def start(lock=true)
  prepare
  tube_names.each do |name|
    fork_and_watch(name)
  end

  if lock
    sleep 0.1 while true
  end
end
#wait_for_process(pid) ⇒ Object
Waits for a specific process. Wrapped in a method so that it is easy to stub in tests.
# File 'lib/backburner/workers/threads_on_fork.rb', line 240
def wait_for_process(pid)
  out = Process.wait2(pid)
  self.class.child_pids.delete(pid)
  out
end
#watch_tube(name, conn = connection) ⇒ Object
Shortcut for watching a tube on our beanstalk connection
# File 'lib/backburner/workers/threads_on_fork.rb', line 218
def watch_tube(name, conn = connection)
  @watched_tube_name = name
  # No op for allq
end