Class: WorkBatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/work_batcher.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ WorkBatcher

Returns a new instance of WorkBatcher.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/work_batcher.rb', line 27

# Builds a batcher from an options hash.
#
# Options are read through get_option / get_option! (defined elsewhere in
# this class):
#   :size_limit   - no default is supplied at this call site
#   :time_limit   - falls back to 5 (unit not visible here; TODO confirm)
#   :deduplicate  - no default supplied; selects the queue container below
#   :deduplicator - falls back to method(:default_deduplicator)
#   :executor     - falls back to Concurrent.global_io_executor
#   :processor    - read with get_option! (presumably mandatory; confirm
#                   against get_option!'s definition)
#
# @param options [Hash] configuration as listed above
def initialize(options = {})
  @size_limit   = get_option(options, :size_limit)
  @time_limit   = get_option(options, :time_limit, 5)
  @deduplicate  = get_option(options, :deduplicate)
  @deduplicator = get_option(options, :deduplicator, method(:default_deduplicator))
  @executor     = get_option(options, :executor, Concurrent.global_io_executor)
  @processor    = get_option!(options, :processor)

  # Deduplicating mode stores work objects in a Hash keyed by the
  # deduplicator's result; otherwise a plain Array is used.
  @queue     = @deduplicate ? {} : []
  @processed = 0
  @mutex     = Mutex.new
end

Instance Method Details

#add(work_object) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/work_batcher.rb', line 54

# Enqueues one work object and then calls schedule_processing, all while
# holding @mutex.
#
# In deduplicating mode the object is stored under the key produced by
# @deduplicator, so a later object with the same key replaces the earlier
# one. Otherwise the object is appended to the array queue.
#
# @param work_object [Object] the item to enqueue
# @return [Object] whatever schedule_processing returns
def add(work_object)
  @mutex.synchronize do
    if @deduplicate
      @queue[@deduplicator.call(work_object)] = work_object
    else
      @queue.push(work_object)
    end
    schedule_processing
  end
end

#add_multiple(work_objects) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/work_batcher.rb', line 66

# Enqueues a collection of work objects in one critical section, then calls
# schedule_processing once.
#
# Returns immediately (nil) without locking or scheduling when the
# collection is empty. In deduplicating mode each object is stored under the
# key produced by @deduplicator, so later duplicates overwrite earlier ones;
# otherwise the whole collection is appended to the array queue.
#
# @param work_objects [#empty?, #each] the items to enqueue
# @return [Object, nil] whatever schedule_processing returns, or nil when
#   nothing was added
def add_multiple(work_objects)
  return if work_objects.empty?

  @mutex.synchronize do
    if @deduplicate
      work_objects.each { |item| @queue[@deduplicator.call(item)] = item }
    else
      @queue.concat(work_objects)
    end
    schedule_processing
  end
end

#inspect_queue ⇒ Object



94
95
96
97
98
99
100
101
102
# File 'lib/work_batcher.rb', line 94

# Returns a snapshot of the currently queued work objects, taken while
# holding @mutex. The returned Array is a separate object from the internal
# queue, so callers may mutate it without affecting the batcher.
#
# @return [Array] the queued work objects (the Hash's values in
#   deduplicating mode, a copy of the Array queue otherwise)
def inspect_queue
  @mutex.synchronize do
    # Hash#values already allocates a fresh Array, so only the Array-backed
    # queue needs an explicit copy.
    @deduplicate ? @queue.values : @queue.dup
  end
end

#shutdown ⇒ Object



44
45
46
47
48
49
50
51
52
# File 'lib/work_batcher.rb', line 44

# Flushes any pending scheduled processing.
#
# Reads @scheduled_processing_task under @mutex; the lock is released before
# the task is touched. If a task is recorded, it is rescheduled with a delay
# of 0 and then waited on via wait!. When no task is recorded this is a
# no-op.
#
# NOTE(review): the task is presumably a Concurrent::ScheduledTask; confirm
# against schedule_processing.
#
# @return [Object, nil] the result of the task's wait!, or nil when there
#   was nothing scheduled
def shutdown
  pending = @mutex.synchronize { @scheduled_processing_task }
  return unless pending

  pending.reschedule(0)
  pending.wait!
end

#status ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
# File 'lib/work_batcher.rb', line 82

# Reports the batcher's current state as a Hash, gathered under @mutex.
#
# Keys:
#   :scheduled_processing_time - value of @scheduled_processing_time; only
#                                present while a processing task is recorded
#   :queue_count               - number of entries in the queue
#   :processed_count           - value of the @processed counter
#
# @return [Hash{Symbol => Object}] a freshly built status hash
def status
  @mutex.synchronize do
    snapshot = {}
    snapshot[:scheduled_processing_time] = @scheduled_processing_time if @scheduled_processing_task
    snapshot[:queue_count] = @queue.size
    snapshot[:processed_count] = @processed
    snapshot
  end
end