Class: Aggregator

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/aggregator.rb,
lib/aggregator/version.rb

Constant Summary collapse

VERSION =
"1.0.0"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeAggregator

Returns a new instance of Aggregator.



30
31
32
33
34
35
36
# File 'lib/aggregator.rb', line 30

def initialize
  @queue = Queue.new
  @mutex = Mutex.new
  @thread = nil

  at_exit { stop }
end

Instance Attribute Details

#logger=(value) ⇒ Object

Sets the attribute logger

Parameters:

  • value

    the value to set the attribute logger to.



8
9
10
# File 'lib/aggregator.rb', line 8

def logger=(value)
  @logger = value
end

#max_batch_size=(value) ⇒ Object

Sets the attribute max_batch_size

Parameters:

  • value

    the value to set the attribute max_batch_size to.



8
9
10
# File 'lib/aggregator.rb', line 8

def max_batch_size=(value)
  @max_batch_size = value
end

#max_wait_time=(value) ⇒ Object

Sets the attribute max_wait_time

Parameters:

  • value

    the value to set the attribute max_wait_time to.



8
9
10
# File 'lib/aggregator.rb', line 8

def max_wait_time=(value)
  @max_wait_time = value
end

Class Method Details

.drainObject



26
27
28
# File 'lib/aggregator.rb', line 26

def self.drain
  self.instance.drain
end

.logger=(logger) ⇒ Object



22
23
24
# File 'lib/aggregator.rb', line 22

def self.logger=(logger)
  self.instance.logger = logger
end

.max_batch_size=(value) ⇒ Object



14
15
16
# File 'lib/aggregator.rb', line 14

def self.max_batch_size=(value)
  self.instance.max_batch_size = value
end

.max_wait_time=(value) ⇒ Object



18
19
20
# File 'lib/aggregator.rb', line 18

def self.max_wait_time=(value)
  self.instance.max_wait_time = value
end

.push(data) ⇒ Object



10
11
12
# File 'lib/aggregator.rb', line 10

def self.push(data)
  self.instance.push(data)
end

Instance Method Details

#drainObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/aggregator.rb', line 43

def drain
  if running?
    if ! @queue.empty?
      log :info, "joining thread #{@thread.inspect} (queue length = #{@queue.length})"
      @drain = true
      @thread.join if running?
    end

    log :info, "stopping thread #{@thread.inspect} (queue length = #{@queue.length})"
    @thread = nil
  elsif ! @queue.empty?
    start and drain
  end

  true
end

#push(data) ⇒ Object



38
39
40
41
# File 'lib/aggregator.rb', line 38

def push(data)
  @queue.push(data)
  start unless running?
end