Class: FiberPool

Inherits:
Object
Defined in:
lib/fiber_pool.rb

Constructor Details

#initialize(count = 100) ⇒ FiberPool

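Prepares a pool of fibers, each able to run a different block of code every time it is resumed. Once a fiber finishes its block, it tries to fetch the next one from the queue. If the queue is empty, the fiber returns itself to the free list and yields until it is handed another block.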



# File 'lib/fiber_pool.rb', line 43

def initialize(count = 100)
  @fibers,@busy_fibers,@queue,@generic_callbacks = [],{},[],[]
  count.times do |i|
    fiber = Fiber.new do |block|
      loop do
        block.call
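        # Per-fiber callbacks run in reverse order, much like C++ destructors.
        Fiber.current[:callbacks].pop.call until Fiber.current[:callbacks].empty?
        # Pool-wide callbacks run after every block, in registration order.
        generic_callbacks.each do |cb|
          cb.call
        end
        if @queue.empty?
          # Nothing pending: mark this fiber free and park until spawn resumes it.
          @busy_fibers.delete(Fiber.current.object_id)
          @fibers.unshift Fiber.current
          block = Fiber.yield
        else
          # Work is waiting: take the next block without yielding.
          block = @queue.shift
        end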
      end
    end
    fiber[:callbacks] = []
    fiber[:em_keys] = []
    @fibers << fiber
  end
end
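
The reuse loop in the constructor can be hard to follow at first. The snippet below is a minimal sketch of the same idea with a single worker fiber and no pool: the value passed to `resume` becomes the return value of `Fiber.yield`, which is how one fiber receives a new block each time. The names `worker` and `log` are illustrative only, and the callback bookkeeping is left out.

```ruby
# Illustrative sketch: one reusable worker fiber, mirroring the loop in #initialize.
log = []

worker = Fiber.new do |block|
  loop do
    block.call           # run the block handed over by resume
    block = Fiber.yield  # park here until resumed with the next block
  end
end

worker.resume(-> { log << :first })   # starts the fiber with its first block
worker.resume(-> { log << :second })  # this lambda is what Fiber.yield returns
```

The fiber never terminates on its own; it stays parked at `Fiber.yield` between blocks, which is what makes it reusable.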

Instance Attribute Details

#busy_fibers ⇒ Object (readonly)

Returns the fibers that are currently running a block, as a hash keyed by each fiber's object_id.



# File 'lib/fiber_pool.rb', line 33

def busy_fibers
  @busy_fibers
end

#fibers ⇒ Object (readonly)

Gives access to the fibers that are currently free.



# File 'lib/fiber_pool.rb', line 32

def fibers
  @fibers
end

#generic_callbacks ⇒ Object

Code can register a proc with this FiberPool to be called every time a fiber finishes a block. This is useful for releasing resources such as ActiveRecord database connections.



# File 'lib/fiber_pool.rb', line 38

def generic_callbacks
  @generic_callbacks
end
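
To make the cleanup order concrete, here is a small sketch of the step that runs after each block, using plain arrays in place of a real fiber. It assumes, as in the constructor listing, that per-fiber callbacks are popped last-in-first-out and that generic callbacks then run in registration order. The callback names are hypothetical.

```ruby
# Illustrative sketch of the cleanup order; not the pool itself.
events = []

# Stand-in for fiber[:callbacks]: registered in this order during a block.
fiber_callbacks = [
  -> { events << :close_file },
  -> { events << :close_cursor }
]

# Stand-in for pool.generic_callbacks: shared by every fiber in the pool.
generic_callbacks = [-> { events << :release_connection }]

# Per-fiber callbacks run in reverse order of registration...
fiber_callbacks.pop.call until fiber_callbacks.empty?
# ...then the pool-wide callbacks run in the order they were added.
generic_callbacks.each(&:call)
```

Because the per-fiber list is emptied by `pop`, those callbacks fire once, while the generic list is left intact and fires again after every block.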

Instance Method Details

#spawn(&block) ⇒ Object

If a fiber is available, runs the block on it immediately. Otherwise, the block waits in a queue until a fiber becomes free.



# File 'lib/fiber_pool.rb', line 71

def spawn(&block)
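  if (fiber = @fibers.shift)
    # A free fiber exists: reset its callbacks, mark it busy, and run the block now.
    fiber[:callbacks] = []
    @busy_fibers[fiber.object_id] = fiber
    fiber.resume(block)
  else
    # Every fiber is busy: queue the block until one finishes.
    @queue << block
  end
  self # return the pool rather than exposing the internal queue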
end
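
The queueing behaviour is easiest to see with a pool of one fiber. The sketch below uses `TinyPool`, a hypothetical stripped-down stand-in for FiberPool (no callbacks, no busy-fiber tracking), so that it runs on its own. The first block pauses with `Fiber.yield` to simulate waiting on I/O, which forces the second block into the queue.

```ruby
require 'fiber' # only needed for Fiber.current on older Rubies

# TinyPool is a hypothetical, simplified stand-in for FiberPool.
class TinyPool
  attr_reader :fibers, :queue

  def initialize(count)
    @queue = []
    @fibers = Array.new(count) do
      Fiber.new do |block|
        loop do
          block.call
          if @queue.empty?
            @fibers.unshift(Fiber.current) # back onto the free list
            block = Fiber.yield            # wait for the next spawn
          else
            block = @queue.shift           # keep working without yielding
          end
        end
      end
    end
  end

  def spawn(&block)
    if (fiber = @fibers.shift)
      fiber.resume(block)
    else
      @queue << block
    end
    self
  end
end

log = []
pool = TinyPool.new(1)
paused = nil

pool.spawn do
  log << :first_started
  paused = Fiber.current
  Fiber.yield # simulate waiting on I/O; the fiber stays busy
  log << :first_done
end
pool.spawn { log << :second } # no free fiber, so this block is queued

queued_before = pool.queue.size
paused.resume # the first block finishes, then the same fiber drains the queue
```

After the final `resume`, the single fiber has run both blocks in order and returned itself to the free list.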