Class: RactorMgr

Inherits:
Object
Defined in:
lib/ractor_mgr.rb

Overview

RactorMgr is a job queue combined with a monitor for a list of Ractors. The job of a RactorMgr is to:

  1. store the list of jobs to be completed

  2. feed the worker Ractors with jobs

  3. when a worker completes a job, give it another job (if there’s more work to do)

  4. monitor progress: how many jobs have been completed, how many are running, and whether the manager itself is :idle or :running

You create your own worker Ractors, and the RactorMgr takes care of keeping those workers as busy as possible.

Usage:

# make some workers
workers = 5.times.map do
  Ractor.new do
    loop do
      Ractor.yield(Ractor.receive * 2)
    end
  end
end

# create some jobs
jobs = [1] * 1_000_000; nil

# feed the jobs to the workers via the RactorMgr
rm = RactorMgr.new(
  jobs: jobs,
  workers: workers
)
rm.join             # optional, if you want to wait

NOTE on the Ractor interface: the RactorMgr expects to send a worker a job and then be able to call #take on that Ractor. As soon as #take returns a value from a worker, that worker is assumed to be available for another job. This means you should implement your Ractors as an infinite loop where:

  • the Ractor takes in a job definition

  • the Ractor yields some value when the job is done

… thus the worker Ractor will continue accepting jobs as long as the RactorMgr keeps feeding it jobs.

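WARN: do not alter the job list while the RactorMgr is running. The manager keeps a reference to the array you pass in (it does not copy it) and reads jobs from it by index.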

status:

- :idle if the RactorMgr is finished
- :running if the RactorMgr is running

join: calls #join on the internal Thread, blocking until all jobs have finished
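
If you want progress output while the jobs run, one option is to poll the manager instead of (or before) calling join. The following is a minimal sketch, not part of RactorMgr: `watch` is a hypothetical helper, and it assumes only the documented #done? and #to_s methods.

```ruby
# Hypothetical helper (not part of RactorMgr): prints the manager's one-line
# summary until it reports that every job has finished.
# Assumes only that `mgr` responds to #done? and #to_s.
def watch(mgr, interval: 0.5, out: $stdout)
  until mgr.done?
    out.puts(mgr.to_s)
    sleep(interval)
  end
  out.puts(mgr.to_s) # final line, printed once all jobs have finished
end

# Usage, given a RactorMgr built as in the Usage section:
#   watch(rm, interval: 1)
#   rm.join # returns once the internal Thread has exited
```

Polling keeps the calling thread free to do other work between status lines; join is still the simplest choice when you only need to block until completion.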

Constructor Details

#initialize(jobs:, workers:) ⇒ RactorMgr

Returns a new instance of RactorMgr.



# File 'lib/ractor_mgr.rb', line 58

def initialize(jobs:, workers:)
  @status = :idle
  @jobs = jobs
  @workers = workers
  @job_index = 0
  @jobs_finished = 0
  @results = []

  ##
  # start work
  @start_time = Time.now

  # initial job fill
  @workers.each do |w|
    w.send(@jobs[@job_index])
    @job_index += 1
  end

  # Thread for feeding workers
  @mgr_thread = Thread.new do
    @status = :running

    loop do
      break if @job_index == @jobs.length

      w, r = Ractor.select(*@workers)
      @results << r
      @jobs_finished += 1
      w.send(@jobs[@job_index])
      @job_index += 1
    end

    @workers.map do |w|
      @results << w.take
      @jobs_finished += 1
    end

    @status = :idle
  end
end
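
The constructor hands one job to every worker before the feeding loop starts, and the loop stops only when @job_index equals @jobs.length. With fewer jobs than workers, the index is already past that length when the loop begins, so it seems safest to check the sizes before constructing the manager. A minimal caller-side sketch; `check_pool_size!` is a hypothetical name, not part of RactorMgr:

```ruby
# Hypothetical caller-side guard (not part of RactorMgr).
# Raises unless there is at least one job for every worker.
def check_pool_size!(jobs, workers)
  return if jobs.length >= workers.length

  raise ArgumentError,
        "need at least #{workers.length} jobs for #{workers.length} workers, got #{jobs.length}"
end

# Usage, before RactorMgr.new(jobs: jobs, workers: workers):
#   check_pool_size!(jobs, workers)
```

An alternative under the same assumption is to pass only as many workers as there are jobs, for example `workers.first(jobs.length)`.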

Instance Attribute Details

#results ⇒ Object (readonly)

Returns the value of attribute results.



# File 'lib/ractor_mgr.rb', line 55

def results
  @results
end
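
In the constructor source, results are appended as Ractor.select returns them, so #results is in completion order rather than job order. If you need to match results back to jobs, one approach is to have each worker yield a [job_id, value] pair and sort afterwards. A sketch with made-up sample data; the pair layout is an assumption about how you write your workers, not something RactorMgr requires, and `in_job_order` is a hypothetical helper:

```ruby
# Hypothetical helper: restores submission order from [job_id, value] pairs.
def in_job_order(pairs)
  pairs.sort_by { |job_id, _value| job_id }.map { |_job_id, value| value }
end

# Made-up sample shaped like what #results would hold if every worker
# yielded [job_id, value] instead of the bare value.
completed = [[2, 40], [0, 0], [1, 20]]

in_job_order(completed) # => [0, 20, 40]
```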

#status ⇒ Object (readonly)

Returns the value of attribute status.



# File 'lib/ractor_mgr.rb', line 55

def status
  @status
end

Instance Method Details

#done? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/ractor_mgr.rb', line 99

def done?
  jobs_finished == @jobs.length
end

#eta ⇒ Object

returns an ETA string built by TinyEta from the elapsed time and the fraction of jobs complete

see: github.com/jefflunt/tiny_eta



# File 'lib/ractor_mgr.rb', line 136

def eta
  TinyEta.eta(Time.now - @start_time, percent_complete_f)
end
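
#eta passes TinyEta two inputs: the seconds elapsed since the manager started and the fraction of jobs complete. How TinyEta turns those into a string is not shown here. A common way to estimate time remaining from those two inputs is linear extrapolation, sketched below for illustration only; `remaining_seconds` is a hypothetical helper, not TinyEta's API:

```ruby
# Hypothetical helper (not TinyEta): linear extrapolation of time remaining.
# Returns nil when no progress has been made yet, since there is nothing to
# extrapolate from.
def remaining_seconds(elapsed, fraction_done)
  return nil unless fraction_done.positive?

  elapsed * (1.0 - fraction_done) / fraction_done
end

remaining_seconds(30.0, 0.25) # => 90.0 (30 s spent on a quarter of the work)
remaining_seconds(30.0, 0.0)  # => nil
```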

#jobs_finished ⇒ Object

the number of jobs that have finished



# File 'lib/ractor_mgr.rb', line 109

def jobs_finished
  @jobs_finished
end

#jobs_remaining ⇒ Object

the number of jobs not yet finished (this includes jobs that are currently running)



# File 'lib/ractor_mgr.rb', line 119

def jobs_remaining
  jobs_total - jobs_finished
end

#jobs_running ⇒ Object

the number of jobs currently running



# File 'lib/ractor_mgr.rb', line 114

def jobs_running
  @job_index - @jobs_finished
end

#jobs_total ⇒ Object

the total number of jobs that will be run



# File 'lib/ractor_mgr.rb', line 104

def jobs_total
  @jobs.length
end
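
The four job counters are simple arithmetic over three values: the total number of jobs, the index of the next job to dispatch, and the number finished. The stand-in below mirrors that arithmetic with made-up numbers to show how the counters relate; `Snapshot` and `jobs_unsent` are hypothetical and not part of RactorMgr:

```ruby
# Hypothetical stand-in mirroring the counter arithmetic in the listings above.
Snapshot = Struct.new(:jobs_total, :job_index, :jobs_finished) do
  # Jobs handed to a worker whose result has not come back yet.
  def jobs_running
    job_index - jobs_finished
  end

  # Jobs not yet finished, whether dispatched or not.
  def jobs_remaining
    jobs_total - jobs_finished
  end

  # Hypothetical extra: jobs that have not been handed to any worker.
  def jobs_unsent
    jobs_total - job_index
  end
end

snap = Snapshot.new(1_000, 255, 250)
snap.jobs_running   # => 5
snap.jobs_remaining # => 750 (includes the 5 in flight)
snap.jobs_unsent    # => 745
```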

#join ⇒ Object

just like Thread#join, blocks until all jobs are complete; returns nil rather than the Thread



# File 'lib/ractor_mgr.rb', line 141

def join
  @mgr_thread.join
  nil
end

#percent_complete_f(digits = 0) ⇒ Object

fraction complete as a Float in the range 0.0..1.0 (the digits argument is accepted but not used)



# File 'lib/ractor_mgr.rb', line 124

def percent_complete_f(digits=0)
  (jobs_finished / jobs_total.to_f)
end

#percent_complete_s(digits = 0) ⇒ Object

percentage complete as a number in the range 0..100, rounded to digits decimal places (with the default of 0 digits, Float#round returns an Integer)



# File 'lib/ractor_mgr.rb', line 129

def percent_complete_s(digits=0)
  ((jobs_finished / jobs_total.to_f) * 100).round(digits)
end
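
A short worked example of the two percentage methods may help. The sketch below is a stand-alone copy of their arithmetic, using hypothetical method names so it runs without RactorMgr:

```ruby
# Stand-alone copy of the arithmetic in percent_complete_f.
# The .to_f matters: Integer / Integer would truncate to 0 until the very end.
def fraction_complete(finished, total)
  finished / total.to_f
end

# Stand-alone copy of the arithmetic in percent_complete_s.
def percent_complete(finished, total, digits = 0)
  (fraction_complete(finished, total) * 100).round(digits)
end

percent_complete(1, 3, 1) # => 33.3
percent_complete(1, 3)    # => 33 (an Integer, because digits is 0)
```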

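#to_s ⇒ Object

out-of-the-box monitoring: a one-line summary with the number of running jobs, finished/total counts, percentage complete, and ETA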



# File 'lib/ractor_mgr.rb', line 147

def to_s
  "RactorMgr #{jobs_running} running, progress: #{jobs_finished}/#{jobs_total} #{percent_complete_s(1)}% (ETA #{eta})"
end