Class: LogStash::Outputs::Cassandra::Buffer
- Inherits: Object
- Defined in: lib/logstash/outputs/cassandra/buffer.rb
Instance Method Summary
- #contents ⇒ Object
- #flush ⇒ Object
- #initialize(logger, max_size, flush_interval, &block) ⇒ Buffer (constructor)
  Returns a new instance of Buffer.
- #push(item) ⇒ Object (also: #<<)
- #push_multi(items) ⇒ Object
  Pushes multiple items onto the buffer in a single operation.
- #stop(do_flush = true, wait_complete = true) ⇒ Object
- #synchronize ⇒ Object
  Yields the internal buffer to the given block while holding the internal mutex, so that external code can operate on the buffer contents safely.
Constructor Details
#initialize(logger, max_size, flush_interval, &block) ⇒ Buffer
Returns a new instance of Buffer.
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 7

def initialize(logger, max_size, flush_interval, &block)
  @logger = logger
  # You need to acquire this for anything modifying state generally
  @operations_mutex = Mutex.new
  @operations_lock = java.util.concurrent.locks.ReentrantLock.new

  @stopping = Concurrent::AtomicBoolean.new(false)
  @max_size = max_size
  @submit_proc = block

  @buffer = []

  @last_flush = Time.now
  @flush_interval = flush_interval
  @flush_thread = spawn_interval_flusher
end
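The constructor stores the block as @submit_proc, keeps max_size as a bound, and starts a background flusher driven by flush_interval. The private helpers it relies on (push_unsafe, flush_unsafe, spawn_interval_flusher) are not shown on this page, so the following is only a minimal stdlib sketch under the assumption that the buffer hands its items to the block once max_size is reached. SketchBuffer and its internals are hypothetical; the logger, the interval thread, and the Java lock are left out.

```ruby
# Hypothetical stand-in for illustration; not the plugin's implementation.
class SketchBuffer
  def initialize(max_size, &block)
    @operations_mutex = Mutex.new
    @max_size = max_size
    @submit_proc = block   # called with each flushed batch
    @buffer = []
  end

  def push(item)
    synchronize { push_unsafe(item) }
  end
  alias_method :<<, :push

  def flush
    synchronize { flush_unsafe }
  end

  # Returns a copy so callers never hold the internal array.
  def contents
    synchronize { |buffer| buffer.dup }
  end

  def synchronize
    @operations_mutex.synchronize { yield(@buffer) }
  end

  private

  # Assumption: reaching max_size triggers a flush.
  def push_unsafe(item)
    @buffer << item
    flush_unsafe if @buffer.size >= @max_size
  end

  # Assumption: the block receives the buffered items, then the buffer is emptied.
  def flush_unsafe
    return if @buffer.empty?
    @submit_proc.call(@buffer.dup)
    @buffer.clear
  end
end

batches = []
buffer = SketchBuffer.new(2) { |items| batches << items }
buffer << 1
buffer << 2   # reaches max_size, so the block receives [1, 2]
buffer << 3   # stays buffered until the next flush
```

In this sketch the third item remains in the buffer until flush is called explicitly; in the real class the interval flusher would presumably pick it up.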
Instance Method Details
#contents ⇒ Object
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 56

def contents
  synchronize {|buffer| buffer}
end
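Because the block passed to synchronize simply returns the yielded array, contents hands back the internal array itself rather than a copy, so the mutex no longer protects the caller once the method returns. The sketch below reproduces that pattern with a hypothetical stdlib-only class (TinyBuffer is not part of the plugin) and shows one way to take a snapshot inside the lock instead.

```ruby
# Hypothetical class reproducing the contents/synchronize pattern shown above.
class TinyBuffer
  def initialize
    @operations_mutex = Mutex.new
    @buffer = []
  end

  def synchronize
    @operations_mutex.synchronize { yield(@buffer) }
  end

  # Same shape as #contents: returns whatever the block returns.
  def contents
    synchronize { |buffer| buffer }
  end
end

tiny = TinyBuffer.new
tiny.synchronize { |buffer| buffer << :event }

live = tiny.contents                                  # the internal array itself
snapshot = tiny.synchronize { |buffer| buffer.dup }   # a copy taken under the lock

tiny.synchronize { |buffer| buffer << :later }
# live now also sees :later; snapshot still holds only :event
```

If a stable view is needed, copying inside the synchronized block is likely the safer choice.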
#flush ⇒ Object
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 39

def flush
  synchronize { flush_unsafe }
end
#push(item) ⇒ Object Also known as: <<
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 24

def push(item)
  synchronize do |buffer|
    push_unsafe(item)
  end
end
#push_multi(items) ⇒ Object
Pushes multiple items onto the buffer in a single operation. Raises ArgumentError unless items is an Array.

# File 'lib/logstash/outputs/cassandra/buffer.rb', line 32

def push_multi(items)
  raise ArgumentError, "push multi takes an array!, not an #{items.class}!" unless items.is_a?(Array)
  synchronize do |buffer|
    items.each {|item| push_unsafe(item) }
  end
end
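The is_a?(Array) guard means other enumerables, such as a Range, are rejected even though they could be iterated. The sketch below copies only that guard into a hypothetical stdlib-only class (MultiSketch is an illustration, and it appends with concat rather than the plugin's push_unsafe) to show the error and the conversion a caller would need.

```ruby
# Hypothetical class; only the argument check mirrors #push_multi.
class MultiSketch
  def initialize
    @operations_mutex = Mutex.new
    @buffer = []
  end

  def push_multi(items)
    raise ArgumentError, "push multi takes an array!, not an #{items.class}!" unless items.is_a?(Array)
    @operations_mutex.synchronize { @buffer.concat(items) }
  end

  def size
    @operations_mutex.synchronize { @buffer.size }
  end
end

multi = MultiSketch.new
multi.push_multi([1, 2, 3])        # accepted

rejected = begin
  multi.push_multi(4..6)           # a Range is Enumerable but not an Array
  nil
rescue ArgumentError => e
  e.message
end

multi.push_multi((4..6).to_a)      # convert first, then push
```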
#stop(do_flush = true, wait_complete = true) ⇒ Object
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 43

def stop(do_flush=true,wait_complete=true)
  return if stopping?
  @stopping.make_true

  # No need to acquire a lock in this case
  return if !do_flush && !wait_complete

  synchronize do
    flush_unsafe if do_flush
    @flush_thread.join if wait_complete
  end
end
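The two flags give stop three behaviours: a second call is a no-op, stop(false, false) only sets the stopping flag, and the defaults flush and then wait for the flusher thread. The sketch below mirrors that control flow in a hypothetical stdlib-only class. StoppableSketch, its plain boolean flag (standing in for Concurrent::AtomicBoolean), and its polling thread are assumptions for illustration; the plugin's real flusher loop is not shown on this page.

```ruby
# Hypothetical stand-in; the flusher thread only polls the stop flag
# and never takes the mutex, so joining it inside the lock is safe here.
class StoppableSketch
  def initialize(&block)
    @operations_mutex = Mutex.new
    @stopping = false
    @submit_proc = block
    @buffer = []
    @flush_thread = Thread.new { sleep 0.01 until stopping? }
  end

  def stopping?
    @stopping
  end

  def flusher_alive?
    @flush_thread.alive?
  end

  def push(item)
    @operations_mutex.synchronize { @buffer << item }
  end

  def stop(do_flush = true, wait_complete = true)
    return if stopping?          # later calls are no-ops
    @stopping = true
    return if !do_flush && !wait_complete

    @operations_mutex.synchronize do
      flush_unsafe if do_flush
      @flush_thread.join if wait_complete
    end
  end

  private

  def flush_unsafe
    return if @buffer.empty?
    @submit_proc.call(@buffer.dup)
    @buffer.clear
  end
end

flushed = []
graceful = StoppableSketch.new { |items| flushed << items }
graceful.push(:a)
graceful.stop            # flushes [:a] and waits for the flusher thread
graceful.stop            # already stopping: returns immediately

abrupt_flushed = []
abrupt = StoppableSketch.new { |items| abrupt_flushed << items }
abrupt.push(:b)
abrupt.stop(false, false)   # only sets the flag; :b is never submitted
```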
#synchronize ⇒ Object
Yields the internal buffer to the given block while holding the internal mutex, so that external code can operate on the buffer contents safely. Returns the value of the block.

# File 'lib/logstash/outputs/cassandra/buffer.rb', line 63

def synchronize
  @operations_mutex.synchronize { yield(@buffer) }
end
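A typical use of this method is to read and modify the buffer in one critical section. One caveat follows from the listing: synchronize locks a Ruby Mutex, which is not reentrant, so calling another synchronized method such as push from inside the block raises ThreadError on standard Ruby. The sketch below illustrates both points with a hypothetical stdlib-only class (GuardedList is not part of the plugin).

```ruby
# Hypothetical class using the same mutex-and-yield pattern as #synchronize.
class GuardedList
  def initialize
    @operations_mutex = Mutex.new
    @items = []
  end

  def synchronize
    @operations_mutex.synchronize { yield(@items) }
  end

  def push(item)
    synchronize { |items| items << item }
  end
end

list = GuardedList.new
list.push(:a)
list.push(:b)

# Drain the list atomically: copy and clear under a single lock acquisition.
drained = list.synchronize do |items|
  snapshot = items.dup
  items.clear
  snapshot
end

# Re-entering the lock from the same thread is rejected by Mutex.
reentry_error = begin
  list.synchronize { list.push(:c) }
  nil
rescue ThreadError => e
  e
end
```

Inside the block, working on the yielded array directly avoids the second lock acquisition.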