Class: Aggregator
- Inherits:
-
Object
- Object
- Aggregator
- Includes:
- Singleton
- Defined in:
- lib/aggregator.rb,
lib/aggregator/version.rb
Constant Summary
- VERSION =
"1.0.0"
Instance Attribute Summary
-
#logger ⇒ Object
writeonly
Sets the attribute logger.
-
#max_batch_size ⇒ Object
writeonly
Sets the attribute max_batch_size.
-
#max_wait_time ⇒ Object
writeonly
Sets the attribute max_wait_time.
Class Method Summary
- .drain ⇒ Object
- .logger=(logger) ⇒ Object
- .max_batch_size=(value) ⇒ Object
- .max_wait_time=(value) ⇒ Object
- .push(data) ⇒ Object
Instance Method Summary
- #drain ⇒ Object
-
#initialize ⇒ Aggregator
constructor
A new instance of Aggregator.
- #push(data) ⇒ Object
Constructor Details
#initialize ⇒ Aggregator
Returns a new instance of Aggregator.
30 31 32 33 34 35 36 |
# File 'lib/aggregator.rb', line 30 def initialize @queue = Queue.new @mutex = Mutex.new @thread = nil at_exit { stop } end |
Instance Attribute Details
#logger=(value) ⇒ Object
Sets the attribute logger.
8 9 10 |
# File 'lib/aggregator.rb', line 8 def logger=(value) @logger = value end |
#max_batch_size=(value) ⇒ Object
Sets the attribute max_batch_size.
8 9 10 |
# File 'lib/aggregator.rb', line 8 def max_batch_size=(value) @max_batch_size = value end |
#max_wait_time=(value) ⇒ Object
Sets the attribute max_wait_time.
8 9 10 |
# File 'lib/aggregator.rb', line 8 def max_wait_time=(value) @max_wait_time = value end |
Class Method Details
.drain ⇒ Object
26 27 28 |
# File 'lib/aggregator.rb', line 26 def self.drain self.instance.drain end |
.logger=(logger) ⇒ Object
22 23 24 |
# File 'lib/aggregator.rb', line 22 def self.logger=(logger) self.instance.logger = logger end |
.max_batch_size=(value) ⇒ Object
14 15 16 |
# File 'lib/aggregator.rb', line 14 def self.max_batch_size=(value) self.instance.max_batch_size = value end |
.max_wait_time=(value) ⇒ Object
18 19 20 |
# File 'lib/aggregator.rb', line 18 def self.max_wait_time=(value) self.instance.max_wait_time = value end |
.push(data) ⇒ Object
10 11 12 |
# File 'lib/aggregator.rb', line 10 def self.push(data) self.instance.push(data) end |
Instance Method Details
#drain ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/aggregator.rb', line 43 def drain if running? if ! @queue.empty? log :info, "joining thread #{@thread.inspect} (queue length = #{@queue.length})" @drain = true @thread.join if running? end log :info, "stopping thread #{@thread.inspect} (queue length = #{@queue.length})" @thread = nil elsif ! @queue.empty? start and drain end true end |
#push(data) ⇒ Object
38 39 40 41 |
# File 'lib/aggregator.rb', line 38 def push(data) @queue.push(data) start unless running? end |