Class: Sumologic::Utils::Worker

Inherits:
Object
Defined in:
lib/sumologic/utils/worker.rb

Overview

Generic worker pool for parallel execution of tasks. Uses the Queue + Mutex pattern for thread-safe concurrent processing.

This utility abstracts the parallel execution pattern used across the codebase (metadata fetching, search pagination, etc.) into a reusable component.

Example:

worker = Worker.new(max_threads: 3, request_delay: 0.2)
results = worker.execute(items) do |item|
  fetch_data(item)
end
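
The Queue + Mutex pattern named in the overview can be sketched roughly as follows. This is an illustrative stand-in, not the source of `Sumologic::Utils::Worker`: the method name `parallel_map_compact` is hypothetical, and the placement of the `request_delay` sleep after each item is an assumption. Items go onto a shared `Queue`, each thread pops until the queue is empty, and a `Mutex` guards the shared results array.

```ruby
# Illustrative sketch only; `parallel_map_compact` is a hypothetical name,
# not part of the sumologic gem.
def parallel_map_compact(items, max_threads: 10, request_delay: 0.0, &block)
  return [] if items.empty?

  # Thread-safe queue holding every work item.
  queue = Queue.new
  items.each { |item| queue << item }

  results = []
  mutex = Mutex.new
  # Never start more threads than there are items.
  worker_count = [max_threads, queue.size].min

  threads = Array.new(worker_count) do
    Thread.new do
      loop do
        item = begin
          queue.pop(true) # non-blocking pop; raises ThreadError when empty
        rescue ThreadError
          break # queue drained, this worker is done
        end

        value = block.call(item)
        # Only the append to the shared array needs the lock.
        mutex.synchronize { results << value } unless value.nil?
        # Assumed placement of the rate-limiting delay.
        sleep(request_delay) if request_delay.positive?
      end
    end
  end

  threads.each(&:join)
  results
end

# Results arrive in completion order, so sort before comparing.
squares = parallel_map_compact([1, 2, 3, 4], max_threads: 2) do |n|
  n.even? ? n * n : nil
end
```

Because threads finish in an unpredictable order, callers that need input order should carry an index through the block and sort afterwards.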

Constant Summary

DEFAULT_MAX_THREADS = 10
DEFAULT_REQUEST_DELAY = 0.0

Constructor Details

#initialize(max_threads: DEFAULT_MAX_THREADS, request_delay: DEFAULT_REQUEST_DELAY) ⇒ Worker

Initialize worker pool

Parameters:

  • max_threads (Integer) (defaults to: DEFAULT_MAX_THREADS)

    Maximum number of concurrent threads

  • request_delay (Float) (defaults to: DEFAULT_REQUEST_DELAY)

    Delay in seconds between requests (for rate limiting)



# File 'lib/sumologic/utils/worker.rb', line 25

def initialize(max_threads: DEFAULT_MAX_THREADS, request_delay: DEFAULT_REQUEST_DELAY)
  @max_threads = max_threads
  @request_delay = request_delay
end

Instance Attribute Details

#max_threads ⇒ Object (readonly)

Returns the value of attribute max_threads.



# File 'lib/sumologic/utils/worker.rb', line 20

def max_threads
  @max_threads
end

#request_delay ⇒ Object (readonly)

Returns the value of attribute request_delay.



# File 'lib/sumologic/utils/worker.rb', line 20

def request_delay
  @request_delay
end

Instance Method Details

#execute(items, callbacks: {}) {|item| ... } ⇒ Array

Execute work items using a thread pool. Returns an array of results from the block executions.

Parameters:

  • items (Array)

    Work items to process

  • callbacks (Hash) (defaults to: {})

    Optional callbacks for progress tracking:

    • :start => ->(worker_count, total_items) { }

    • :progress => ->(completed_count, total_items) { }

    • :finish => ->(results, duration) { }

Yields:

  • (item)

    Block to execute for each item

Returns:

  • (Array)

    Results from block executions (nil results are filtered out)
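
The callback contract described above can be exercised with a small stand-in. The `execute_sketch` method below is hypothetical and only mirrors the documented signatures (`:start`, `:progress`, `:finish`); calling `:progress` while holding the mutex and measuring duration with a monotonic clock are assumptions of this sketch, not confirmed behavior of the library.

```ruby
# Hypothetical stand-in that follows the documented callback signatures;
# it is not the implementation of Sumologic::Utils::Worker#execute.
def execute_sketch(items, max_threads: 3, callbacks: {}, &block)
  return [] if items.empty?

  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  queue = Queue.new
  items.each { |item| queue << item }

  results = []
  completed = 0
  mutex = Mutex.new
  worker_count = [max_threads, items.size].min

  callbacks[:start]&.call(worker_count, items.size)

  threads = Array.new(worker_count) do
    Thread.new do
      loop do
        item = begin
          queue.pop(true) # raises ThreadError once the queue is empty
        rescue ThreadError
          break
        end

        value = block.call(item)
        mutex.synchronize do
          results << value unless value.nil?
          completed += 1
          # Assumption: progress is reported under the lock, so counts
          # are observed strictly in increasing order.
          callbacks[:progress]&.call(completed, items.size)
        end
      end
    end
  end
  threads.each(&:join)

  duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  callbacks[:finish]&.call(results, duration)
  results
end

events = []
callbacks = {
  start: ->(worker_count, total_items) { events << [:start, worker_count, total_items] },
  progress: ->(completed_count, total_items) { events << [:progress, completed_count, total_items] },
  finish: ->(results, duration) { events << [:finish, results.size, duration] }
}

lengths = execute_sketch(%w[a bb ccc], max_threads: 2, callbacks: callbacks) do |word|
  word.length
end
```

All three keys are optional; the safe-navigation call (`&.call`) skips any callback that is not supplied.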



# File 'lib/sumologic/utils/worker.rb', line 40

def execute(items, callbacks: {}, &block)
  return [] if items.empty?

  start_time = Time.now
  context = {
    result: [],
    completed: { count: 0 },
    mutex: Mutex.new,
    total_items: items.size,
    callbacks: callbacks
  }

  queue = create_work_queue(items)
  worker_count = [@max_threads, queue.size].min

  # Callback: start
  callbacks[:start]&.call(worker_count, items.size)

  threads = create_workers(queue, context, &block)

  threads.each(&:join)

  # Callback: finish
  duration = Time.now - start_time
  callbacks[:finish]&.call(context[:result], duration)

  context[:result]
end