Class: Backburner::Workers::Threading
- Inherits: Backburner::Worker
  - Object
  - Backburner::Worker
  - Backburner::Workers::Threading
- Defined in: lib/backburner/workers/threading.rb
Class Attribute Summary
- .shutdown_timeout ⇒ Object
  Returns the value of attribute shutdown_timeout.
- .threads_number ⇒ Object
  Returns the value of attribute threads_number.
Instance Attribute Summary
- #exit_on_shutdown ⇒ Object
  Returns the value of attribute exit_on_shutdown.
- #self_read ⇒ Object
  Returns the value of attribute self_read.
- #self_write ⇒ Object
  Returns the value of attribute self_write.
Attributes inherited from Backburner::Worker
Instance Method Summary
- #initialize(*args) ⇒ Threading (constructor)
  Custom initializer just to set @tubes_data.
- #kill ⇒ Object
- #prepare ⇒ Object
  Used to prepare job queues before processing jobs.
- #process_tube_names(tube_names) ⇒ Object
  Processes the special tube names of the Threading worker; the format is tube_name:custom_threads_limit.
- #process_tube_options ⇒ Object
  Processes the tube settings. This overrides the @tubes_data set by the process_tube_names method.
- #register_signal_handlers! ⇒ Object
  Registers handlers for the TERM and INT signals to trigger shutdown.
- #shutdown ⇒ Object
- #shutdown_threadpools ⇒ Object
- #start(wait = true) ⇒ Object
  Starts processing new jobs indefinitely.
- #wait_for_shutdown! ⇒ Object
  Waits for the shutdown signal.
Methods inherited from Backburner::Worker
enqueue, #handle_failure_for_job, start, #work_one_job
Methods included from Logger
included, #job_started_at, #log_error, #log_info, #log_job_begin, #log_job_end, #logger
Methods included from Helpers
#classify, #constantize, #dasherize, #exception_message, #expand_tube_name, included, #queue_config, #resolve_max_job_retries, #resolve_priority, #resolve_respond_timeout, #resolve_retry_delay, #resolve_retry_delay_proc
Constructor Details
#initialize(*args) ⇒ Threading
Custom initializer just to set @tubes_data
    # File 'lib/backburner/workers/threading.rb', line 16
    def initialize(*args)
      @tubes_data = {}
      super
      self.process_tube_options
      @exit_on_shutdown = true
    end
Class Attribute Details
.shutdown_timeout ⇒ Object
Returns the value of attribute shutdown_timeout.
    # File 'lib/backburner/workers/threading.rb', line 12
    def shutdown_timeout
      @shutdown_timeout
    end
.threads_number ⇒ Object
Returns the value of attribute threads_number.
    # File 'lib/backburner/workers/threading.rb', line 11
    def threads_number
      @threads_number
    end
Instance Attribute Details
#exit_on_shutdown ⇒ Object
Returns the value of attribute exit_on_shutdown.
    # File 'lib/backburner/workers/threading.rb', line 6
    def exit_on_shutdown
      @exit_on_shutdown
    end
#self_read ⇒ Object
Returns the value of attribute self_read.
    # File 'lib/backburner/workers/threading.rb', line 6
    def self_read
      @self_read
    end
#self_write ⇒ Object
Returns the value of attribute self_write.
    # File 'lib/backburner/workers/threading.rb', line 6
    def self_write
      @self_write
    end
Instance Method Details
#kill ⇒ Object
    # File 'lib/backburner/workers/threading.rb', line 141
    def kill
      @thread_pools.each { |_name, pool| pool.kill unless pool.shutdown? }
    end
#prepare ⇒ Object
Used to prepare job queues before processing jobs. Setup beanstalk tube_names and watch all specified tubes for jobs.
    # File 'lib/backburner/workers/threading.rb', line 30
    def prepare
      self.tube_names.map! { |name| expand_tube_name(name) }.uniq!
      log_info "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]"
      @thread_pools = {}
      @tubes_data.each do |name, config|
        max_threads = (config[:threads] || self.class.threads_number || ::Concurrent.processor_count).to_i
        @thread_pools[name] = (::Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: max_threads))
      end
    end
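The pool size in #prepare falls through three sources in order: the per-tube :threads value, the class-level threads_number, and finally the processor count. The following is a minimal sketch of that fallback chain, not the library's code: resolve_max_threads is a hypothetical helper name, and Etc.nprocessors from the standard library stands in for ::Concurrent.processor_count so the snippet runs without concurrent-ruby.

```ruby
require 'etc'

# Hypothetical helper mirroring the fallback order used in #prepare.
# Etc.nprocessors is an assumed stand-in for ::Concurrent.processor_count.
def resolve_max_threads(tube_config, class_default = nil)
  (tube_config[:threads] || class_default || Etc.nprocessors).to_i
end

resolve_max_threads({ threads: 3 }, 8)    # the per-tube value wins
resolve_max_threads({ threads: nil }, 8)  # falls back to the class-level default
resolve_max_threads({}, nil)              # falls back to the processor count
```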
#process_tube_names(tube_names) ⇒ Object
Process the special tube_names of Threading worker:
The format is tube_name:custom_threads_limit
    # File 'lib/backburner/workers/threading.rb', line 88
    def process_tube_names(tube_names)
      names = compact_tube_names(tube_names)
      if names.nil?
        nil
      else
        names.map do |name|
          data = name.split(":")
          tube_name = data.first
          # data[1] is nil when the name has no ":limit" suffix; the rescue
          # modifier turns the resulting NoMethodError into nil.
          threads_number = data[1].empty? ? nil : data[1].to_i rescue nil
          @tubes_data[expand_tube_name(tube_name)] = { :threads => threads_number }
          tube_name
        end
      end
    end
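To illustrate the tube_name:custom_threads_limit format, here is a small stand-alone sketch. parse_tube_spec is a hypothetical name, not part of Backburner, and it skips the tube-name expansion that the real method applies. It checks for a missing limit explicitly instead of relying on a rescue modifier.

```ruby
# Hypothetical stand-alone parser for "tube_name:custom_threads_limit".
# Returns the bare tube name and a settings hash; :threads is nil when
# no limit (or an empty limit) is given.
def parse_tube_spec(spec)
  tube_name, limit = spec.split(":", 2)
  threads = (limit.nil? || limit.empty?) ? nil : limit.to_i
  [tube_name, { threads: threads }]
end

parse_tube_spec("super_job:5")  # name with an explicit limit
parse_tube_spec("emails")       # name without a limit
```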
#process_tube_options ⇒ Object
Processes the tube settings. This overrides the @tubes_data set by the process_tube_names method. For example, if a tube is named 'super_job:5' and the tube class sets queue_jobs_limit to 10, the resulting limit is 10. If the tube is known to an existing allq queue but not to a class, it is skipped.
    # File 'lib/backburner/workers/threading.rb', line 110
    def process_tube_options
      Backburner::Worker.known_queue_classes.each do |queue|
        next if @tubes_data[expand_tube_name(queue)].nil?
        queue_settings = {
          :threads => queue.queue_jobs_limit
        }
        @tubes_data[expand_tube_name(queue)].merge!(queue_settings){|k, v1, v2| v2.nil? ? v1 : v2 }
      end
    end
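The override rule rests on the block form of Hash#merge!, which is called only for keys present in both hashes. The sketch below uses made-up tube names and limits to show the effect: a class-level limit replaces the limit parsed from the tube name, unless the class-level value is nil.

```ruby
# Same conflict rule as in #process_tube_options: keep the existing value
# unless the incoming one is non-nil.
prefer_new_unless_nil = ->(_key, old_value, new_value) { new_value.nil? ? old_value : new_value }

# Illustrative data only; these tubes and limits are assumptions.
tubes_data = { "super_job" => { threads: 5 }, "emails" => { threads: 2 } }

tubes_data["super_job"].merge!({ threads: 10 }, &prefer_new_unless_nil)  # class sets a limit
tubes_data["emails"].merge!({ threads: nil }, &prefer_new_unless_nil)    # class sets no limit
```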
#register_signal_handlers! ⇒ Object
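Registers handlers for the TERM and INT signals to trigger shutdown. Each handler writes the signal name to a pipe; once shutdown is already in progress, a further signal raises Interrupt instead.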
    # File 'lib/backburner/workers/threading.rb', line 152
    def register_signal_handlers!
      @self_read, @self_write = IO.pipe
      %w[TERM INT].each do |sig|
        trap(sig) do
          raise Interrupt if @in_shutdown
          self_write.puts(sig)
        end
      end
    end
#shutdown ⇒ Object
    # File 'lib/backburner/workers/threading.rb', line 145
    def shutdown
      log_info "beginning graceful worker shutdown"
      shutdown_threadpools
      super if @exit_on_shutdown
    end
#shutdown_threadpools ⇒ Object
    # File 'lib/backburner/workers/threading.rb', line 127
    def shutdown_threadpools
      @thread_pools.each { |_name, pool| pool.shutdown }
      shutdown_time = Time.now
      @in_shutdown = true
      all_shutdown = @thread_pools.all? do |_name, pool|
        time_to_wait = self.class.shutdown_timeout - (Time.now - shutdown_time).to_i
        pool.wait_for_termination(time_to_wait) if time_to_wait > 0
      end
    rescue Interrupt
      log_info "graceful shutdown aborted, shutting down immediately"
    ensure
      kill unless all_shutdown
    end
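The method above spends one shared timeout budget across all pools rather than granting each pool the full timeout. A minimal sketch of that idea with plain threads follows. join_all_within is a hypothetical helper, Thread#join stands in for the pool's wait_for_termination, and the monotonic clock is a choice made here (the original uses Time.now with whole seconds).

```ruby
# Hypothetical helper: wait for every thread, but never longer than
# `timeout` seconds in total. Returns true only if all threads finished.
def join_all_within(threads, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  threads.all? do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # Thread#join(limit) returns nil when the limit expires first.
    remaining > 0 && !thread.join(remaining).nil?
  end
end

workers = [Thread.new { sleep 0.05 }, Thread.new { sleep 10 }]
# Fall back to a hard kill when the graceful wait runs out of budget.
workers.each(&:kill) unless join_all_within(workers, 0.2)
```

Because all? short-circuits, the first thread that misses the deadline ends the wait, which matches the "kill unless all_shutdown" fallback in the method above.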
#start(wait = true) ⇒ Object
Starts processing new jobs indefinitely. Primary way to consume and process jobs in specified tubes.
    # File 'lib/backburner/workers/threading.rb', line 46
    def start(wait=true)
      prepare

      @thread_pools.each do |tube_name, pool|
        pool.max_length.times do
          # Create a new connection and set it up to listen on this tube name
          # connection = new_connection.tap{ |conn| conn.tubes.watch!(tube_name) }
          # connection.on_reconnect = lambda { |conn| conn.tubes.watch!(tube_name) }

          # Make it work jobs using its own connection per thread
          pool.post(connection) do |memo_connection|
            # TODO: use read-write lock?
            loop do
              begin
                break if @in_shutdown
                work_one_job(memo_connection, tube_name)
              rescue => e
                log_error("Exception caught in thread pool loop. Continuing. -> #{e.message}\nBacktrace: #{e.backtrace}")
              end
            end

            connection.close
          end
        end
      end

      wait_for_shutdown! if wait
    end
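Each pool thread runs the same loop: stop once the shutdown flag is set, otherwise work one job, and log rather than propagate any exception so a failing job cannot end the thread. The sketch below reproduces that loop shape with standard-library threads and a Queue. LoopingWorkers and everything inside it are hypothetical names; no beanstalk connection or concurrent-ruby pool is involved.

```ruby
# Hypothetical stand-in for one thread pool: each thread keeps taking jobs
# until the shutdown flag is set, and a failing job never ends the loop.
class LoopingWorkers
  attr_reader :errors

  def initialize(jobs, thread_count)
    @jobs = jobs          # a Queue of callables
    @errors = Queue.new   # messages from jobs that raised
    @in_shutdown = false
    @threads = Array.new(thread_count) { Thread.new { work_loop } }
  end

  def shutdown
    @in_shutdown = true
    @threads.size.times { @jobs << nil }  # wake threads blocked on pop
    @threads.each(&:join)
  end

  private

  def work_loop
    loop do
      begin
        break if @in_shutdown
        job = @jobs.pop
        job.call if job
      rescue => e
        @errors << e.message  # record the failure and keep looping
      end
    end
  end
end
```

Pushing one nil per thread in shutdown is a design choice of this sketch: a thread blocked on an empty queue would otherwise never re-check the flag.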
#wait_for_shutdown! ⇒ Object
Waits for the shutdown signal.
    # File 'lib/backburner/workers/threading.rb', line 121
    def wait_for_shutdown!
      raise Interrupt while IO.select([self_read])
    rescue Interrupt
      shutdown
    end
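Taken together, #register_signal_handlers! and #wait_for_shutdown! follow the self-pipe pattern: the trap handler does nothing but write to a pipe, and the main flow blocks in IO.select until the pipe becomes readable. The sketch below isolates that pattern under a few assumptions: wait_for_signal is a hypothetical helper, it accepts an optional timeout that the original does not have, and it restores the previous handlers on exit.

```ruby
# Hypothetical helper: block until one of `signals` arrives (or `timeout`
# seconds pass) and return the signal name, or nil on timeout.
def wait_for_signal(signals, timeout = nil)
  reader, writer = IO.pipe
  previous = {}
  signals.each do |sig|
    # Keep the handler tiny: it only writes the signal name to the pipe.
    previous[sig] = trap(sig) { writer.puts(sig) }
  end
  ready = IO.select([reader], nil, nil, timeout)
  ready ? reader.gets.chomp : nil
ensure
  # Restore whatever handlers were installed before, then close the pipe.
  previous.each { |sig, handler| trap(sig, handler || "DEFAULT") } if previous
  [reader, writer].each { |io| io.close if io && !io.closed? }
end

# Usage (blocks until the process receives TERM or INT):
#   wait_for_signal(%w[TERM INT])
```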