Class: ActiveSupport::Testing::Parallelization::Worker
- Defined in: lib/active_support/testing/parallelization/worker.rb
Instance Method Summary
- #after_fork ⇒ Object
- #initialize(number, url) ⇒ Worker (constructor): A new instance of Worker.
- #perform_job(job) ⇒ Object
- #run_cleanup ⇒ Object
- #safe_record(reporter, result) ⇒ Object
- #start ⇒ Object
- #work_from_queue ⇒ Object
Constructor Details
#initialize(number, url) ⇒ Worker
Returns a new instance of Worker.
7 8 9 10 11 12 |
# File 'lib/active_support/testing/parallelization/worker.rb', line 7
# Builds a worker and gives it a freshly generated UUID as its id.
#
# number - value later handed to every after-fork and cleanup hook.
# url    - URI that #start passes to DRbObject.new_with_uri to reach the queue.
#
# @setup_exception starts out nil; #start stores any error raised by
# #after_fork there.
def initialize(number, url)
  @id = SecureRandom.uuid
  @number, @url = number, url
  @setup_exception = nil
end
Instance Method Details
#after_fork ⇒ Object
80 81 82 83 84 |
# File 'lib/active_support/testing/parallelization/worker.rb', line 80
# Invokes every callback in Parallelization.after_fork_hooks, passing this
# worker's number to each one.
def after_fork
  Parallelization.after_fork_hooks.each { |hook| hook.call(@number) }
end
#perform_job(job) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/active_support/testing/parallelization/worker.rb', line 42
# Runs a single queued test and reports its result.
#
# job - indexable triple: job[0] is the test class, job[1] the test method
#       name, job[2] the reporter.
#
# The process title is set to "Class#method" while the test runs. The result
# of Minitest.run_one_method is forwarded to #safe_record.
def perform_job(job)
  test_class = job[0]
  test_name  = job[1]
  reporter   = job[2]

  set_process_title("#{test_class}##{test_name}")

  outcome = test_class.with_info_handler(reporter) { Minitest.run_one_method(test_class, test_name) }

  safe_record(reporter, outcome)
end
#run_cleanup ⇒ Object
86 87 88 89 90 |
# File 'lib/active_support/testing/parallelization/worker.rb', line 86
# Invokes every callback in Parallelization.run_cleanup_hooks, passing this
# worker's number to each one.
def run_cleanup
  Parallelization.run_cleanup_hooks.each { |hook| hook.call(@number) }
end
#safe_record(reporter, result) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/active_support/testing/parallelization/worker.rb', line 56
# Sends a test result to the queue, with a fallback for DRb connection errors.
#
# reporter - reporter object passed through to @queue.record.
# result   - test result; its #failures array may be rewritten in place.
#
# If a setup exception was captured earlier, add_setup_exception(result) is
# called first. When @queue.record raises DRb::DRbConnError, every failure is
# replaced by a Minitest::UnexpectedError wrapping a DRb::DRbRemoteError and
# the record call is attempted once more.
# NOTE(review): presumably the original failures could not be sent over DRb;
# confirm.
# On Interrupt the queue is told via @queue.interrupt and the Interrupt is
# re-raised. The process title becomes "(idle)" only when no exception escapes.
def safe_record(reporter, result)
  add_setup_exception(result) if @setup_exception

  begin
    @queue.record(reporter, result)
  rescue DRb::DRbConnError
    result.failures.map! do |failure|
      # minitest >5.14.0 exposes #error; otherwise fall back to #exception.
      cause = failure.respond_to?(:error) ? failure.error : failure.exception
      Minitest::UnexpectedError.new(DRb::DRbRemoteError.new(cause))
    end
    @queue.record(reporter, result)
  rescue Interrupt
    @queue.interrupt
    raise
  end

  set_process_title("(idle)")
end
#start ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/active_support/testing/parallelization/worker.rb', line 14
# Forks a child process that connects to the queue at @url, registers itself,
# runs the after-fork hooks, and then processes jobs until the queue is empty.
#
# Returns the value of Kernel#fork in the calling process.
#
# An error raised by #after_fork is not propagated; it is stored in
# @setup_exception so #safe_record can attach it to test results.
#
# On the way out (normal or exceptional) the child runs the cleanup hooks and
# deregisters from the queue. Deregistration is attempted even when a cleanup
# hook raises, and is skipped when the queue connection was never created, so
# that the original error is not masked by a NoMethodError on nil.
def start
  fork do
    set_process_title("(starting)")

    # NOTE(review): presumably stops the DRb service inherited from the parent
    # process before connecting as a client; confirm.
    DRb.stop_service

    @queue = DRbObject.new_with_uri(@url)
    @queue.start_worker(@id)

    begin
      after_fork
    rescue => @setup_exception; end

    work_from_queue
  ensure
    set_process_title("(stopping)")

    begin
      run_cleanup
    ensure
      # Always tell the queue this worker is gone, even if cleanup failed.
      @queue&.stop_worker(@id)
    end
  end
end
#work_from_queue ⇒ Object
36 37 38 39 40 |
# File 'lib/active_support/testing/parallelization/worker.rb', line 36
# Pops jobs from @queue and hands each to #perform_job, stopping as soon as
# the queue returns a falsy value.
def work_from_queue
  while (next_job = @queue.pop)
    perform_job(next_job)
  end
end