Class: Async::ResourcePool::Simple
- Inherits: Object
- Defined in: lib/async/resource_pool/simple.rb
Instance Method Summary
- #acquire(timeout = nil) ⇒ Object
  Acquires a resource for the current fiber if one is available; otherwise yields to the reactor.
- #already_acquired? ⇒ True|False
  Returns true if the current fiber already holds a resource.
- #can_be_acquired? ⇒ True|False
  Returns true if the pool has an available resource.
- #info ⇒ Hash
  Returns a hash describing the current state of the resource pool.
- #initialize(limit, wakeup_strategy = :immediately) ⇒ Simple (constructor)
  Returns a new instance of Simple.
- #release ⇒ Object
  Releases the resource held by the current fiber.
- #try_acquire ⇒ True|False
  Acquires a resource for the current fiber if one is available, without waiting.
Constructor Details
#initialize(limit, wakeup_strategy = :immediately) ⇒ Simple
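Returns a new instance of Simple. Raises ArgumentError if limit is not greater than 0, or if wakeup_strategy is neither :immediately nor :next_loop.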
    # File 'lib/async/resource_pool/simple.rb', line 10

    def initialize(limit, wakeup_strategy = :immediately)
      raise ArgumentError, 'limit must be greater than 0' if limit <= 0
      unless [:immediately, :next_loop].include?(wakeup_strategy)
        raise ArgumentError, 'wakeup_strategy must be :immediately or :next_loop'
      end

      @limit = limit
      @wakeup_strategy = wakeup_strategy
      @owners = []
      @waiters = []
      @thread_mutex = Mutex.new
    end
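To illustrate the argument checks in the constructor, here is a minimal sketch that copies only the validation from the listing into a stand-in class. `PoolArgsSketch` and its `WAKEUP_STRATEGIES` constant are hypothetical names used for illustration, not part of the gem, so the example runs without the async dependency.

```ruby
# Stand-in that reproduces only the argument validation from the listing.
# PoolArgsSketch is a hypothetical name; it is NOT the gem's class.
class PoolArgsSketch
  WAKEUP_STRATEGIES = [:immediately, :next_loop].freeze

  attr_reader :limit, :wakeup_strategy

  def initialize(limit, wakeup_strategy = :immediately)
    raise ArgumentError, 'limit must be greater than 0' if limit <= 0
    unless WAKEUP_STRATEGIES.include?(wakeup_strategy)
      raise ArgumentError, 'wakeup_strategy must be :immediately or :next_loop'
    end

    @limit = limit
    @wakeup_strategy = wakeup_strategy
  end
end

pool = PoolArgsSketch.new(2)
puts pool.wakeup_strategy      # immediately (the default)

deferred = PoolArgsSketch.new(2, :next_loop)
puts deferred.wakeup_strategy  # next_loop

begin
  PoolArgsSketch.new(0)
rescue ArgumentError => e
  puts e.message               # limit must be greater than 0
end
```

With the real class, the equivalent calls would presumably be `Async::ResourcePool::Simple.new(2)` and `Async::ResourcePool::Simple.new(2, :next_loop)`.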
Instance Method Details
#acquire(timeout = nil) ⇒ Object
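Acquires a resource for the current fiber if one is available; otherwise yields to the reactor and is resumed once a resource becomes available. If a block is given, the block is run and the resource is released afterwards, even if the block raises. Returns nil. Raises Async::ResourcePool::TimeoutError if timeout is not nil and no resource becomes available within timeout. Raises Async::ResourcePool::AlreadyOwnError if the current fiber already holds a resource.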
    # File 'lib/async/resource_pool/simple.rb', line 28

    def acquire(timeout = nil)
      raise Async::ResourcePool::AlreadyOwnError.new if already_acquired?

      unless acquire_if_available
        timeout.nil? ? Async::Task.yield : wait_with_timeout(timeout)
        @thread_mutex.synchronize { @owners.push(Fiber.current) }
      end

      if block_given?
        begin
          yield
        ensure
          release
        end
      end

      nil
    end
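The block form of acquire wraps the block in begin/ensure, so the resource goes back to the pool even when the block raises. The following sketch isolates that pattern in a hypothetical stand-in class (`BlockFormSketch`, not part of the gem). It never waits on a reactor: where the real pool would yield, the sketch simply raises, which is an assumption made to keep the example self-contained.

```ruby
require 'fiber'

# Hypothetical stand-in, NOT the gem's class: it keeps only the owner
# bookkeeping and the block form of acquire from the listing.
class BlockFormSketch
  def initialize(limit)
    @limit = limit
    @owners = []
  end

  def owners_count
    @owners.size
  end

  def acquire
    # Assumption: the real pool would yield to the reactor here instead.
    raise 'pool exhausted' unless @owners.size < @limit

    @owners.push(Fiber.current)
    return nil unless block_given?

    begin
      yield
    ensure
      release # runs on normal exit and when the block raises
    end
    nil
  end

  def release
    @owners.delete(Fiber.current)
  end
end

pool = BlockFormSketch.new(1)
pool.acquire { puts "inside block: #{pool.owners_count} owner" }

begin
  pool.acquire { raise IOError, 'work failed' }
rescue IOError
  puts "after failure: #{pool.owners_count} owners"
end
```

Note that, as in the listing, the block's return value is discarded: acquire returns nil either way, so any result has to be captured in a variable from inside the block.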
#already_acquired? ⇒ True|False
Returns true if the current fiber already holds a resource.
    # File 'lib/async/resource_pool/simple.rb', line 72

    def already_acquired?
      @owners.include?(Fiber.current)
    end
#can_be_acquired? ⇒ True|False
Returns true if the pool has an available resource, that is, if fewer than limit fibers currently own one.
    # File 'lib/async/resource_pool/simple.rb', line 77

    def can_be_acquired?
      @owners.size < @limit
    end
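Both predicates depend on which fiber asks: ownership is keyed on Fiber.current, while availability is a property of the pool as a whole. The sketch below shows the difference using plain Ruby fibers and a hypothetical stand-in class (`OwnershipSketch`, with a made-up `take` helper in place of the real acquire path); the two predicate bodies are copied from the listings.

```ruby
require 'fiber'

# Hypothetical stand-in, NOT the gem's class. `take` is an illustrative
# helper that records the calling fiber as an owner without any checks.
class OwnershipSketch
  def initialize(limit)
    @limit = limit
    @owners = []
  end

  def take
    @owners.push(Fiber.current)
  end

  def already_acquired?
    @owners.include?(Fiber.current)
  end

  def can_be_acquired?
    @owners.size < @limit
  end
end

pool = OwnershipSketch.new(1)

holder = Fiber.new do
  pool.take
  Fiber.yield pool.already_acquired? # asked from the owning fiber
end

puts holder.resume           # true
puts pool.already_acquired?  # false: the main fiber never took a slot
puts pool.can_be_acquired?   # false: the only slot is held by `holder`
```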
#info ⇒ Hash
Returns a hash describing the current state of the resource pool:
- waiters: how many fibers are waiting for a resource.
- owners: how many fibers own a resource.
- limit: the maximum number of resources that can be owned simultaneously.
    # File 'lib/async/resource_pool/simple.rb', line 85

    def info
      {
        waiters: @waiters.size,
        owners: @owners.size,
        limit: @limit
      }
    end
#release ⇒ Object
Releases the resource held by the current fiber. Resumes the first waiting fiber immediately if wakeup_strategy == :immediately, or in the next reactor loop if wakeup_strategy == :next_loop. Raises Async::ResourcePool::DoesNotOwnError if the current fiber does not own a resource.
    # File 'lib/async/resource_pool/simple.rb', line 65

    def release
      raise Async::ResourcePool::DoesNotOwnError.new unless already_acquired?
      @owners.delete(Fiber.current)
      wakeup
    end
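The private wakeup method is not shown on this page, so the following is only a rough sketch of what the :immediately strategy might look like: release hands control straight to the first waiting fiber. `HandoffSketch`, its `log`, the `name` arguments, and the nested `DoesNotOwnError` are all hypothetical, the pool size is fixed at one, and plain fibers stand in for reactor tasks. The :next_loop strategy needs a reactor and is not modeled here.

```ruby
require 'fiber'

# Hypothetical stand-in, NOT the gem's code. ASSUMPTION: with :immediately,
# wakeup resumes the first waiter from inside release.
class HandoffSketch
  class DoesNotOwnError < StandardError; end

  attr_reader :log

  def initialize
    @owners = []
    @waiters = []
    @log = []
  end

  def acquire(name)
    unless @owners.empty?
      @waiters.push(Fiber.current)
      Fiber.yield # park until a release resumes this fiber
    end
    @owners.push(Fiber.current)
    @log << "#{name} acquired"
  end

  def release(name)
    raise DoesNotOwnError unless @owners.include?(Fiber.current)

    @owners.delete(Fiber.current)
    @log << "#{name} released"
    waiter = @waiters.shift # first in, first out
    waiter&.resume
  end
end

pool = HandoffSketch.new
first = Fiber.new do
  pool.acquire('first')
  Fiber.yield # hold the slot while suspended
  pool.release('first')
end
second = Fiber.new do
  pool.acquire('second') # parks: the slot is taken
  pool.release('second')
end

first.resume  # first acquires
second.resume # second starts waiting
first.resume  # first releases, which resumes second right away
puts pool.log
# first acquired
# first released
# second acquired
# second released
```

Calling release from a fiber that holds nothing raises the sketch's `DoesNotOwnError`, mirroring the guard on the first line of the real method.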
#try_acquire ⇒ True|False
Acquires a resource for the current fiber if one is available and returns true; otherwise returns false without waiting. Raises Async::ResourcePool::AlreadyOwnError if the current fiber already holds a resource.
    # File 'lib/async/resource_pool/simple.rb', line 50

    def try_acquire
      raise Async::ResourcePool::AlreadyOwnError.new if already_acquired?

      if acquire_if_available
        true
      else
        @waiters.delete(Fiber.current)
        false
      end
    end
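Because try_acquire returns a boolean instead of waiting, it suits callers that have fallback work to do when the pool is busy. The sketch below shows that calling pattern with a hypothetical stand-in class (`TryAcquireSketch`, not part of the gem). The private acquire_if_available method is not shown on this page; the sketch assumes it simply takes a slot when one is free, and it omits the AlreadyOwnError guard.

```ruby
require 'fiber'

# Hypothetical stand-in, NOT the gem's class. ASSUMPTION: a free slot is
# taken by recording the calling fiber as an owner.
class TryAcquireSketch
  def initialize(limit)
    @limit = limit
    @owners = []
  end

  def try_acquire
    return false unless @owners.size < @limit

    @owners.push(Fiber.current)
    true
  end

  def release
    @owners.delete(Fiber.current)
  end
end

pool = TryAcquireSketch.new(1)

# A second fiber takes the only slot and keeps it while suspended.
holder = Fiber.new do
  pool.try_acquire
  Fiber.yield
  pool.release
end
holder.resume

if pool.try_acquire
  puts 'got a slot'
else
  puts 'pool busy, doing fallback work instead of waiting'
end

holder.resume         # holder releases its slot
puts pool.try_acquire # true
```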