Class: Callisto::Pool
- Inherits:
-
Object
- Object
- Callisto::Pool
- Includes:
- Singleton
- Defined in:
- lib/callisto/pool.rb
Instance Attribute Summary collapse
-
#pending ⇒ Object
Returns the value of attribute pending.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#running ⇒ Object
Returns the value of attribute running.
-
#workers ⇒ Object
Returns the value of attribute workers.
Class Method Summary collapse
- .settings ⇒ Object
- .settings=(options) ⇒ Object
Instance Method Summary collapse
- #<<(task) ⇒ Object
-
#initialize ⇒ Pool
constructor
A new instance of Pool.
- #processes ⇒ Object
- #wait(id = nil) ⇒ Object
Constructor Details
#initialize ⇒ Pool
Returns a new instance of Pool.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/callisto/pool.rb', line 29 def initialize self.pending, self.running, self.workers = [], [], [] self.queue = Queue.new 1.upto(self.class.settings.max_workers) do worker = Thread.new do loop do task = self.queue.pop self.pending.delete self.class.settings.identifier.call(task) self.running << self.class.settings.identifier.call(task) self.class.settings.callback.call(task) self.running.delete self.class.settings.identifier.call(task) end end self.workers << worker end end |
Instance Attribute Details
#pending ⇒ Object
Returns the value of attribute pending.
10 11 12 |
# File 'lib/callisto/pool.rb', line 10 def pending @pending end |
#queue ⇒ Object
Returns the value of attribute queue.
10 11 12 |
# File 'lib/callisto/pool.rb', line 10 def queue @queue end |
#running ⇒ Object
Returns the value of attribute running.
10 11 12 |
# File 'lib/callisto/pool.rb', line 10 def running @running end |
#workers ⇒ Object
Returns the value of attribute workers.
10 11 12 |
# File 'lib/callisto/pool.rb', line 10 def workers @workers end |
Class Method Details
.settings ⇒ Object
23 24 25 |
# File 'lib/callisto/pool.rb', line 23 def settings @settings end |
.settings=(options) ⇒ Object
14 15 16 17 18 19 20 21 |
# File 'lib/callisto/pool.rb', line 14 def settings=(options) defaults = { :max_workers => 10, :identifier => proc { |entry| entry.object_id }, :callback => proc { |entry| entry.call } } @settings = OpenStruct.new(defaults.merge(options)) end |
Instance Method Details
#<<(task) ⇒ Object
50 51 52 53 54 55 56 57 |
# File 'lib/callisto/pool.rb', line 50 def <<(task) identifier = self.class.settings.identifier.call(task) if !processes.include?(identifier) self.pending << identifier self.queue << task end identifier end |
#processes ⇒ Object
59 60 61 |
# File 'lib/callisto/pool.rb', line 59 def processes pending + running end |
#wait(id = nil) ⇒ Object
46 47 48 |
# File 'lib/callisto/pool.rb', line 46 def wait(id = nil) sleep(0.1) while (id ? processes.include?(id) : processes.any?) end |