Class: Innertube::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/innertube.rb

Overview

A re-entrant thread-safe resource pool that generates new resources on demand.

Defined Under Namespace

Classes: BadResource, Element

Instance Method Summary collapse

Constructor Details

#initialize(open, close) ⇒ Pool

Creates a new resource pool.

Parameters:

  • open (Proc, #call)

    a callable which allocates a new object for the pool

  • close (Proc, #call)

    a callable which is called with an object before it is freed.



55
56
57
58
59
60
61
62
# File 'lib/innertube.rb', line 55

def initialize(open, close)
  @open = open
  @close = close
  @lock = Mutex.new
  @iterator = Mutex.new
  @element_released = ConditionVariable.new
  @pool = Set.new
end

Instance Method Details

#clearObject Also known as: close

On each element of the pool, calls close(element) and removes it.



66
67
68
69
70
# File 'lib/innertube.rb', line 66

def clear
  each_element do |e|
    delete_element e
  end
end

#delete_if {|object| ... } ⇒ Object

Locks each element in turn and closes/deletes elements for which the object passes the block.

Yields:

  • (object)

    a block that should determine whether an element should be deleted from the pool

Yield Parameters:

  • object (Object)

    the resource

Raises:

  • (ArgumentError)


89
90
91
92
93
94
95
96
97
# File 'lib/innertube.rb', line 89

def delete_if
  raise ArgumentError, "block required" unless block_given?

  each_element do |e|
    if yield e.object
      delete_element e
    end
  end
end

#each {|resource| ... } ⇒ Object

As each_element, but yields objects, not wrapper elements.

Yields:

  • (resource)

    a block that will do something with each resource in the pool

Yield Parameters:

  • resource (Object)

    the current resource in the iteration



177
178
179
180
181
# File 'lib/innertube.rb', line 177

def each
  each_element do |e|
    yield e.object
  end
end

#each_element {|element| ... } ⇒ Object

Iterate over a snapshot of the pool. Yielded objects are locked for the duration of the block. This may block the current thread until elements in the snapshot are released by other threads.

Yields:

  • (element)

    a block that will do something with each element in the pool

Yield Parameters:

  • element (Element)

    the current element in the iteration



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/innertube.rb', line 149

def each_element
  targets = @pool.to_a
  unlocked = []

  @iterator.synchronize do
    until targets.empty?
      @lock.synchronize do
        unlocked, targets = targets.partition {|e| e.unlocked? }
        unlocked.each {|e| e.lock }
      end

      unlocked.each do |e|
        begin
          yield e
        ensure
          e.unlock
        end
      end
      @element_released.wait(@iterator) unless targets.empty?
    end
  end
end

#sizeInteger

Returns the number of the resources in the pool.

Returns:

  • (Integer)

    the number of the resources in the pool



184
185
186
# File 'lib/innertube.rb', line 184

def size
  @lock.synchronize { @pool.size }
end

#take(opts = {}) {|resource| ... } ⇒ Object Also known as: >>

Acquire an element of the pool. Yields the object. If all elements are claimed, it will create another one.

Parameters:

  • :filter (Proc, #call)

    a callable which receives objects and has the opportunity to reject each in turn.

  • :default (Object)

    if no resources are available, use this object instead of calling #open.

Yields:

  • (resource)

    a block that will perform some action with the element of the pool

Yield Parameters:

  • resource (Object)

    a resource managed by the pool. Locked for the duration of the block

Raises:

  • (ArgumentError)


110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/innertube.rb', line 110

def take(opts = {})
  raise ArgumentError, "block required" unless block_given?

  result = nil
  element = nil
  opts[:filter] ||= proc {|_| true }
  @lock.synchronize do
    element = @pool.find { |e| e.unlocked? && opts[:filter].call(e.object) }
    unless element
      # No objects were acceptable
      resource = opts[:default] || @open.call
      element = Element.new(resource)
      @pool << element
    end
    element.lock
  end
  begin
    result = yield element.object
  rescue BadResource
    delete_element element
    raise
  ensure
    # Unlock
    if element
      element.unlock
      @element_released.signal
    end
  end
  result
end