Class: EventMachine::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/em/pool.rb

Overview

= EventMachine::Pool

A simple async resource pool based on a resource and work queue. Resources are enqueued and work waits for resources to become available.

Example:

EM.run do pool = EM::Pool.new spawn = lambda { pool.add EM::HttpRequest.new('http://example.org') } 10.times { spawn[] } done, scheduled = 0, 0

 check = lambda do
   done += 1
   if done >= scheduled
     EM.stop
   end
 end

 pool.on_error { |conn| spawn[] }

 100.times do
   pool.perform do |conn|
     req = conn.get :path => '/', :keepalive => true

     req.callback do
       p [:success, conn.object_id, i, req.response.size]
       check[]
     end

     req.errback { check[] }

     req
   end
 end

end

Resources are expected to be controlled by an object responding to a deferrable/completion style API with callback and errback blocks.

Instance Method Summary collapse

Constructor Details

#initializePool

Returns a new instance of Pool.



45
46
47
48
49
50
# File 'lib/em/pool.rb', line 45

def initialize
  @resources = EM::Queue.new
  @removed = []
  @contents = []
  @on_error = nil
end

Instance Method Details

#add(resource) ⇒ Object



52
53
54
55
# File 'lib/em/pool.rb', line 52

def add resource
  @contents << resource
  requeue resource
end

#completion(deferrable, resource) ⇒ Object (protected)



134
135
136
137
# File 'lib/em/pool.rb', line 134

def completion deferrable, resource
  deferrable.callback { requeue resource }
  deferrable.errback  { failure resource }
end

#contentsObject

Returns a list for introspection purposes only. You should NEVER call modification or work oriented methods on objects in this list. A good example use case is periodic statistics collection against a set of connection resources.

For example: pool.contents.inject(0) { |sum, connection| connection.num_bytes }



69
70
71
# File 'lib/em/pool.rb', line 69

def contents
  @contents.dup
end

#failure(resource) ⇒ Object (protected)



123
124
125
126
127
128
129
130
131
132
# File 'lib/em/pool.rb', line 123

def failure resource
  if @on_error
    @contents.delete resource
    @on_error.call resource
    # Prevent users from calling a leak.
    @removed.delete resource
  else
    requeue resource
  end
end

#num_waitingObject

A peek at the number of enqueued jobs waiting for resources



107
108
109
# File 'lib/em/pool.rb', line 107

def num_waiting
  @resources.num_waiting
end

#on_error(*a, &b) ⇒ Object

Define a default catch-all for when the deferrables returned by work blocks enter a failed state. By default all that happens is that the resource is returned to the pool. If on_error is defined, this block is responsible for re-adding the resource to the pool if it is still usable. In other words, it is generally assumed that on_error blocks explicitly handle the rest of the lifetime of the resource.



79
80
81
# File 'lib/em/pool.rb', line 79

def on_error *a, &b
  @on_error = EM::Callback(*a, &b)
end

#perform(*a, &b) ⇒ Object Also known as: reschedule

Perform a given #call-able object or block. The callable object will be called with a resource from the pool as soon as one is available, and is expected to return a deferrable.

The deferrable will have callback and errback added such that when the deferrable enters a finished state, the object is returned to the pool.

If on_error is defined, then objects are not automatically returned to the pool.



92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/em/pool.rb', line 92

def perform(*a, &b)
  work = EM::Callback(*a, &b)

  @resources.pop do |resource|
    if removed? resource
      @removed.delete resource
      reschedule work
    else
      process work, resource
    end
  end
end

#process(work, resource) ⇒ Object (protected)



139
140
141
142
143
144
145
146
147
148
149
# File 'lib/em/pool.rb', line 139

def process work, resource
  deferrable = work.call resource
  if deferrable.kind_of?(EM::Deferrable)
    completion deferrable, resource
  else
    raise ArgumentError, "deferrable expected from work"
  end
rescue Exception
  failure resource
  raise
end

#remove(resource) ⇒ Object



57
58
59
60
# File 'lib/em/pool.rb', line 57

def remove resource
  @contents.delete resource
  @removed << resource
end

#removed?(resource) ⇒ Boolean

Removed will show resources in a partial pruned state. Resources in the removed list may not appear in the contents list if they are currently in use.

Returns:

  • (Boolean)


114
115
116
# File 'lib/em/pool.rb', line 114

def removed? resource
  @removed.include? resource
end

#requeue(resource) ⇒ Object (protected)



119
120
121
# File 'lib/em/pool.rb', line 119

def requeue resource
  @resources.push resource
end