Class: FFI::Libfuse::ThreadPool

Inherits:
Object
Defined in:
lib/ffi/libfuse/thread_pool.rb

Overview

A self-expanding and self-limiting ThreadPool

The first thread is created by ThreadPool.new. Additional threads are started when ThreadPool.busy is called from within a worker iteration.

A pool thread will end when

  • a worker iteration returns false or nil
  • a worker iteration raises an exception (StopIteration ends the thread silently)
  • the number of idle threads exceeds max_idle
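
The first two termination rules above can be sketched with plain Ruby. This is only an illustration, not the library's implementation: run_worker_loop is a hypothetical helper, and it assumes a pool thread drives its worker in something like Kernel#loop, which rescues StopIteration on its own.

```ruby
# Hypothetical helper (not part of FFI::Libfuse): drives a worker block the way
# a single pool thread might, and reports how many iterations were run.
def run_worker_loop(&worker)
  iterations = 0
  loop do
    iterations += 1
    # A false or nil result ends the loop, and with it the thread.
    break unless worker.call
  end
  # Kernel#loop rescues StopIteration, so that exception ends the loop quietly.
  # Any other exception propagates to the caller.
  iterations
end

jobs = %i[a b c]
drained = run_worker_loop { jobs.shift }            # ends when shift returns nil
stopped = run_worker_loop { raise StopIteration }   # ends silently
```

Any other exception raised by the block escapes the loop, which corresponds to the thread finishing with an error.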

Constructor Details

#initialize(max_idle: nil, max_active: nil, name: nil, &worker) ⇒ ThreadPool

Create a new ThreadPool

Parameters:

  • max_idle (Integer) (defaults to: nil)

    The maximum number of idle threads (>= 0)

  • max_active (Integer) (defaults to: nil)

    The maximum number of active threads (> 0)

  • name (String) (defaults to: nil)

    A prefix used to set Thread.name for pool threads

  • worker (Proc)

    The worker called repeatedly within each pool thread

Raises:

  • (ArgumentError)

    if max_active is not > 0, max_idle is negative, or no worker block is given
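
As a rough illustration of which limit values are accepted, the sketch below reproduces the documented constraints in a standalone helper. check_pool_limits and accepted? are hypothetical names introduced here; they are not part of the library.

```ruby
# Hypothetical helper mirroring the documented constraints:
# max_active must be > 0 and max_idle must be >= 0; nil leaves a limit unset.
def check_pool_limits(max_idle: nil, max_active: nil)
  raise ArgumentError, "max_active #{max_active} must be > 0" if max_active && !max_active.positive?
  raise ArgumentError, "max_idle #{max_idle} must be >= 0" if max_idle&.negative?

  true
end

# Hypothetical convenience wrapper: true if the limits pass, false otherwise.
def accepted?(**limits)
  check_pool_limits(**limits)
rescue ArgumentError
  false
end

results = {
  no_limits: accepted?,                      # both limits nil
  zero_idle: accepted?(max_idle: 0),         # zero idle threads is allowed
  zero_active: accepted?(max_active: 0),     # rejected: must be > 0
  negative_idle: accepted?(max_idle: -1)     # rejected: must be >= 0
}
```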

# File 'lib/ffi/libfuse/thread_pool.rb', line 42

def initialize(max_idle: nil, max_active: nil, name: nil, &worker)
  raise ArgumentError, "max_active #{max_active} must be > 0" if max_active && !max_active.positive?
  raise ArgumentError, "max_idle: #{max_idle} must be >= 0" if max_idle&.negative?
  raise ArgumentError, 'must have worker block but none given' unless worker

  @max_idle = max_idle
  @max_active = max_active
  @name = name
  @worker = worker
  @mutex = Mutex.new
  @size = 0
  @busy = 0
  @idle_death = Set.new
  @completed = Queue.new
  @group = ThreadGroup.new.add(synchronize { start_thread }).enclose
end

Instance Attribute Details

#group ⇒ ThreadGroup (readonly)

Returns the enclosed thread group to which the pool's threads are added.

Returns:

  • (ThreadGroup)

    the enclosed thread group to which the pool's threads are added


# File 'lib/ffi/libfuse/thread_pool.rb', line 33

def group
  @group
end
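
The behaviour of an enclosed ThreadGroup can be shown with the standard library alone. The sketch below does not use the pool; it assumes only the documented semantics of ThreadGroup#enclose: threads started from inside the group still join it, while adding a thread from outside raises ThreadError.

```ruby
gate = Queue.new       # holds threads alive until the checks are done
children = Queue.new   # hands the child thread back to the main thread

first = Thread.new do
  gate.pop                          # wait until the group has been enclosed
  children << Thread.new { gate.pop }
  gate.pop
end

group = ThreadGroup.new.add(first).enclose
gate << :go

child = children.pop
inherited = child.group.equal?(group)   # child was started inside the group

outsider = Thread.new { gate.pop }
rejected = begin
  group.add(outsider)                   # not allowed once the group is enclosed
  false
rescue ThreadError
  true
end

gate.close                              # releases every waiting thread
[first, child, outsider].each(&:join)
```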

Class Method Details

.busy(&block) ⇒ Object

Starts a new thread if the current thread is a thread pool member and there are no other idle threads in the pool. The thread is marked as busy for the duration of the yield.

If the current thread is not a pool member, the block is simply yielded to.

# File 'lib/ffi/libfuse/thread_pool.rb', line 23

def busy(&block)
  if (tp = Thread.current[:tp])
    tp.busy(&block)
  elsif block_given?
    yield
  end
end
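
The dispatch on a thread-local pool reference can be tried without the library. In this sketch FakePool and busy_dispatch are hypothetical stand-ins: FakePool only counts calls, and busy_dispatch has the same shape as the class method above.

```ruby
# Hypothetical stand-in for a pool: counts how often a thread asked to be
# marked busy, then yields.
class FakePool
  attr_reader :busy_calls

  def initialize
    @busy_calls = 0
  end

  def busy
    @busy_calls += 1
    yield if block_given?
  end
end

# Same dispatch shape as the documented class method: delegate to the pool
# stored in the thread-local :tp, otherwise just yield.
def busy_dispatch(&block)
  if (tp = Thread.current[:tp])
    tp.busy(&block)
  elsif block_given?
    yield
  end
end

pool = FakePool.new

# Called from a thread that is not a pool member: the block simply runs.
outside = busy_dispatch { :ran_outside }

# Called from a thread that carries a pool reference: the pool is involved.
inside = Thread.new do
  Thread.current[:tp] = pool
  busy_dispatch { :ran_inside }
end.value
```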

Instance Method Details

#join {|thread, error = nil| ... } ⇒ void

This method returns an undefined value.

Joins the ThreadPool, optionally handling the completion of each thread

Yields:

  • (thread, error = nil)

Yield Parameters:

  • thread (Thread)

    a Thread that has finished

  • error (StandardError)

    if thread raised an exception

# File 'lib/ffi/libfuse/thread_pool.rb', line 64

def join
  while (t = @completed.pop)
    begin
      t.join
      yield t if block_given?
    rescue StandardError => e
      yield t, e if block_given?
    end
  end
  self
end
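
The drain pattern used by #join can be sketched with a plain Queue of finished threads. The names below (drain, completed, outcomes) are illustrative only, and a nil entry is assumed to mark the end of the queue, as the while condition in the listing suggests.

```ruby
# Hypothetical helper: pops threads until a nil sentinel, joins each one and
# yields it together with the error it raised, if any.
def drain(completed)
  while (t = completed.pop)
    begin
      t.join                 # re-raises the thread's exception, if it had one
      yield t, nil
    rescue StandardError => e
      yield t, e
    end
  end
end

ok = Thread.new { :done }
bad = Thread.new do
  Thread.current.report_on_exception = false   # keep stderr quiet in this demo
  raise ArgumentError, 'boom'
end

completed = Queue.new
completed << ok << bad << nil                  # nil ends the drain loop

outcomes = {}
drain(completed) { |thread, error| outcomes[thread] = error }
```

A block passed to the real #join receives the same two arguments, so it can be used to log or re-raise errors from worker threads.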

#list ⇒ Array<Thread>

Returns the busy threads and the idle threads, in that order.

Returns:

  • (Array<Thread>, Array<Thread>)

the busy threads, followed by the idle threads

# File 'lib/ffi/libfuse/thread_pool.rb', line 85

def list
  group.list.partition { |t| t[:tp_busy] }
end
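
The partition on the :tp_busy thread-local can be reproduced with a plain ThreadGroup. In this sketch the flags are set by hand, purely for illustration; in the pool they would be maintained by the pool itself.

```ruby
group = ThreadGroup.new
gate = Queue.new   # keeps the threads alive while they are inspected

threads = [true, false, true].map do |flag|
  t = Thread.new { gate.pop }
  t[:tp_busy] = flag          # hand-set flag, standing in for the pool's marker
  group.add(t)
  t
end

# Same expression as in #list: busy threads first, idle threads second.
busy, idle = group.list.partition { |t| t[:tp_busy] }
counts = [busy.size, idle.size]

gate.close                    # releases the waiting threads
threads.each(&:join)
```

Destructuring the result into two variables, as above, is the natural way to consume the pair that #list returns.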