Class: Airbrake::ThreadPool Private

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/airbrake-ruby/thread_pool.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

ThreadPool implements a simple thread pool that can configure the number of worker threads and the size of the queue to process.

Examples:

# Initialize a new thread pool with 5 workers and a queue size of 100. Set
# the block to be run concurrently.
thread_pool = ThreadPool.new(
  name: 'performance-notifier',
  worker_size: 5,
  queue_size: 100,
  block: proc { |message| print "ECHO: #{message}..."}
)

# Send work.
10.times { |i| thread_pool << i }
#=> ECHO: 0...ECHO: 1...ECHO: 2...

Since:

  • v4.6.1

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#logger

Constructor Details

#initialize(worker_size:, queue_size:, block:, name: nil) ⇒ ThreadPool

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ThreadPool.

Since:

  • v4.6.1



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/airbrake-ruby/thread_pool.rb', line 28

def initialize(worker_size:, queue_size:, block:, name: nil)
  @name = name
  @worker_size = worker_size
  @queue_size = queue_size
  @block = block

  @queue = SizedQueue.new(queue_size)
  @workers = ThreadGroup.new
  @mutex = Mutex.new
  @pid = nil
  @closed = false

  has_workers?
end

Instance Attribute Details

#workersThreadGroup (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

This is exposed for eaiser unit testing

Returns the list of workers.

Returns:

  • (ThreadGroup)

    the list of workers

Since:

  • v4.6.1



26
27
28
# File 'lib/airbrake-ruby/thread_pool.rb', line 26

def workers
  @workers
end

Instance Method Details

#<<(message) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds a new message to the thread pool. Rejects messages if the queue is at its capacity.

Parameters:

  • message (Object)

    The message that gets passed to the block

Returns:

  • (Boolean)

    true if the message was successfully sent to the pool, false if the queue is full

Since:

  • v4.6.1



49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/airbrake-ruby/thread_pool.rb', line 49

def <<(message)
  if backlog >= @queue_size
    logger.info do
      "#{LOG_LABEL} ThreadPool has reached its capacity of " \
      "#{@queue_size} and the following message will not be " \
      "processed: #{message.inspect}"
    end
    return false
  end

  @queue << message
  true
end

#backlogInteger

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns how big the queue is at the moment.

Returns:

  • (Integer)

    how big the queue is at the moment

Since:

  • v4.6.1



64
65
66
# File 'lib/airbrake-ruby/thread_pool.rb', line 64

def backlog
  @queue.size
end

#closevoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Closes the thread pool making it a no-op (it shut downs all worker threads). Before closing, waits on all unprocessed tasks to be processed.

Raises:

Since:

  • v4.6.1



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/airbrake-ruby/thread_pool.rb', line 101

def close
  threads = @mutex.synchronize do
    raise Airbrake::Error, 'this thread pool is closed already' if @closed

    unless @queue.empty?
      msg = "#{LOG_LABEL} waiting to process #{@queue.size} task(s)..."
      logger.debug("#{msg} (Ctrl-C to abort)")
    end

    @worker_size.times { @queue << :stop }
    @closed = true
    @workers.list.dup
  end

  threads.each(&:join)
  logger.debug("#{LOG_LABEL} #{@name} thread pool closed")
end

#closed?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)

Since:

  • v4.6.1



119
120
121
# File 'lib/airbrake-ruby/thread_pool.rb', line 119

def closed?
  @closed
end

#has_workers?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Checks if a thread pool has any workers. A thread pool doesn’t have any workers only in two cases: when it was closed or when all workers crashed. An active thread pool doesn’t have any workers only when something went wrong.

Workers are expected to crash when you fork the process the workers are living in. In this case we detect a fork and try to revive them here.

Another possible scenario that crashes workers is when you close the instance on at_exit, but some other at_exit hook prevents the process from exiting.

Returns:

  • (Boolean)

    true if an instance wasn’t closed, but has no workers

See Also:

Since:

  • v4.6.1



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/airbrake-ruby/thread_pool.rb', line 82

def has_workers?
  @mutex.synchronize do
    return false if @closed

    if @pid != Process.pid && @workers.list.empty?
      @pid = Process.pid
      @workers = ThreadGroup.new
      spawn_workers
    end

    !@closed && @workers.list.any?
  end
end

#spawn_workersObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • v4.6.1



123
124
125
# File 'lib/airbrake-ruby/thread_pool.rb', line 123

def spawn_workers
  @worker_size.times { @workers.add(spawn_worker) }
end