Class: ActiveSupport::Testing::Parallelization::Worker

Inherits:
Object
  • Object
show all
Defined in:
activesupport/lib/active_support/testing/parallelization/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(number, url) ⇒ Worker

Returns a new instance of Worker.



7
8
9
10
11
12
# File 'activesupport/lib/active_support/testing/parallelization/worker.rb', line 7

def initialize(number, url)
  @id = SecureRandom.uuid
  @number = number
  @url = url
  @setup_exception = nil
end

Instance Method Details

#after_forkObject



80
81
82
83
84
# File 'activesupport/lib/active_support/testing/parallelization/worker.rb', line 80

def after_fork
  Parallelization.after_fork_hooks.each do |cb|
    cb.call(@number)
  end
end

#perform_job(job) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'activesupport/lib/active_support/testing/parallelization/worker.rb', line 42

def perform_job(job)
  klass    = job[0]
  method   = job[1]
  reporter = job[2]

  set_process_title("#{klass}##{method}")

  result = klass.with_info_handler reporter do
    Minitest.run_one_method(klass, method)
  end

  safe_record(reporter, result)
end

#run_cleanupObject



86
87
88
89
90
# File 'activesupport/lib/active_support/testing/parallelization/worker.rb', line 86

def run_cleanup
  Parallelization.run_cleanup_hooks.each do |cb|
    cb.call(@number)
  end
end

#safe_record(reporter, result) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'activesupport/lib/active_support/testing/parallelization/worker.rb', line 56

def safe_record(reporter, result)
  add_setup_exception(result) if @setup_exception

  begin
    @queue.record(reporter, result)
  rescue DRb::DRbConnError
    result.failures.map! do |failure|
      if failure.respond_to?(:error)
        # minitest >5.14.0
        error = DRb::DRbRemoteError.new(failure.error)
      else
        error = DRb::DRbRemoteError.new(failure.exception)
      end
      Minitest::UnexpectedError.new(error)
    end
    @queue.record(reporter, result)
  rescue Interrupt
    @queue.interrupt
    raise
  end

  set_process_title("(idle)")
end

#startObject



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'activesupport/lib/active_support/testing/parallelization/worker.rb', line 14

def start
  fork do
    set_process_title("(starting)")

    DRb.stop_service

    @queue = DRbObject.new_with_uri(@url)
    @queue.start_worker(@id)

    begin
      after_fork
    rescue => @setup_exception; end

    work_from_queue
  ensure
    set_process_title("(stopping)")

    run_cleanup
    @queue.stop_worker(@id)
  end
end

#work_from_queueObject



36
37
38
39
40
# File 'activesupport/lib/active_support/testing/parallelization/worker.rb', line 36

def work_from_queue
  while job = @queue.pop
    perform_job(job)
  end
end