Class: SolidQueue::Pool

Inherits:
Object
  • Object
show all
Includes:
AppExecutor
Defined in:
lib/solid_queue/pool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AppExecutor

#handle_thread_error, #wrap_in_app_executor

Constructor Details

#initialize(size, on_idle: nil) ⇒ Pool

Returns a new instance of Pool.



11
12
13
14
15
16
# File 'lib/solid_queue/pool.rb', line 11

# Builds a pool with +size+ initially available threads.
#
# @param size [Integer] stored as the pool size and used as the starting
#   value of the available-threads counter
# @param on_idle [#call, nil] optional callback stored for later use
def initialize(size, on_idle: nil)
  # Caller-supplied configuration.
  @on_idle = on_idle
  @size = size

  # Internal bookkeeping: the atomic counter starts at full capacity.
  @available_threads = Concurrent::AtomicFixnum.new(size)
  @mutex = Mutex.new
end

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



7
8
9
# File 'lib/solid_queue/pool.rb', line 7

# Reader for the pool size given at construction time.
#
# @return [Object] the value held in +@size+
def size
  @size
end

Instance Method Details

#idle? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/solid_queue/pool.rb', line 41

# Tells whether the pool has at least one idle thread.
#
# @return [Boolean] true when +idle_threads+ is greater than zero
def idle?
  idle_threads.positive?
end

#idle_threads ⇒ Object



37
38
39
# File 'lib/solid_queue/pool.rb', line 37

# Reads the current value of the available-threads counter.
#
# @return [Object] whatever +available_threads.value+ reports
def idle_threads
  counter = available_threads
  counter.value
end

#post(execution) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/solid_queue/pool.rb', line 18

# Hands +execution+ to the pool's executor as a Concurrent::Future.
#
# The available-threads counter is decremented before the future is built
# and incremented again in an +ensure+ once +perform+ has finished, whether
# it returned or raised. After that increment, while holding the mutex,
# +on_idle+ is invoked through +try(:call)+ if the pool reports itself idle.
# An error reported to the future's observer is forwarded to
# +handle_thread_error+.
#
# @param execution [#perform] the unit of work to run
# @return [Object] the result of calling +execute+ on the future
def post(execution)
  available_threads.decrement

  task = Concurrent::Future.new(args: [ execution ], executor: executor) do |job|
    wrap_in_app_executor do
      job.perform
    ensure
      # Give the slot back first so the idle check below sees it.
      available_threads.increment
      mutex.synchronize do
        on_idle.try(:call) if idle?
      end
    end
  end

  # The observer is attached before execution starts.
  task.add_observer do |_time, _value, reason|
    handle_thread_error(reason) if reason
  end

  task.execute
end