Class: Thimble::Manager
- Inherits:
-
Object
- Object
- Thimble::Manager
- Defined in:
- lib/manager.rb
Instance Attribute Summary collapse
-
#batch_size ⇒ Object
readonly
Returns the value of attribute batch_size.
-
#max_workers ⇒ Object
readonly
Returns the value of attribute max_workers.
-
#queue_size ⇒ Object
readonly
Returns the value of attribute queue_size.
-
#worker_type ⇒ Object
readonly
Returns the value of attribute worker_type.
Class Method Summary collapse
- .deterministic ⇒ Thimble::Manager
- .small ⇒ Thimble::Manager
Instance Method Summary collapse
- #current_workers(id) ⇒ Object
- #get_fork_worker(batch) ⇒ Object
- #get_thread_worker(batch) ⇒ Object
- #get_worker(batch, &block) ⇒ Object
-
#initialize(max_workers: 6, batch_size: 1000, queue_size: 1000, worker_type: :fork) ⇒ Manager
constructor
A new instance of Manager.
- #rem_worker(worker) ⇒ Object
- #sub_worker(worker, id) ⇒ Object
- #worker_available? ⇒ TrueClass, FalseClass
- #working? ⇒ TrueClass, FalseClass
Constructor Details
#initialize(max_workers: 6, batch_size: 1000, queue_size: 1000, worker_type: :fork) ⇒ Manager
Returns a new instance of Manager.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/manager.rb', line 9
# Validates the configuration and sets up an empty worker table.
#
# @param max_workers [Integer] limit compared against the worker table size; must be >= 1
# @param batch_size [Integer] batch size; must be >= 1
# @param queue_size [Integer] stored as given (not validated here)
# @param worker_type [Symbol] either :fork or :thread
# @raise [ArgumentError] if worker_type is neither :fork nor :thread, if :fork is
#   requested but Process does not respond to fork, or if max_workers or
#   batch_size is below 1
def initialize(max_workers: 6, batch_size: 1000, queue_size: 1000, worker_type: :fork)
  unless %i[thread fork].include?(worker_type)
    raise ArgumentError, 'worker type must be either :fork or :thread'
  end
  # worker_type is :thread or :fork at this point, so only the fork case needs the platform check.
  if worker_type == :fork && !Process.respond_to?(:fork)
    raise ArgumentError, 'Your system does not respond to fork please use threads.'
  end
  raise ArgumentError, 'max_workers must be greater than 0' if max_workers < 1
  raise ArgumentError, 'batch size must be greater than 0' if batch_size < 1

  @worker_type = worker_type
  @max_workers = max_workers
  @batch_size = batch_size
  @queue_size = queue_size
  @mutex = Mutex.new
  # Worker table; #sub_worker stores entries keyed by worker.pid.
  @current_workers = {}
end
Instance Attribute Details
#batch_size ⇒ Object (readonly)
Returns the value of attribute batch_size.
7 8 9 |
# File 'lib/manager.rb', line 7
# Read-only accessor for the batch size stored by the constructor.
#
# @return [Object] the value held in @batch_size
def batch_size
  @batch_size
end
#max_workers ⇒ Object (readonly)
Returns the value of attribute max_workers.
7 8 9 |
# File 'lib/manager.rb', line 7
# Read-only accessor for the worker limit stored by the constructor.
#
# @return [Object] the value held in @max_workers
def max_workers
  @max_workers
end
#queue_size ⇒ Object (readonly)
Returns the value of attribute queue_size.
7 8 9 |
# File 'lib/manager.rb', line 7
# Read-only accessor for the queue size stored by the constructor.
#
# @return [Object] the value held in @queue_size
def queue_size
  @queue_size
end
#worker_type ⇒ Object (readonly)
Returns the value of attribute worker_type.
7 8 9 |
# File 'lib/manager.rb', line 7
# Read-only accessor for the worker type (:fork or :thread) stored by the constructor.
#
# @return [Object] the value held in @worker_type
def worker_type
  @worker_type
end
Class Method Details
.deterministic ⇒ Thimble::Manager
109 110 111 |
# File 'lib/manager.rb', line 109
# Builds a manager limited to one worker, a batch size of one and a queue size of one.
# worker_type is left at the constructor's default.
#
# @return [Thimble::Manager] the newly constructed manager
def self.deterministic
  settings = { max_workers: 1, batch_size: 1, queue_size: 1 }
  new(**settings)
end
.small ⇒ Thimble::Manager
114 115 116 |
# File 'lib/manager.rb', line 114
# Builds a manager limited to one worker, a batch size of three and a queue size of three.
# worker_type is left at the constructor's default.
#
# @return [Thimble::Manager] the newly constructed manager
def self.small
  settings = { max_workers: 1, batch_size: 3, queue_size: 3 }
  new(**settings)
end
Instance Method Details
#current_workers(id) ⇒ Object
55 56 57 58 59 |
# File 'lib/manager.rb', line 55
# Returns the entries of the worker table that were registered under +id+.
# The table is read while holding the manager's mutex.
#
# @param id [Object] compared with == against each entry's #id
# @return [Hash] the matching subset of the worker table (a new Hash)
def current_workers(id)
  @mutex.synchronize { @current_workers.select { |_pid, entry| entry.id == id } }
end
#get_fork_worker(batch) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/manager.rb', line 75
# Forks a child process that maps the caller's block over the batch and writes
# the Marshal-dumped array of results to a pipe.
#
# An exception raised by the block for one element is stored in that element's
# slot of the result array, so the remaining elements are still processed.
# SystemExit and SignalException are re-raised instead of being stored:
# otherwise the +exit+ issued by the HUP trap below would be captured as an
# ordinary result and the child would keep running.
#
# @param batch [#item] object whose #item returns a collection of elements that
#   each respond to #item
# @yield [item] invoked in the child process with each element's #item
# @return [OpenStruct] with +pid+ (the child's process id) and +reader+ (the
#   read end of the pipe carrying the Marshal data)
def get_fork_worker(batch)
  rd, wr = IO.pipe
  pid = fork do
    Signal.trap('HUP') { exit }
    rd.close # the child only writes
    results = batch.item.map do |item|
      yield item.item
    rescue SystemExit, SignalException
      raise # let exit requests and signals terminate the child
    rescue Exception => e # rubocop:disable Lint/RescueException
      e
    end
    wr.write(Marshal.dump(results))
    wr.close
  end
  wr.close # the parent only reads; its copy of the write end must be closed for the reader to reach EOF
  OpenStruct.new(pid: pid, reader: rd)
end
#get_thread_worker(batch) ⇒ Object
97 98 99 100 101 102 103 104 105 106 |
# File 'lib/manager.rb', line 97
# Starts a thread that maps the caller's block over the batch and records the
# outcome on the returned handle.
#
# @param batch [#item] object whose #item returns a collection of elements that
#   each respond to #item
# @yield [item] invoked inside the new thread with each element's #item
# @return [OpenStruct] handle whose +pid+ is the Thread; the thread later sets
#   +result+ (array of block values) and then +done+ (true)
def get_thread_worker(batch)
  handle = OpenStruct.new
  runner = Thread.new do
    handle.result = batch.item.map { |entry| yield entry.item }
    handle.done = true
  end
  handle.pid = runner
  handle
end
#get_worker(batch, &block) ⇒ Object
64 65 66 67 68 69 70 71 72 |
# File 'lib/manager.rb', line 64
# Creates a worker for the batch using the configured worker type, while
# holding the manager's mutex.
#
# @param batch [Object] passed through to the worker factory
# @param block [Proc] forwarded to the worker factory
# @return [Object] whatever #get_fork_worker (for :fork) or #get_thread_worker
#   (for any other worker type) returns
def get_worker(batch, &block)
  @mutex.synchronize do
    @worker_type == :fork ? get_fork_worker(batch, &block) : get_thread_worker(batch, &block)
  end
end
#rem_worker(worker) ⇒ Object
48 49 50 51 52 |
# File 'lib/manager.rb', line 48
# Removes the entry stored under the worker's pid from the worker table,
# while holding the manager's mutex.
#
# @param worker [#pid] worker whose pid is the table key
# @return [Object, nil] the removed entry, or nil when the pid was not present
def rem_worker(worker)
  @mutex.synchronize { @current_workers.delete(worker.pid) }
end
#sub_worker(worker, id) ⇒ Object
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/manager.rb', line 36
# Registers a worker in the worker table under its pid, tagged with +id+.
# The table is written while holding the manager's mutex.
#
# @param worker [#pid] worker to track; its pid becomes the table key
# @param id [Object] tag stored alongside the worker (matched by #current_workers)
# @return [OpenStruct] the stored entry, exposing +worker+ and +id+
# @raise [RuntimeError] if worker.pid is nil
def sub_worker(worker, id)
  raise 'Worker must contain a pid!' if worker.pid.nil?

  entry = OpenStruct.new(worker: worker, id: id)
  @mutex.synchronize { @current_workers[worker.pid] = entry }
end
#worker_available? ⇒ TrueClass, FalseClass
26 27 28 |
# File 'lib/manager.rb', line 26
# Tells whether the worker table currently holds fewer entries than max_workers.
#
# The table is read under the manager's mutex, the same lock #sub_worker and
# #rem_worker hold while modifying it, so the size is never sampled in the
# middle of an update.
#
# @return [TrueClass, FalseClass]
def worker_available?
  @mutex.synchronize { @current_workers.size < @max_workers }
end
#working? ⇒ TrueClass, FalseClass
31 32 33 |
# File 'lib/manager.rb', line 31
# Tells whether the worker table currently holds at least one entry.
#
# The table is read under the manager's mutex, the same lock #sub_worker and
# #rem_worker hold while modifying it, so the size is never sampled in the
# middle of an update.
#
# @return [TrueClass, FalseClass]
def working?
  @mutex.synchronize { @current_workers.size.positive? }
end