Class: EventMachine::Pool
- Inherits:
-
Object
- Object
- EventMachine::Pool
- Defined in:
- lib/em/pool.rb
Overview
= EventMachine::Pool
A simple async resource pool based on a resource and work queue. Resources are enqueued and work waits for resources to become available.
Example:
EM.run do pool = EM::Pool.new spawn = lambda { pool.add EM::HttpRequest.new('http://example.org') } 10.times { spawn[] } done, scheduled = 0, 0
check = lambda do
done += 1
if done >= scheduled
EM.stop
end
end
pool.on_error { |conn| spawn[] }
100.times do
pool.perform do |conn|
req = conn.get :path => '/', :keepalive => true
req.callback do
p [:success, conn.object_id, i, req.response.size]
check[]
end
req.errback { check[] }
req
end
end
end
Resources are expected to be controlled by an object responding to a deferrable/completion style API with callback and errback blocks.
Instance Method Summary collapse
- #add(resource) ⇒ Object
- #completion(deferrable, resource) ⇒ Object protected
-
#contents ⇒ Object
Returns a list for introspection purposes only.
- #failure(resource) ⇒ Object protected
-
#initialize ⇒ Pool
constructor
A new instance of Pool.
-
#num_waiting ⇒ Object
A peek at the number of enqueued jobs waiting for resources.
-
#on_error(*a, &b) ⇒ Object
Define a default catch-all for when the deferrables returned by work blocks enter a failed state.
-
#perform(*a, &b) ⇒ Object
(also: #reschedule)
Perform a given #call-able object or block.
- #process(work, resource) ⇒ Object protected
- #remove(resource) ⇒ Object
-
#removed?(resource) ⇒ Boolean
Removed will show resources in a partial pruned state.
- #requeue(resource) ⇒ Object protected
Constructor Details
Instance Method Details
#add(resource) ⇒ Object
52 53 54 55 |
# File 'lib/em/pool.rb', line 52 def add resource @contents << resource requeue resource end |
#completion(deferrable, resource) ⇒ Object (protected)
134 135 136 137 |
# File 'lib/em/pool.rb', line 134 def completion deferrable, resource deferrable.callback { requeue resource } deferrable.errback { failure resource } end |
#contents ⇒ Object
Returns a list for introspection purposes only. You should NEVER call modification or work oriented methods on objects in this list. A good example use case is periodic statistics collection against a set of connection resources.
For example: pool.contents.inject(0) { |sum, connection| connection.num_bytes }
69 70 71 |
# File 'lib/em/pool.rb', line 69 def contents @contents.dup end |
#failure(resource) ⇒ Object (protected)
123 124 125 126 127 128 129 130 131 132 |
# File 'lib/em/pool.rb', line 123 def failure resource if @on_error @contents.delete resource @on_error.call resource # Prevent users from calling a leak. @removed.delete resource else requeue resource end end |
#num_waiting ⇒ Object
A peek at the number of enqueued jobs waiting for resources
107 108 109 |
# File 'lib/em/pool.rb', line 107 def num_waiting @resources.num_waiting end |
#on_error(*a, &b) ⇒ Object
Define a default catch-all for when the deferrables returned by work blocks enter a failed state. By default all that happens is that the resource is returned to the pool. If on_error is defined, this block is responsible for re-adding the resource to the pool if it is still usable. In other words, it is generally assumed that on_error blocks explicitly handle the rest of the lifetime of the resource.
79 80 81 |
# File 'lib/em/pool.rb', line 79 def on_error *a, &b @on_error = EM::Callback(*a, &b) end |
#perform(*a, &b) ⇒ Object Also known as: reschedule
Perform a given #call-able object or block. The callable object will be called with a resource from the pool as soon as one is available, and is expected to return a deferrable.
The deferrable will have callback and errback added such that when the deferrable enters a finished state, the object is returned to the pool.
If on_error is defined, then objects are not automatically returned to the pool.
92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/em/pool.rb', line 92 def perform(*a, &b) work = EM::Callback(*a, &b) @resources.pop do |resource| if removed? resource @removed.delete resource reschedule work else process work, resource end end end |
#process(work, resource) ⇒ Object (protected)
139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/em/pool.rb', line 139 def process work, resource deferrable = work.call resource if deferrable.kind_of?(EM::Deferrable) completion deferrable, resource else raise ArgumentError, "deferrable expected from work" end rescue Exception failure resource raise end |
#remove(resource) ⇒ Object
57 58 59 60 |
# File 'lib/em/pool.rb', line 57 def remove resource @contents.delete resource @removed << resource end |
#removed?(resource) ⇒ Boolean
Removed will show resources in a partial pruned state. Resources in the removed list may not appear in the contents list if they are currently in use.
114 115 116 |
# File 'lib/em/pool.rb', line 114 def removed? resource @removed.include? resource end |
#requeue(resource) ⇒ Object (protected)
119 120 121 |
# File 'lib/em/pool.rb', line 119 def requeue resource @resources.push resource end |