Class: Thimble::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/manager.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_workers: 6, batch_size: 1000, queue_size: 1000, worker_type: :fork) ⇒ Manager

Returns a new instance of Manager.

Raises:

  • (ArgumentError)


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/manager.rb', line 9

# Validates the configuration and sets up an empty worker registry.
#
# @param max_workers [Integer] must be at least 1
# @param batch_size [Integer] must be at least 1
# @param queue_size [Integer] stored as given; not validated here
# @param worker_type [Symbol] either :fork or :thread
# @raise [ArgumentError] when worker_type is unknown, when :fork is requested
#   but Process does not respond to fork, or when max_workers / batch_size
#   is below 1
def initialize(max_workers: 6, batch_size: 1000, queue_size: 1000, worker_type: :fork)
  unless %i[thread fork].include?(worker_type)
    raise ArgumentError, 'worker type must be either :fork or :thread'
  end
  # Forking is only usable on platforms that actually expose Process.fork.
  if worker_type != :thread && !Process.respond_to?(:fork)
    raise ArgumentError, 'Your system does not respond to fork please use threads.'
  end
  if max_workers < 1
    raise ArgumentError, 'max_workers must be greater than 0'
  end
  if batch_size < 1
    raise ArgumentError, 'batch size must be greater than 0'
  end

  # Registry of running workers (keyed by worker pid) and the lock guarding it.
  @mutex = Mutex.new
  @current_workers = {}

  @worker_type = worker_type
  @max_workers = max_workers
  @batch_size = batch_size
  @queue_size = queue_size
end

Instance Attribute Details

#batch_size ⇒ Object (readonly)

Returns the value of attribute batch_size.



7
8
9
# File 'lib/manager.rb', line 7

# Read-only accessor.
#
# @return [Object] the current value of the @batch_size instance variable
def batch_size
  @batch_size
end

#max_workers ⇒ Object (readonly)

Returns the value of attribute max_workers.



7
8
9
# File 'lib/manager.rb', line 7

# Read-only accessor.
#
# @return [Object] the current value of the @max_workers instance variable
def max_workers
  @max_workers
end

#queue_size ⇒ Object (readonly)

Returns the value of attribute queue_size.



7
8
9
# File 'lib/manager.rb', line 7

# Read-only accessor.
#
# @return [Object] the current value of the @queue_size instance variable
def queue_size
  @queue_size
end

#worker_type ⇒ Object (readonly)

Returns the value of attribute worker_type.



7
8
9
# File 'lib/manager.rb', line 7

# Read-only accessor.
#
# @return [Object] the current value of the @worker_type instance variable
def worker_type
  @worker_type
end

Class Method Details

.deterministic ⇒ Thimble::Manager

Returns:



109
110
111
# File 'lib/manager.rb', line 109

# Builds an instance limited to one worker, a batch size of 1 and a queue
# size of 1. The worker type is left at the constructor's default.
#
# @return [Thimble::Manager]
def self.deterministic
  one_at_a_time = { max_workers: 1, batch_size: 1, queue_size: 1 }
  new(**one_at_a_time)
end

.small ⇒ Thimble::Manager

Returns:



114
115
116
# File 'lib/manager.rb', line 114

# Builds an instance limited to one worker, with a batch size of 3 and a
# queue size of 3. The worker type is left at the constructor's default.
#
# @return [Thimble::Manager]
def self.small
  small_settings = { max_workers: 1, batch_size: 3, queue_size: 3 }
  new(**small_settings)
end

Instance Method Details

#current_workers(id) ⇒ Object

Parameters:

  • id (Object)


55
56
57
58
59
# File 'lib/manager.rb', line 55

# Looks up the registered workers whose entry carries the given id.
# The registry is read while holding the manager's mutex.
#
# @param id [Object] identifier compared (with ==) against each entry's id
# @return [Hash] new hash of the matching registry entries, keyed as in the registry
def current_workers(id)
  @mutex.synchronize do
    @current_workers.each_with_object({}) do |(pid, entry), matches|
      matches[pid] = entry if entry.id == id
    end
  end
end

#get_fork_worker(batch) ⇒ Object

Parameters:

  • batch (Object)


75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/manager.rb', line 75

# Forks a child process that runs the given block over every entry of
# +batch.item+ and writes the marshalled list of results to a pipe.
#
# An exception raised by the block for one entry is captured and placed in
# the result list in that entry's position, so the parent can inspect it.
# Termination requests (SystemExit, e.g. from the HUP trap below, and
# SignalException) are NOT captured: they propagate and end the child.
#
# @param batch [Object] responds to +item+, returning a collection whose
#   elements each respond to +item+
# @yield [Object] the +item+ of each element in the batch
# @return [OpenStruct] with +pid+ (the child's process id) and +reader+
#   (read end of the pipe carrying the Marshal dump of the results)
def get_fork_worker(batch)
  rd, wr = IO.pipe
  tup = OpenStruct.new
  pid = fork do
    Signal.trap('HUP') { exit }
    # The child only writes; drop its copy of the read end.
    rd.close
    t = Marshal.dump(batch.item.map do |item|
      yield item.item
    rescue SystemExit, SignalException
      # Without this clause the broad rescue below would swallow the exit
      # requested by the HUP trap and the child would keep processing.
      raise
    rescue Exception => e
      # Deliberately broad: any failure is shipped to the parent as a value.
      e
    end)
    wr.write(t)
    wr.close
  end
  # The parent only reads; closing its write end lets the reader see EOF
  # once the child is done.
  wr.close
  tup.pid = pid
  tup.reader = rd
  tup
end

#get_thread_worker(batch) ⇒ Object

Parameters:

  • batch (Object)
  • block (Proc)


97
98
99
100
101
102
103
104
105
106
# File 'lib/manager.rb', line 97

# Starts a thread that runs the given block over every entry of
# +batch.item+ and records the outcome on the returned struct.
#
# @param batch [Object] responds to +item+, returning a collection whose
#   elements each respond to +item+
# @yield [Object] the +item+ of each element in the batch
# @return [OpenStruct] with +pid+ (the Thread), and, once the thread has
#   finished mapping, +result+ (list of block return values) and +done+ (true)
def get_thread_worker(batch)
  worker = OpenStruct.new
  thread = Thread.new do
    worker.result = batch.item.map { |entry| yield entry.item }
    # Only reached when every entry was processed without raising.
    worker.done = true
  end
  worker.pid = thread
  worker
end

#get_worker(batch, &block) ⇒ Object

Parameters:

  • batch (Object)
  • block (Proc)

Returns:

  • (Object)


64
65
66
67
68
69
70
71
72
# File 'lib/manager.rb', line 64

# Creates a worker for +batch+ using the strategy selected by @worker_type,
# while holding the manager's mutex.
#
# @param batch [Object] passed through to the worker factory
# @param block [Proc] passed through to the worker factory
# @return [Object] whatever get_fork_worker / get_thread_worker returns
def get_worker(batch, &block)
  @mutex.synchronize do
    next get_fork_worker(batch, &block) if @worker_type == :fork

    # Any worker type other than :fork falls through to the thread worker.
    get_thread_worker(batch, &block)
  end
end

#rem_worker(worker) ⇒ Object

Parameters:

  • worker (Object)


48
49
50
51
52
# File 'lib/manager.rb', line 48

# Unregisters a worker, looking it up by its pid while holding the
# manager's mutex.
#
# @param worker [Object] responds to +pid+
# @return [Object, nil] the removed registry entry, or nil when the pid
#   was not registered
def rem_worker(worker)
  @mutex.synchronize { @current_workers.delete(worker.pid) }
end

#sub_worker(worker, id) ⇒ Object

Parameters:

  • worker (Object)
  • id (Object)


36
37
38
39
40
41
42
43
44
45
# File 'lib/manager.rb', line 36

# Registers a worker under its pid, tagged with the given id. The registry
# is updated while holding the manager's mutex.
#
# @param worker [Object] responds to +pid+; the pid must not be nil
# @param id [Object] identifier stored alongside the worker
# @return [OpenStruct] the stored registry entry (+worker+ and +id+)
# @raise [RuntimeError] when worker.pid is nil
def sub_worker(worker, id)
  raise 'Worker must contain a pid!' if worker.pid.nil?

  entry = OpenStruct.new(worker: worker, id: id)
  @mutex.synchronize { @current_workers[worker.pid] = entry }
end

#worker_available? ⇒ TrueClass, FalseClass

Returns:

  • (TrueClass, FalseClass)


26
27
28
# File 'lib/manager.rb', line 26

# Tells whether the number of registered workers is below @max_workers.
# The registry is read without taking the mutex.
#
# @return [Boolean]
def worker_available?
  active_count = @current_workers.length
  active_count < @max_workers
end

#working? ⇒ TrueClass, FalseClass

Returns:

  • (TrueClass, FalseClass)


31
32
33
# File 'lib/manager.rb', line 31

# Tells whether at least one worker is currently registered.
# The registry is read without taking the mutex.
#
# @return [Boolean]
def working?
  !@current_workers.empty?
end