Class: LogStash::Outputs::Cassandra::Buffer

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/cassandra/buffer.rb

Instance Method Summary collapse

Constructor Details

#initialize(logger, max_size, flush_interval, &block) ⇒ Buffer

Returns a new instance of Buffer.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 7

# Builds the buffer state and starts the interval flusher thread.
#
# @param logger [Object] stored as @logger
# @param max_size [Object] stored as @max_size (presumably the item count
#   that triggers a flush -- confirm against push_unsafe)
# @param flush_interval [Object] stored as @flush_interval (presumably
#   seconds -- confirm against the flusher)
# @param block [Proc] stored as @submit_proc
def initialize(logger, max_size, flush_interval, &block)
  # Caller-supplied configuration.
  @logger = logger
  @submit_proc = block
  @max_size = max_size
  @flush_interval = flush_interval

  # Mutable state.
  @buffer = []
  @stopping = Concurrent::AtomicBoolean.new(false)
  @last_flush = Time.now

  # Acquire this before doing anything that modifies state.
  @operations_mutex = Mutex.new
  @operations_lock = java.util.concurrent.locks.ReentrantLock.new

  # Started last so that every instance variable above is already assigned
  # when spawn_interval_flusher runs.
  @flush_thread = spawn_interval_flusher
end

Instance Method Details

#contents ⇒ Object



56
57
58
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 56

# Returns whatever #synchronize yields to the block, handed back unchanged.
#
# NOTE(review): the object is returned as-is, not copied, so the caller
# keeps a reference to it after the synchronized block has exited.
#
# @return [Object] the value yielded by #synchronize
def contents
  synchronize do |items|
    items
  end
end

#flush ⇒ Object



39
40
41
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 39

# Runs flush_unsafe inside #synchronize.
#
# @return [Object] whatever flush_unsafe returns
def flush
  synchronize do |_buffer|
    flush_unsafe
  end
end

#push(item) ⇒ Object Also known as: <<



24
25
26
27
28
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 24

# Adds a single item by calling push_unsafe inside #synchronize.
#
# @param item [Object] the item handed to push_unsafe
# @return [Object] whatever push_unsafe returns
def push(item)
  synchronize { |_buffer| push_unsafe(item) }
end

#push_multi(items) ⇒ Object

Push multiple items onto the buffer in a single operation

Raises:

  • (ArgumentError)


32
33
34
35
36
37
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 32

# Pushes several items in one synchronized operation: #synchronize is entered
# once and push_unsafe is called for each element, in order.
#
# @param items [Array] the items to push
# @return [Array] the same +items+ array (the result of Array#each)
# @raise [ArgumentError] if +items+ is not an Array
def push_multi(items)
  unless items.is_a?(Array)
    raise ArgumentError, "push multi takes an array!, not an #{items.class}!"
  end

  synchronize do |_buffer|
    items.each do |entry|
      push_unsafe(entry)
    end
  end
end

#stop(do_flush = true, wait_complete = true) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 43

# Marks the buffer as stopping and, optionally, flushes what is pending and
# waits for the flush thread to finish. Calling it again once stopping? is
# true is a no-op.
#
# @param do_flush [Boolean] when true, run flush_unsafe inside #synchronize
# @param wait_complete [Boolean] when true, join @flush_thread before returning
# @return [Thread, nil] the joined flush thread when +wait_complete+ is true,
#   otherwise nil
def stop(do_flush = true, wait_complete = true)
  return if stopping?
  @stopping.make_true

  # No need to acquire a lock in this case
  return if !do_flush && !wait_complete

  synchronize { flush_unsafe } if do_flush

  # Join only after the mutex has been released. The flush thread is created
  # by spawn_interval_flusher (not shown here) and presumably takes the same
  # mutex for its periodic flush; joining it while still holding the mutex
  # would deadlock if it is currently waiting for that mutex.
  @flush_thread.join if wait_complete
end

#synchronize ⇒ Object

For operating on the buffer contents externally: takes a block, yields the internal buffer to it, and runs the block while holding the internal mutex.



63
64
65
# File 'lib/logstash/outputs/cassandra/buffer.rb', line 63

# Runs the given block while holding @operations_mutex, passing @buffer to it.
#
# NOTE(review): if @operations_mutex is a plain ::Mutex it is not reentrant,
# so calling this again from inside the block on the same thread would raise
# ThreadError.
#
# @yieldparam buffer [Object] the current value of @buffer
# @return [Object] the block's return value
def synchronize
  @operations_mutex.synchronize do
    yield @buffer
  end
end